You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/02/20 00:59:50 UTC
[32/51] [abbrv] [partial] incubator-geode git commit: Merge
remote-tracking branch 'origin/develop' into feature/GEODE-917
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
index 9cf2f13,0000000..c731721
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
@@@ -1,3135 -1,0 +1,3142 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CopyHelper;
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.DeltaSerializationException;
+import com.gemstone.gemfire.GemFireIOException;
+import com.gemstone.gemfire.InvalidDeltaException;
+import com.gemstone.gemfire.SerializationException;
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.EntryNotFoundException;
+import com.gemstone.gemfire.cache.EntryOperation;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.SerializedCacheValue;
+import com.gemstone.gemfire.cache.TransactionId;
+import com.gemstone.gemfire.cache.query.IndexMaintenanceException;
+import com.gemstone.gemfire.cache.query.QueryException;
+import com.gemstone.gemfire.cache.query.internal.IndexUpdater;
+import com.gemstone.gemfire.cache.query.internal.index.IndexManager;
+import com.gemstone.gemfire.cache.query.internal.index.IndexProtocol;
+import com.gemstone.gemfire.cache.query.internal.index.IndexUtils;
+import com.gemstone.gemfire.cache.util.TimestampedEntryEvent;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.distributed.internal.DistributionMessage;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.ByteArrayDataInput;
+import com.gemstone.gemfire.internal.DSFIDFactory;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
+import com.gemstone.gemfire.internal.Sendable;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.FilterRoutingInfo.FilterInfo;
+import com.gemstone.gemfire.internal.cache.delta.Delta;
+import com.gemstone.gemfire.internal.cache.lru.Sizeable;
+import com.gemstone.gemfire.internal.cache.partitioned.PartitionMessage;
+import com.gemstone.gemfire.internal.cache.partitioned.PutMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerHelper;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.cache.tx.DistTxKeyInfo;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventCallbackArgument;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.lang.StringUtils;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.Chunk;
+import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
+import com.gemstone.gemfire.internal.offheap.OffHeapRegionEntryHelper;
+import com.gemstone.gemfire.internal.offheap.ReferenceCountHelper;
+import com.gemstone.gemfire.internal.offheap.Releasable;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+
+import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_NEW_VALUE;
+import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_OLD_VALUE;
+
+import com.gemstone.gemfire.internal.util.ArrayUtils;
+import com.gemstone.gemfire.internal.util.BlobHelper;
+import com.gemstone.gemfire.pdx.internal.PeerTypeRegistration;
+
+/**
+ * Implementation of an entry event
+ */
+// must be public for DataSerializableFixedID
+public class EntryEventImpl
+ implements EntryEvent, InternalCacheEvent, DataSerializableFixedID, EntryOperation
+ , Releasable
+{
+ private static final Logger logger = LogService.getLogger();
+
+ // PACKAGE FIELDS //
+ public transient LocalRegion region;
+ private transient RegionEntry re;
+
+ protected KeyInfo keyInfo;
+
+ //private long eventId;
+ /** the event's id. Scoped by distributedMember. */
+ protected EventID eventID;
+
+ private Object newValue = null;
+ /**
+ * If we ever serialize the new value then it should be
+ * stored in this field in case we need the serialized form
+ * again later. This was added to fix bug 43781.
+ * Note that we also have the "newValueBytes" field.
+ * But it is only non-null if setSerializedNewValue was called.
+ */
+ private byte[] cachedSerializedNewValue = null;
+ @Retained(ENTRY_EVENT_OLD_VALUE)
+ private Object oldValue = null;
+ protected Delta delta = null;
+
+ protected short eventFlags = 0x0000;
+
+ protected TXId txId = null;
+
+ protected Operation op;
+
+ /* To store the operation/modification type */
+ private transient EnumListenerEvent eventType;
+
+ /**
+ * This field will be null unless this event is used for a putAll operation.
+ *
+ * @since 5.0
+ */
+ protected transient DistributedPutAllOperation putAllOp;
+
+ /**
+ * This field will be null unless this event is used for a removeAll operation.
+ *
+ * @since 8.1
+ */
+ protected transient DistributedRemoveAllOperation removeAllOp;
+
+ /**
+ * The member that originated this event
+ *
+ * @since 5.0
+ */
+ protected DistributedMember distributedMember;
+
+
+ /**
+ * transient storage for the message that caused the event
+ */
+ transient DistributionMessage causedByMessage;
+
+
+ //private static long eventID = 0;
+
+ /**
+ * The originating membershipId of this event.
+ *
+ * @since 5.1
+ */
+ protected ClientProxyMembershipID context = null;
+
+ /**
+ * A custom context object that can be used for any other contextual
+ * information. Currently used by SQL Fabric to pass around evaluated rows
+ * from raw byte arrays and routing object.
+ */
+ private transient Object contextObj = null;
+
+ /**
+ * this holds the bytes representing the change in value effected by this
+ * event. It is used when the value implements the Delta interface.
+ */
+ private byte[] deltaBytes = null;
+
+
+ /** routing information for cache clients for this event */
+ private FilterInfo filterInfo;
+
+ /**new value stored in serialized form*/
+ protected byte[] newValueBytes;
+ /**old value stored in serialized form*/
+ private byte[] oldValueBytes;
+
+ /** version tag for concurrency checks */
+ protected VersionTag versionTag;
+
+ /** boolean to indicate that this operation should be optimized by not fetching from HDFS*/
+ private transient boolean fetchFromHDFS = true;
+
+ private transient boolean isPutDML = false;
+
+ /** boolean to indicate that the RegionEntry for this event was loaded from HDFS*/
+ private transient boolean loadedFromHDFS= false;
+
+ private transient boolean isCustomEviction = false;
+
+ /** boolean to indicate that the RegionEntry for this event has been evicted*/
+ private transient boolean isEvicted = false;
+
+ private transient boolean isPendingSecondaryExpireDestroy = false;
+
+ public final static Object SUSPECT_TOKEN = new Object();
+
+ /** Creates an event whose fields all keep their declared initial values. */
+ public EntryEventImpl() {
+   // nothing to initialize
+ }
+
+ /**
+  * Creates a new entry event that will be used for conveying version
+  * information, and anything else of use, while processing another event.
+  * No version tag is set: this simply passes {@code null} to
+  * {@link #createVersionTagHolder(VersionTag)}.
+  *
+  * @return the empty event object
+  */
+ @Retained
+ public static EntryEventImpl createVersionTagHolder() {
+   final EntryEventImpl holder = createVersionTagHolder(null);
+   return holder;
+ }
+
+ /**
+  * Creates a new entry event that will be used for conveying version
+  * information, and anything else of use, while processing another event.
+  * The event is built with the no-arg constructor, given the tag, and then
+  * has off-heap values disallowed.
+  *
+  * @param tag the version tag to store on the new event; may be null
+  * @return the otherwise empty event object
+  */
+ @Retained
+ public static EntryEventImpl createVersionTagHolder(VersionTag tag) {
+   final EntryEventImpl holder = new EntryEventImpl();
+   holder.setVersionTag(tag);
+   holder.disallowOffHeapValues();
+   return holder;
+ }
+
+ /**
+ * Reads the contents of this message from the given input.
+ * The reads happen in a fixed sequence: event id, key, value, operation
+ * byte, event flags, callback argument, transaction id, new value (or delta),
+ * old value, distributed member, client context and tail key.
+ * NOTE(review): presumably this mirrors the order of the matching write
+ * path, which is not visible here -- confirm before reordering anything.
+ *
+ * @param in the input to read this event's state from
+ */
+ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+ this.eventID = (EventID)DataSerializer.readObject(in);
+ Object key = DataSerializer.readObject(in);
+ Object value = DataSerializer.readObject(in);
+ this.keyInfo = new KeyInfo(key, value, null); // callback argument is filled in below
+ this.op = Operation.fromOrdinal(in.readByte());
+ this.eventFlags = in.readShort();
+ this.keyInfo.setCallbackArg(DataSerializer.readObject(in));
+ this.txId = (TXId)DataSerializer.readObject(in);
+
+ if (in.readBoolean()) { // isDelta
+ this.delta = (Delta)DataSerializer.readObject(in);
+ }
+ else {
+ // OFFHEAP Currently values are never deserialized to off heap memory. If that changes then this code needs to change.
+ if (in.readBoolean()) { // newValueSerialized
+ this.newValueBytes = DataSerializer.readByteArray(in);
+ this.cachedSerializedNewValue = this.newValueBytes; // remember the serialized form alongside the value
+ this.newValue = CachedDeserializableFactory.create(this.newValueBytes);
+ }
+ else {
+ this.newValue = DataSerializer.readObject(in);
+ }
+ }
+
+ // OFFHEAP Currently values are never deserialized to off heap memory. If that changes then this code needs to change.
+ if (in.readBoolean()) { // oldValueSerialized
+ this.oldValueBytes = DataSerializer.readByteArray(in);
+ this.oldValue = CachedDeserializableFactory.create(this.oldValueBytes);
+ }
+ else {
+ this.oldValue = DataSerializer.readObject(in);
+ }
+ this.distributedMember = DSFIDFactory.readInternalDistributedMember(in);
+ this.context = ClientProxyMembershipID.readCanonicalized(in);
+ this.tailKey = DataSerializer.readLong(in);
+ }
+
+ /**
+  * Creates an event that carries no new value. The key info is obtained from
+  * the region, and the three booleans are recorded through their setters.
+  *
+  * @param region the region the event is for; asked for the key info
+  * @param op the operation of the event
+  * @param key the entry key
+  * @param originRemote value passed to {@code setOriginRemote}
+  * @param distributedMember the member stored as this event's distributed member
+  * @param generateCallbacks value passed to {@code setGenerateCallbacks}
+  * @param fromRILocalDestroy value passed to {@code setFromRILocalDestroy}
+  */
+ @Retained
+ protected EntryEventImpl(LocalRegion region, Operation op, Object key,
+     boolean originRemote, DistributedMember distributedMember,
+     boolean generateCallbacks, boolean fromRILocalDestroy) {
+   this.region = region;
+   this.op = op;
+   this.keyInfo = region.getKeyInfo(key);
+
+   setOriginRemote(originRemote);
+   setGenerateCallbacks(generateCallbacks);
+
+   this.distributedMember = distributedMember;
+   setFromRILocalDestroy(fromRILocalDestroy);
+ }
+
+ /**
+ * Doesn't specify oldValue as this will be filled in later as part of an
+ * operation on the region, or lets it default to null.
+ */
+ @Retained
+ protected EntryEventImpl(
+ final LocalRegion region,
+ Operation op, Object key, @Retained(ENTRY_EVENT_NEW_VALUE) Object newVal,
+ Object callbackArgument,
+ boolean originRemote, DistributedMember distributedMember,
+ boolean generateCallbacks, boolean initializeId) {
+
+ this.region = region;
+ this.op = op;
+ this.keyInfo = this.region.getKeyInfo(key, newVal, callbackArgument);
+
+ if (newVal instanceof Delta) {
+ this.delta = (Delta)newVal;
+ }
+ else if (!Token.isInvalid(newVal)) {
+ basicSetNewValue(newVal);
+ }
+
+ this.txId = this.region.getTXId();
+ /**
+ * this might set txId for events done from a thread that has a tx even
+ * though the op is non-tx. For example region ops.
+ */
+ if (newVal == Token.LOCAL_INVALID) {
+ setLocalInvalid(true);
+ }
+ setOriginRemote(originRemote);
+ setGenerateCallbacks(generateCallbacks);
+ this.distributedMember = distributedMember;
+ }
+
+ /**
+ * Called by BridgeEntryEventImpl to use existing EventID
+ */
+ @Retained
+ protected EntryEventImpl(LocalRegion region, Operation op, Object key,
+ @Retained(ENTRY_EVENT_NEW_VALUE) Object newValue, Object callbackArgument, boolean originRemote,
+ DistributedMember distributedMember, boolean generateCallbacks,
+ EventID eventID) {
+ this(region, op, key, newValue,
+ callbackArgument, originRemote, distributedMember, generateCallbacks,
+ true /* initializeId */);
+ Assert.assertTrue(eventID != null || !(region instanceof PartitionedRegion));
+ this.setEventId(eventID);
+ }
+
+ /**
+  * Creates an entry event from another entry event, including its old value;
+  * same as calling the two-argument copy constructor with {@code true}.
+  *
+  * @param other the event to copy
+  */
+ @Retained
+ public EntryEventImpl(@Retained({ENTRY_EVENT_NEW_VALUE, ENTRY_EVENT_OLD_VALUE}) EntryEventImpl other) {
+   this(other, /* setOldValue */ true);
+ }
+
+ /**
+ * Creates an entry event as a copy of another entry event.
+ * The copy gets its own key info object, has its callbacks-invoked flag
+ * cleared, and only takes over the old value (and its serialized bytes)
+ * when asked to.
+ *
+ * @param other the event to copy from
+ * @param setOldValue if true, the other event's old value is retained and copied too
+ */
+ @Retained
+ public EntryEventImpl(@Retained({ENTRY_EVENT_NEW_VALUE, ENTRY_EVENT_OLD_VALUE}) EntryEventImpl other, boolean setOldValue) {
+ region = other.region;
+
+ this.eventID = other.eventID;
+ basicSetNewValue(other.basicGetNewValue());
+ this.newValueBytes = other.newValueBytes;
+ // assigned after basicSetNewValue, which can reset cachedSerializedNewValue to null
+ this.cachedSerializedNewValue = other.cachedSerializedNewValue;
+ this.re = other.re;
+ this.delta = other.delta;
+ if (setOldValue) {
+ retainAndSetOldValue(other.basicGetOldValue());
+ this.oldValueBytes = other.oldValueBytes;
+ }
+ this.eventFlags = other.eventFlags;
+ setEventFlag(EventFlags.FLAG_CALLBACKS_INVOKED, false); // the copy starts with this flag cleared
+ txId = other.txId;
+ op = other.op;
+ distributedMember = other.distributedMember;
+ this.filterInfo = other.filterInfo;
+ // copy the key info, keeping the DistTxKeyInfo subtype when the source uses it
+ this.keyInfo = other.keyInfo.isDistKeyInfo() ? new DistTxKeyInfo(
+ (DistTxKeyInfo) other.keyInfo) : new KeyInfo(other.keyInfo);
+ // a gateway sender callback argument is copied into a new instance instead of being shared
+ if (other.getRawCallbackArgument() instanceof GatewaySenderEventCallbackArgument) {
+ this.keyInfo
+ .setCallbackArg((new GatewaySenderEventCallbackArgument(
+ (GatewaySenderEventCallbackArgument) other
+ .getRawCallbackArgument())));
+ }
+ this.context = other.context;
+ this.deltaBytes = other.deltaBytes;
+ this.tailKey = other.tailKey;
+ this.versionTag = other.versionTag;
+ //set possible duplicate
+ this.setPossibleDuplicate(other.isPossibleDuplicate());
+ }
+
+ @Retained
+ public EntryEventImpl(Object key2) {
+ this.keyInfo = new KeyInfo(key2, null, null);
+ }
+
+ /**
+  * Creates a bridge event for use in server-side command classes. Events
+  * created this way are not intended to be used in cache operations; the
+  * event is marked as not allowing off-heap access.
+  *
+  * @param id the identity of the client's event
+  */
+ @Retained
+ public EntryEventImpl(EventID id) {
+   this.offHeapOk = false;
+   this.eventID = id;
+ }
+
+ /**
+  * Creates and returns an EntryEventImpl. Generates and assigns a bucket id to the
+  * EntryEventImpl if the region parameter is a PartitionedRegion.
+  * Forwards to the nine-argument {@code create} with both trailing booleans
+  * ({@code generateCallbacks} and {@code initializeId}) set to true.
+  */
+ @Retained
+ public static EntryEventImpl create(LocalRegion region,
+     Operation op,
+     Object key, @Retained(ENTRY_EVENT_NEW_VALUE) Object newValue, Object callbackArgument,
+     boolean originRemote, DistributedMember distributedMember) {
+   final boolean generateCallbacks = true;
+   final boolean initializeId = true;
+   return create(region, op, key, newValue, callbackArgument, originRemote,
+       distributedMember, generateCallbacks, initializeId);
+ }
+
+ /**
+  * Creates and returns an EntryEventImpl. Generates and assigns a bucket id to the
+  * EntryEventImpl if the region parameter is a PartitionedRegion.
+  * Forwards to the nine-argument {@code create} with {@code initializeId}
+  * set to true.
+  */
+ @Retained
+ public static EntryEventImpl create(LocalRegion region,
+     Operation op,
+     Object key,
+     @Retained(ENTRY_EVENT_NEW_VALUE) Object newValue,
+     Object callbackArgument,
+     boolean originRemote,
+     DistributedMember distributedMember,
+     boolean generateCallbacks) {
+   final boolean initializeId = true;
+   return create(region, op, key, newValue, callbackArgument, originRemote,
+       distributedMember, generateCallbacks, initializeId);
+ }
+
+ /**
+  * Creates and returns an EntryEventImpl. Generates and assigns a bucket id to the
+  * EntryEventImpl if the region parameter is a PartitionedRegion.
+  *
+  * Called by BridgeEntryEventImpl to use existing EventID
+  *
+  * {@link EntryEventImpl#EntryEventImpl(LocalRegion, Operation, Object, Object, Object, boolean, DistributedMember, boolean, EventID)}
+  */
+ @Retained
+ public static EntryEventImpl create(LocalRegion region, Operation op, Object key,
+     @Retained(ENTRY_EVENT_NEW_VALUE) Object newValue, Object callbackArgument, boolean originRemote,
+     DistributedMember distributedMember, boolean generateCallbacks,
+     EventID eventID) {
+   return new EntryEventImpl(region, op, key, newValue, callbackArgument,
+       originRemote, distributedMember, generateCallbacks, eventID);
+ }
+
+ /**
+  * Creates and returns an EntryEventImpl. Generates and assigns a bucket id to the
+  * EntryEventImpl if the region parameter is a PartitionedRegion.
+  *
+  * {@link EntryEventImpl#EntryEventImpl(LocalRegion, Operation, Object, boolean, DistributedMember, boolean, boolean)}
+  */
+ @Retained
+ public static EntryEventImpl create(LocalRegion region, Operation op, Object key,
+     boolean originRemote, DistributedMember distributedMember,
+     boolean generateCallbacks, boolean fromRILocalDestroy) {
+   return new EntryEventImpl(region, op, key, originRemote, distributedMember,
+       generateCallbacks, fromRILocalDestroy);
+ }
+
+ /**
+  * Creates and returns an EntryEventImpl. Generates and assigns a bucket id to the
+  * EntryEventImpl if the region parameter is a PartitionedRegion.
+  *
+  * This creator does not specify the oldValue as this will be filled in later as part of an
+  * operation on the region, or lets it default to null.
+  *
+  * {@link EntryEventImpl#EntryEventImpl(LocalRegion, Operation, Object, Object, Object, boolean, DistributedMember, boolean, boolean)}
+  */
+ @Retained
+ public static EntryEventImpl create(final LocalRegion region,
+     Operation op, Object key, @Retained(ENTRY_EVENT_NEW_VALUE) Object newVal,
+     Object callbackArgument,
+     boolean originRemote, DistributedMember distributedMember,
+     boolean generateCallbacks, boolean initializeId) {
+   return new EntryEventImpl(region, op, key, newVal, callbackArgument,
+       originRemote, distributedMember, generateCallbacks, initializeId);
+ }
+
+ /**
+ * Creates a PutAllEvent given the distributed operation, the region, and the
+ * entry data.
+ *
+ * @since 5.0
+ */
+ @Retained
+ static EntryEventImpl createPutAllEvent(
+ DistributedPutAllOperation putAllOp, LocalRegion region,
+ Operation entryOp, Object entryKey, @Retained(ENTRY_EVENT_NEW_VALUE) Object entryNewValue)
+ {
+ EntryEventImpl e;
+ if (putAllOp != null) {
+ EntryEventImpl event = putAllOp.getBaseEvent();
+ if (event.isBridgeEvent()) {
+ e = EntryEventImpl.create(region, entryOp, entryKey, entryNewValue,
+ event.getRawCallbackArgument(), false, event.distributedMember,
+ event.isGenerateCallbacks());
+ e.setContext(event.getContext());
+ } else {
+ e = EntryEventImpl.create(region, entryOp, entryKey, entryNewValue, event.getCallbackArgument(),
+ false, region.getMyId(), event.isGenerateCallbacks());
+ }
+
+ } else {
+ e = EntryEventImpl.create(region, entryOp, entryKey, entryNewValue, null,
+ false, region.getMyId(), true);
+ }
+
+ e.putAllOp = putAllOp;
+ return e;
+ }
+
+ /**
+  * Creates a {@code REMOVEALL_DESTROY} event for one key of a removeAll.
+  * Without a removeAll operation the event gets a null callback argument,
+  * this region's member id, and callbacks enabled. Otherwise the callback
+  * argument and generate-callbacks setting come from the operation's base
+  * event; when that base event is a bridge event its distributed member and
+  * client context are taken over as well.
+  *
+  * @return the new event, with its removeAll operation set to {@code op}
+  */
+ protected static EntryEventImpl createRemoveAllEvent(
+     DistributedRemoveAllOperation op,
+     LocalRegion region,
+     Object entryKey) {
+   final Operation entryOp = Operation.REMOVEALL_DESTROY;
+   final EntryEventImpl created;
+   if (op == null) {
+     created = EntryEventImpl.create(region, entryOp, entryKey, null, null,
+         false, region.getMyId(), true);
+   } else {
+     final EntryEventImpl base = op.getBaseEvent();
+     if (!base.isBridgeEvent()) {
+       created = EntryEventImpl.create(region, entryOp, entryKey, null,
+           base.getCallbackArgument(), false, region.getMyId(),
+           base.isGenerateCallbacks());
+     } else {
+       created = EntryEventImpl.create(region, entryOp, entryKey, null,
+           base.getRawCallbackArgument(), false, base.distributedMember,
+           base.isGenerateCallbacks());
+       created.setContext(base.getContext());
+     }
+   }
+
+   created.removeAllOp = op;
+   return created;
+ }
+ /** Returns true if this event has a putAll or a removeAll operation set. */
+ public boolean isBulkOpInProgress() {
+   if (getPutAllOperation() != null) {
+     return true;
+   }
+   return getRemoveAllOperation() != null;
+ }
+
+ /** Returns the putAll operation for this event, or null if there is none. */
+ public DistributedPutAllOperation getPutAllOperation() {
+   return putAllOp;
+ }
+ /**
+  * Replaces the putAll operation of this event. When the new operation has a
+  * base event, that base event's callback argument is copied onto this event
+  * first.
+  *
+  * @param nv the new putAll operation; may be null
+  * @return the putAll operation that was set before this call
+  */
+ public DistributedPutAllOperation setPutAllOperation(DistributedPutAllOperation nv) {
+   final DistributedPutAllOperation previous = this.putAllOp;
+   if (nv != null) {
+     if (nv.getBaseEvent() != null) {
+       setCallbackArgument(nv.getBaseEvent().getCallbackArgument());
+     }
+   }
+   this.putAllOp = nv;
+   return previous;
+ }
+ /** Returns the removeAll operation for this event, or null if there is none. */
+ public DistributedRemoveAllOperation getRemoveAllOperation() {
+   return removeAllOp;
+ }
+ /**
+  * Replaces the removeAll operation of this event. When the new operation has
+  * a base event, that base event's callback argument is copied onto this
+  * event first.
+  *
+  * @param nv the new removeAll operation; may be null
+  * @return the removeAll operation that was set before this call
+  */
+ public DistributedRemoveAllOperation setRemoveAllOperation(DistributedRemoveAllOperation nv) {
+   final DistributedRemoveAllOperation previous = this.removeAllOp;
+   if (nv != null) {
+     if (nv.getBaseEvent() != null) {
+       setCallbackArgument(nv.getBaseEvent().getCallbackArgument());
+     }
+   }
+   this.removeAllOp = nv;
+   return previous;
+ }
+
+ /** Returns what {@code EventFlags.isSet} reports for the given mask on this event's flags. */
+ private final boolean testEventFlag(short mask) {
+   final short flags = this.eventFlags;
+   return EventFlags.isSet(flags, mask);
+ }
+
+ /** Stores the result of {@code EventFlags.set} for the given mask and state as this event's flags. */
+ private final void setEventFlag(short mask, boolean on) {
+   final short updated = EventFlags.set(this.eventFlags, mask, on);
+   this.eventFlags = updated;
+ }
+
+ /** Returns the distributed member stored on this event. */
+ public DistributedMember getDistributedMember() {
+   return distributedMember;
+ }
+
+ /////////////////////// INTERNAL BOOLEAN SETTERS
+ /** Sets or clears {@code EventFlags.FLAG_ORIGIN_REMOTE} on this event. */
+ public void setOriginRemote(boolean b) {
+   setEventFlag(EventFlags.FLAG_ORIGIN_REMOTE, b);
+ }
+
+ /** Sets or clears {@code EventFlags.FLAG_LOCAL_INVALID} on this event. */
+ public void setLocalInvalid(boolean b) {
+   setEventFlag(EventFlags.FLAG_LOCAL_INVALID, b);
+ }
+
+ /** Sets or clears {@code EventFlags.FLAG_GENERATE_CALLBACKS} on this event. */
+ void setGenerateCallbacks(boolean b) {
+   setEventFlag(EventFlags.FLAG_GENERATE_CALLBACKS, b);
+ }
+
+ /** Sets the flag telling whether callbacks should be invoked for a partitioned region. */
+ public void setInvokePRCallbacks(boolean b) {
+   final short mask = EventFlags.FLAG_INVOKE_PR_CALLBACKS;
+   setEventFlag(mask, b);
+ }
+
+ /** Returns the flag telling whether callbacks should be invoked for a partitioned region. */
+ public boolean getInvokePRCallbacks() {
+   final short mask = EventFlags.FLAG_INVOKE_PR_CALLBACKS;
+   return testEventFlag(mask);
+ }
+
+ /** Returns whether {@code EventFlags.FLAG_INHIBIT_DISTRIBUTION} is set on this event. */
+ public boolean getInhibitDistribution() {
+   final short mask = EventFlags.FLAG_INHIBIT_DISTRIBUTION;
+   return testEventFlag(mask);
+ }
+
+ /** Sets or clears {@code EventFlags.FLAG_INHIBIT_DISTRIBUTION} on this event. */
+ public void setInhibitDistribution(boolean b) {
+   final short mask = EventFlags.FLAG_INHIBIT_DISTRIBUTION;
+   setEventFlag(mask, b);
+ }
+
+ /**
+  * Was the entry destroyed or missing and allowed to be destroyed again?
+  * Reads {@code EventFlags.FLAG_REDESTROYED_TOMBSTONE}.
+  */
+ public boolean getIsRedestroyedEntry() {
+   final short mask = EventFlags.FLAG_REDESTROYED_TOMBSTONE;
+   return testEventFlag(mask);
+ }
+
+ /** Sets or clears {@code EventFlags.FLAG_REDESTROYED_TOMBSTONE} on this event. */
+ public void setIsRedestroyedEntry(boolean b) {
+   final short mask = EventFlags.FLAG_REDESTROYED_TOMBSTONE;
+   setEventFlag(mask, b);
+ }
+
+ /**
+  * Sets or clears {@code EventFlags.FLAG_CONCURRENCY_CONFLICT} on this event.
+  * Despite the "is" prefix, this overload is the setter.
+  */
+ public void isConcurrencyConflict(boolean b) {
+   final short mask = EventFlags.FLAG_CONCURRENCY_CONFLICT;
+   setEventFlag(mask, b);
+ }
+
+ /** Returns whether {@code EventFlags.FLAG_CONCURRENCY_CONFLICT} is set on this event. */
+ public boolean isConcurrencyConflict() {
+   final short mask = EventFlags.FLAG_CONCURRENCY_CONFLICT;
+   return testEventFlag(mask);
+ }
+
+ /**
+  * Sets the DistributionMessage that caused this event.
+  *
+  * @param msg the causing message
+  */
+ public void setCausedByMessage(DistributionMessage msg) {
+   causedByMessage = msg;
+ }
+
+ /**
+  * Returns the PartitionMessage that caused this event.
+  *
+  * @return the causing message, or null if the event was not caused by a
+  *         PartitionMessage
+  */
+ public PartitionMessage getPartitionMessage() {
+   final DistributionMessage cause = this.causedByMessage;
+   // instanceof is false for null, so no separate null check is needed
+   return (cause instanceof PartitionMessage) ? (PartitionMessage) cause : null;
+ }
+
+ /**
+  * Returns the RemoteOperationMessage that caused this event.
+  *
+  * @return the causing message, or null if the event was not caused by a
+  *         RemoteOperationMessage
+  */
+ public RemoteOperationMessage getRemoteOperationMessage() {
+   final DistributionMessage cause = this.causedByMessage;
+   // instanceof is false for null, so no separate null check is needed
+   return (cause instanceof RemoteOperationMessage) ? (RemoteOperationMessage) cause : null;
+ }
+
+ /////////////// BOOLEAN GETTERS
+ /** Returns what this event's operation reports for {@code isLocalLoad()}. */
+ public boolean isLocalLoad() {
+   return op.isLocalLoad();
+ }
+
+ /** Returns what this event's operation reports for {@code isNetSearch()}. */
+ public boolean isNetSearch() {
+   return op.isNetSearch();
+ }
+
+ /** Returns what this event's operation reports for {@code isNetLoad()}. */
+ public boolean isNetLoad() {
+   return op.isNetLoad();
+ }
+
+ /** Returns what this event's operation reports for {@code isDistributed()}. */
+ public boolean isDistributed() {
+   return op.isDistributed();
+ }
+
+ /** Returns what this event's operation reports for {@code isExpiration()}. */
+ public boolean isExpiration() {
+   return op.isExpiration();
+ }
+
+ /** Returns what this event's operation reports for {@code isEviction()}. */
+ public boolean isEviction() {
+   final Operation operation = this.op;
+   return operation.isEviction();
+ }
+
+ /** Returns the custom-eviction marker of this event. */
+ public final boolean isCustomEviction() {
+   return isCustomEviction;
+ }
+
+ /**
+  * Sets the custom-eviction marker of this event.
+  *
+  * @param customEvict the new marker value
+  */
+ public final void setCustomEviction(boolean customEvict) {
+   isCustomEviction = customEvict;
+ }
+
+ /** Marks the RegionEntry for this event as evicted; there is no way to unset it here. */
+ public final void setEvicted() {
+   isEvicted = true;
+ }
+
+ /** Returns whether this event has been marked as evicted. */
+ public final boolean isEvicted() {
+   return isEvicted;
+ }
+
+ /** Returns the pending-secondary-expire-destroy marker of this event. */
+ public final boolean isPendingSecondaryExpireDestroy() {
+   return isPendingSecondaryExpireDestroy;
+ }
+
+ /**
+  * Sets the pending-secondary-expire-destroy marker of this event.
+  *
+  * @param value the new marker value
+  */
+ public final void setPendingSecondaryExpireDestroy(boolean value) {
+   isPendingSecondaryExpireDestroy = value;
+ }
+ /**
+  * Returns whether {@code EventFlags.FLAG_ORIGIN_REMOTE} is set.
+  * Note that the flag is sometimes set to false even though the event was
+  * received from a peer. This is done to force distribution of the message
+  * to peers and to cause concurrency version stamping to be performed.
+  * This is done by all one-hop operations, like RemoteInvalidateMessage.
+  */
+ public boolean isOriginRemote() {
+   final short mask = EventFlags.FLAG_ORIGIN_REMOTE;
+   return testEventFlag(mask);
+ }
+
+ /**
+  * Returns whether this event originated from a WAN gateway and carries a WAN
+  * version tag, i.e. a version tag is present and it is a gateway tag.
+  */
+ public boolean isFromWANAndVersioned() {
+   if (this.versionTag == null) {
+     return false;
+   }
+   return this.versionTag.isGatewayTag();
+ }
+
+ /**
+  * Returns whether this event originated in a client and carries a version
+  * tag, i.e. both the client context and the version tag are non-null.
+  */
+ public boolean isFromBridgeAndVersioned() {
+   final boolean hasClientContext = this.context != null;
+   final boolean hasVersionTag = this.versionTag != null;
+   return hasClientContext && hasVersionTag;
+ }
+
+ /** Returns whether {@code EventFlags.FLAG_GENERATE_CALLBACKS} is set on this event. */
+ public boolean isGenerateCallbacks() {
+   final short mask = EventFlags.FLAG_GENERATE_CALLBACKS;
+   return testEventFlag(mask);
+ }
+
+ /**
+  * Gives this event a newly created event id for the given distributed
+  * system. Asserts first that no event id has been set yet.
+  * NOTE(review): the trace branch only runs when an id is already present,
+  * which the preceding assertion is meant to rule out -- confirm whether it
+  * is still reachable.
+  *
+  * @param sys the distributed system the new EventID is created for
+  */
+ public void setNewEventId(DistributedSystem sys) {
+   Assert.assertTrue(this.eventID == null, "Double setting event id");
+   final EventID freshId = new EventID(sys);
+   final boolean replacingExisting = this.eventID != null;
+   if (replacingExisting && logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) {
+     logger.trace(LogMarker.BRIDGE_SERVER, "Replacing event ID with {} in event {}", freshId, this);
+   }
+   this.eventID = freshId;
+ }
+
+ /**
+  * Gives this event a newly created event id and, when more than one id is
+  * requested, asks that id to reserve {@code count - 1} sequence ids.
+  * Asserts first that no event id has been set yet.
+  *
+  * @param sys the distributed system the new EventID is created for
+  * @param count the number of ids wanted
+  */
+ public void reserveNewEventId(DistributedSystem sys, int count) {
+   Assert.assertTrue(this.eventID == null, "Double setting event id");
+   final EventID reserved = new EventID(sys);
+   this.eventID = reserved;
+   if (count > 1) {
+     reserved.reserveSequenceId(count - 1);
+   }
+ }
+
+ /**
+  * Stores the given event id on this event, replacing any previous one.
+  *
+  * @param id the event id to store
+  */
+ public void setEventId(EventID id) {
+   eventID = id;
+ }
+
+ /**
+  * Returns the event id, if any.
+  *
+  * @return the event id, or null if no event id has been set
+  */
+ public final EventID getEventId() {
+   return eventID;
+ }
+
+ /** Same as {@link #hasClientOrigin()}. */
+ public boolean isBridgeEvent() {
+   final boolean fromClient = hasClientOrigin();
+   return fromClient;
+ }
+ /** Returns true if {@link #getContext()} yields a non-null client id. */
+ public boolean hasClientOrigin() {
+   final ClientProxyMembershipID clientId = getContext();
+   return clientId != null;
+ }
+
+ /**
+  * Sets the ID of the client that initiated this event.
+  *
+  * @param contx the client id; asserted to be non-null
+  */
+ public void setContext(ClientProxyMembershipID contx) {
+   final boolean present = contx != null;
+   Assert.assertTrue(present);
+   this.context = contx;
+ }
+
+ /**
+  * Gets the ID of the client that initiated this event.
+  *
+  * @return the client id, or null if this is a server-initiated event
+  */
+ public ClientProxyMembershipID getContext() {
+   return context;
+ }
+
+ // INTERNAL
+ boolean isLocalInvalid()
+ {
+ return testEventFlag(EventFlags.FLAG_LOCAL_INVALID);
+ }
+
+ /////////////////////////////////////////////////
+
+ /**
+  * Returns the key, as held by this event's key info.
+  *
+  * @return the key.
+  */
+ public Object getKey() {
+   final KeyInfo info = this.keyInfo;
+   return info.getKey();
+ }
+
+ /**
+ * Returns the value in the cache prior to this event. When passed to an event
+ * handler after an event occurs, this value reflects the value that was in
+ * the cache in this VM, not necessarily the value that was in the cache VM
+ * that initiated the operation.
+ *
+ * @return the value in the cache prior to this event.
+ */
+ public final Object getOldValue() {
+ try {
+ if (isOriginRemote() && this.region.isProxy()) {
+ return null;
+ }
+ @Unretained Object ov = basicGetOldValue();
+ if (ov == null) {
+ return null;
+ } else if (ov == Token.NOT_AVAILABLE) {
+ return AbstractRegion.handleNotAvailable(ov);
+ }
+ boolean doCopyOnRead = getRegion().isCopyOnRead();
+ if (ov != null) {
+ if (ov instanceof StoredObject) {
+ // TODO OFFHEAP: returns off-heap PdxInstance
+ return ((StoredObject) ov).getValueAsDeserializedHeapObject();
+ } else
+ if (ov instanceof CachedDeserializable) {
+ CachedDeserializable cd = (CachedDeserializable)ov;
+ if (doCopyOnRead) {
+ return cd.getDeserializedWritableCopy(this.region, this.re);
+ } else {
+ return cd.getDeserializedValue(this.region, this.re);
+ }
+ }
+ else {
+ if (doCopyOnRead) {
+ return CopyHelper.copy(ov);
+ } else {
+ return ov;
+ }
+ }
+ }
+ return null;
+ } catch(IllegalArgumentException i) {
+ IllegalArgumentException iae = new IllegalArgumentException(LocalizedStrings.DONT_RELEASE.toLocalizedString("Error while deserializing value for key="+getKey()));
+ iae.initCause(i);
+ throw iae;
+ }
+ }
+
+ /**
+  * Like getRawNewValue except that if the result is an off-heap reference then copy it to the heap.
+  * ALERT: If there is a Delta, returns that, not the (applied) new value.
+  * TODO OFFHEAP: to prevent the heap copy use getRawNewValue instead
+  */
+ public final Object getRawNewValueAsHeapObject() {
+   final Delta pendingDelta = this.delta;
+   if (pendingDelta != null) {
+     return pendingDelta;
+   }
+   final Object raw = basicGetNewValue();
+   final Object copied = OffHeapHelper.copyIfNeeded(raw);
+   return OffHeapHelper.getHeapForm(copied);
+ }
+
+ /**
+  * If new value is a Delta return it.
+  * Else if new value is off-heap return the StoredObject form (unretained OFF_HEAP_REFERENCE).
+  * Its refcount is not incremented by this call and the returned object can only be safely
+  * used for the lifetime of the EntryEventImpl instance that returned the value.
+  * Else return the raw form.
+  */
+ @Unretained(ENTRY_EVENT_NEW_VALUE)
+ public final Object getRawNewValue() {
+   final Delta pendingDelta = this.delta;
+   if (pendingDelta == null) {
+     return basicGetNewValue();
+   }
+   return pendingDelta;
+ }
+
+ /** Returns the new value exactly as {@code basicGetNewValue()} yields it. */
+ @Unretained(ENTRY_EVENT_NEW_VALUE)
+ public Object getValue() {
+   final Object current = basicGetNewValue();
+   return current;
+ }
+
+ /**
+  * Returns the delta that represents the new value.
+  *
+  * @return the delta that represents the new value; null if no delta.
+  */
+ public final Delta getDeltaNewValue() {
+   return delta;
+ }
+
+ /**
+  * Applies the delta and returns the resulting new value.
+  * When the applied value is the very same object as the old value and
+  * copy-on-read is requested, a copy is returned instead.
+  *
+  * @param doCopyOnRead whether a result identical to the old value must be copied
+  * @return the applied new value, or null if {@code applyDelta(true)} returned false
+  */
+ private Object applyDeltaWithCopyOnRead(boolean doCopyOnRead) {
+   // Historical note: only (broken) product code has the opportunity to call
+   // this before this.oldValue is set. If oldValue is not set yet, then we
+   // most likely haven't synchronized on the region entry yet. (If we have,
+   // then make sure oldValue is set before calling this method.)
+   if (!applyDelta(true)) {
+     return null;
+   }
+   Object applied = basicGetNewValue();
+   // if applyDelta returns true then newValue should not be off-heap
+   assert !(applied instanceof StoredObject);
+   if (doCopyOnRead && applied == this.oldValue) {
+     applied = CopyHelper.copy(applied);
+   }
+   return applied;
+ }
+
+ /**
+  * Replaces the new value held by this event.
+  * Does nothing when {@code v} is already the held value. Otherwise the
+  * previously held value is released (only while off-heap access is still
+  * allowed) and, when {@code v} is a {@code Chunk}, it is retained on behalf
+  * of this event. If that retain fails the new value becomes null.
+  * Whenever the held value changes, the cached serialized form is dropped,
+  * including the failed-retain case, so stale bytes of the previous value
+  * can never be paired with a null new value.
+  *
+  * @param v the value to hold; a {@code Chunk} is retained by this call
+  */
+ @Released(ENTRY_EVENT_NEW_VALUE)
+ protected void basicSetNewValue(@Retained(ENTRY_EVENT_NEW_VALUE) Object v) {
+   if (v == this.newValue) {
+     return;
+   }
+   if (this.offHeapOk) {
+     OffHeapHelper.releaseAndTrackOwner(this.newValue, this);
+   }
+   Object valueToHold = v;
+   if (v instanceof Chunk) {
+     ReferenceCountHelper.setReferenceCountOwner(this);
+     try {
+       if (!((Chunk) v).retain()) {
+         // the chunk could not be retained, so there is nothing to hold on to
+         valueToHold = null;
+       }
+     } finally {
+       // always clear the owner registration, even if retain() throws
+       ReferenceCountHelper.setReferenceCountOwner(null);
+     }
+   }
+   this.newValue = valueToHold;
+   this.cachedSerializedNewValue = null;
+ }
+ /**
+  * Reports whether the new value or the old value held by this event is a Chunk.
+  * @return true if this event has a reference to an off-heap new or old value.
+  */
+ public boolean hasOffHeapValue() {
+   if (this.newValue instanceof Chunk) {
+     return true;
+   }
+   return this.oldValue instanceof Chunk;
+ }
+
+ /**
+  * Returns the stored new value without retaining it.
+  * @throws IllegalStateException if the value is a Chunk and offHeapOk is no longer set
+  */
+ @Unretained
+ protected final Object basicGetNewValue() {
+   final Object current = this.newValue;
+   if (current instanceof Chunk && !this.offHeapOk) {
+     throw new IllegalStateException("Attempt to access off heap value after the EntryEvent was released.");
+   }
+   return current;
+ }
+
+ /**
+  * Owner object used for reference count tracking of the old value.
+  * Equality, hash code and string form all delegate to the enclosing EntryEventImpl.
+  */
+ private class OldValueOwner {
+   private EntryEventImpl getEvent() {
+     return EntryEventImpl.this;
+   }
+
+   @Override
+   public int hashCode() {
+     return getEvent().hashCode();
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+     if (!(obj instanceof OldValueOwner)) {
+       return false;
+     }
+     final OldValueOwner other = (OldValueOwner) obj;
+     return getEvent().equals(other.getEvent());
+   }
+
+   @Override
+   public String toString() {
+     return "OldValueOwner " + getEvent().toString();
+   }
+ }
+
+ /**
+  * Replaces the stored old value with {@code v}, releasing the previous one if it is a Chunk
+  * (only while offHeapOk is set). This method does not retain {@code v}: if v might be an
+  * off-heap reference that has not been retained for this EntryEventImpl, call
+  * retainAndSetOldValue instead.
+  * @param v the caller should have already retained this off-heap reference.
+  */
+ @Released(ENTRY_EVENT_OLD_VALUE)
+ private void basicSetOldValue(@Unretained(ENTRY_EVENT_OLD_VALUE) Object v) {
+   @Released final Object previous = this.oldValue;
+   if (v == previous) {
+     return;
+   }
+   if (this.offHeapOk && previous instanceof Chunk) {
+     if (ReferenceCountHelper.trackReferenceCounts()) {
+       OffHeapHelper.releaseAndTrackOwner(previous, new OldValueOwner());
+     } else {
+       OffHeapHelper.release(previous);
+     }
+   }
+   this.oldValue = v;
+ }
+
+ @Released(ENTRY_EVENT_OLD_VALUE)
+ private void retainAndSetOldValue(@Retained(ENTRY_EVENT_OLD_VALUE) Object v) {
+ if (v == this.oldValue) return;
+
+ if (v instanceof Chunk) {
+ if (ReferenceCountHelper.trackReferenceCounts()) {
+ ReferenceCountHelper.setReferenceCountOwner(new OldValueOwner());
+ boolean couldNotRetain = (!((Chunk) v).retain());
+ ReferenceCountHelper.setReferenceCountOwner(null);
+ if (couldNotRetain) {
+ this.oldValue = null;
+ return;
+ }
+ } else {
+ if (!((Chunk) v).retain()) {
+ this.oldValue = null;
+ return;
+ }
+ }
+ }
+ basicSetOldValue(v);
+ }
+
+ @Unretained(ENTRY_EVENT_OLD_VALUE)
+ private Object basicGetOldValue() {
+ @Unretained(ENTRY_EVENT_OLD_VALUE)
+ Object result = this.oldValue;
+ if (!this.offHeapOk && result instanceof Chunk) {
+ //this.region.getCache().getLogger().info("DEBUG old value already freed " + System.identityHashCode(result));
+ throw new IllegalStateException("Attempt to access off heap value after the EntryEvent was released.");
+ }
+ return result;
+ }
+
+ /**
+  * Like getRawOldValue, except that an off-heap reference is copied to the heap first.
+  * Use getRawOldValue to avoid the heap copy.
+  */
+ public final Object getRawOldValueAsHeapObject() {
+   final Object raw = basicGetOldValue();
+   return OffHeapHelper.getHeapForm(OffHeapHelper.copyIfNeeded(raw));
+ }
+ /**
+  * Returns the old value in its raw form.
+  * If the old value is off-heap this is the StoredObject form (an unretained OFF_HEAP_REFERENCE):
+  * its refcount is not incremented by this call, so it may only be used safely for the lifetime
+  * of this EntryEventImpl.
+  */
+ @Unretained
+ public final Object getRawOldValue() {
+   final Object raw = basicGetOldValue();
+   return raw;
+ }
+ /**
+  * Just like getRawOldValue, except that a StoredObject is replaced by the result of its
+  * getDeserializedForReading. The result is then passed through AbstractRegion.handleNotAvailable.
+  * Note that in some cases sqlf ignores the request to deserialize.
+  */
+ @Unretained(ENTRY_EVENT_OLD_VALUE)
+ public final Object getOldValueAsOffHeapDeserializedOrRaw() {
+   final Object raw = basicGetOldValue();
+   final Object readable;
+   if (raw instanceof StoredObject) {
+     readable = ((StoredObject) raw).getDeserializedForReading();
+   } else {
+     readable = raw;
+   }
+   return AbstractRegion.handleNotAvailable(readable); // fixes 49499
+ }
+
+ /**
+ * Added this function to expose isCopyOnRead function to the
+ * child classes of EntryEventImpl
+ *
+ */
+ protected boolean isRegionCopyOnRead() {
+ return getRegion().isCopyOnRead();
+ }
+
+ /**
+ * Returns the value in the cache after this event.
+ *
+ * @return the value in the cache after this event.
+ */
+ public final Object getNewValue() {
+
+ boolean doCopyOnRead = getRegion().isCopyOnRead();
+ try {
+ if (applyDelta(true)) {
+ @Unretained(ENTRY_EVENT_NEW_VALUE)
+ Object applied = basicGetNewValue();
+ if (applied == this.oldValue && doCopyOnRead) {
+ applied = CopyHelper.copy(applied);
+ }
+ return applied;
+ }
+ } catch (EntryNotFoundException ex) {
+ // only (broken) product code has the opportunity to call this before
+ // this.oldValue is set. If oldValue is not set yet, then
+ // we most likely haven't synchronized on the region entry yet.
+ // (If we have, then make sure oldValue is set before
+ // calling this method).
+ throw new AssertionError("too early to call getNewValue");
+ }
+ Object nv = basicGetNewValue();
+ if (nv != null) {
+ if (nv == Token.NOT_AVAILABLE) {
+ // I'm not sure this can even happen
+ return AbstractRegion.handleNotAvailable(nv);
+ }
+ if (nv instanceof StoredObject) {
+ // TODO OFFHEAP currently we copy offheap new value to the heap here. Check callers of this method to see if they can be optimized to use offheap values.
+ // TODO OFFHEAP: returns off-heap PdxInstance
+ return ((StoredObject) nv).getValueAsDeserializedHeapObject();
+ } else
+ if (nv instanceof CachedDeserializable) {
+ CachedDeserializable cd = (CachedDeserializable)nv;
+ Object v = null;
+ if (doCopyOnRead) {
+ v = cd.getDeserializedWritableCopy(this.region, this.re);
+ } else {
+ v = cd.getDeserializedValue(this.region, this.re);
+ }
+ assert !(v instanceof CachedDeserializable) : "for key "+this.getKey()+" found nested CachedDeserializable";
+ return v;
+ }
+ else {
+ if (doCopyOnRead) {
+ return CopyHelper.copy(nv);
+ } else {
+ return nv;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+  * Returns the stored new value rendered by StringUtils.forceToString.
+  */
+ public final String getNewValueStringForm() {
+   final Object current = basicGetNewValue();
+   return StringUtils.forceToString(current);
+ }
+ /**
+  * Returns the stored old value rendered by StringUtils.forceToString.
+  */
+ public final String getOldValueStringForm() {
+   final Object current = basicGetOldValue();
+   return StringUtils.forceToString(current);
+ }
+
+ protected boolean applyDelta(boolean throwOnNullOldValue)
+ throws EntryNotFoundException {
+ if (this.newValue != null || this.delta == null) {
+ return false;
+ }
+ if (this.oldValue == null) {
+ if (throwOnNullOldValue) {
+ // !!!:ezoerner:20080611 It would be nice if the client got this
+ // exception
+ throw new EntryNotFoundException(
+ "Cannot apply a delta without an existing value");
+ }
+ return false;
+ }
+ // swizzle BucketRegion in event for Delta.
+ // !!!:ezoerner:20090602 this is way ugly; this whole class severely
+ // needs refactoring
+ LocalRegion originalRegion = this.region;
+ try {
+ if (originalRegion instanceof BucketRegion) {
+ this.region = ((BucketRegion)this.region).getPartitionedRegion();
+ }
+ basicSetNewValue(this.delta.apply(this));
+ } finally {
+ this.region = originalRegion;
+ }
+ return true;
+ }
+
+ /** Set a deserialized value */
+ public final void setNewValue(@Retained(ENTRY_EVENT_NEW_VALUE) Object obj) {
+ if (obj instanceof Delta) {
+ this.delta = (Delta)obj;
+ basicSetNewValue(null);
+ }
+ else {
+ basicSetNewValue(obj);
+ }
+ }
+
+ /**
+  * @return the transaction id stored on this event
+  */
+ public TransactionId getTransactionId() {
+   return txId;
+ }
+
+ /**
+  * Stores the given transaction id on this event. The argument is cast to TXId.
+  */
+ public void setTransactionId(TransactionId txId) {
+   final TXId concreteId = (TXId) txId;
+   this.txId = concreteId;
+ }
+
+ /**
+  * Answer true if this event resulted from a loader, as reported by the event's operation.
+  *
+  * @return true if isLocalLoad or isNetLoad
+  */
+ public boolean isLoad() {
+   return op.isLoad();
+ }
+
+ /**
+  * Replaces the region this event refers to.
+  */
+ public void setRegion(LocalRegion r) {
+   region = r;
+ }
+
+ /**
+  * Returns the region this event refers to.
+  * @see com.gemstone.gemfire.cache.CacheEvent#getRegion()
+  */
+ public final LocalRegion getRegion() {
+   return this.region;
+ }
+
+ /**
+  * @return the operation stored on this event
+  */
+ public Operation getOperation() {
+   return op;
+ }
+
+ /**
+  * Sets this event's operation and, when a partition message is attached, sets the same
+  * operation on that message.
+  */
+ public void setOperation(Operation op) {
+   this.op = op;
+   final PartitionMessage message = getPartitionMessage();
+   if (message == null) {
+     return;
+   }
+   message.setOperation(this.op);
+ }
+
+ /**
+  * Returns the callback argument with every WrappedCallbackArgument layer removed.
+  * A Token.NOT_AVAILABLE result is passed through AbstractRegion.handleNotAvailable.
+  * @see com.gemstone.gemfire.cache.CacheEvent#getCallbackArgument()
+  */
+ public Object getCallbackArgument() {
+   Object arg = this.keyInfo.getCallbackArg();
+   // peel off wrappers until the original argument is reached
+   while (arg instanceof WrappedCallbackArgument) {
+     arg = ((WrappedCallbackArgument) arg).getOriginalCallbackArg();
+   }
+   if (arg == Token.NOT_AVAILABLE) {
+     return AbstractRegion.handleNotAvailable(arg);
+   }
+   return arg;
+ }
+ /**
+  * @return false if the raw callback argument is Token.NOT_AVAILABLE; true otherwise
+  */
+ public boolean isCallbackArgumentAvailable() {
+   final Object raw = this.getRawCallbackArgument();
+   return Token.NOT_AVAILABLE != raw;
+ }
+
+ /**
+  * Returns the callback argument exactly as stored in the key info, with no unwrapping.
+  * This is for internal use only. Customers should always call
+  * {@link #getCallbackArgument}
+  * @since 5.5
+  */
+ public Object getRawCallbackArgument() {
+   return keyInfo.getCallbackArg();
+ }
+
+ /**
+  * Sets the raw callback argument on the key info, replacing whatever was stored there.
+  */
+ public void setRawCallbackArgument(Object newCallbackArgument) {
+   keyInfo.setCallbackArg(newCallbackArgument);
+ }
+
+ public void setCallbackArgument(Object newCallbackArgument) {
+ if (this.keyInfo.getCallbackArg() instanceof WrappedCallbackArgument) {
+ ((WrappedCallbackArgument)this.keyInfo.getCallbackArg())
+ .setOriginalCallbackArgument(newCallbackArgument);
+ }
+ else {
+ this.keyInfo.setCallbackArg(newCallbackArgument);
+ }
+ }
+
+ /**
+  * Wraps the new value in a SerializedCacheValueImpl when it is held in serialized form.
+  * In the case where there is a delta that has not been applied yet, it is not applied here since
+  * that would not produce a serialized new value.
+  *
+  * @return null if new value is not serialized; otherwise returns a SerializedCacheValueImpl containing the new value.
+  */
+ public SerializedCacheValue<?> getSerializedNewValue() {
+   @Unretained(ENTRY_EVENT_NEW_VALUE)
+   final Object tmp = basicGetNewValue();
+   if (!(tmp instanceof CachedDeserializable)) {
+     // Note we return null even if cachedSerializedNewValue is not null.
+     // This is because some callers of this method use it to indicate
+     // that a CacheDeserializable should be created during deserialization.
+     return null;
+   }
+   if (tmp instanceof StoredObject && !((StoredObject) tmp).isSerialized()) {
+     // TODO OFFHEAP can we handle offheap byte[] better?
+     return null;
+   }
+   // prefer newValueBytes, fall back to the cached serialized form
+   final byte[] bytes = (this.newValueBytes != null) ? this.newValueBytes : this.cachedSerializedNewValue;
+   return new SerializedCacheValueImpl(this, getRegion(), this.re,
+       (CachedDeserializable) tmp, bytes);
+ }
+
+ /**
+ * Implement this interface if you want to call {@link #exportNewValue}.
+ * exportNewValue queries the preference methods and then hands the new value to
+ * exactly one of the import methods.
+ *
+ * @author darrel
+ *
+ */
+ public interface NewValueImporter {
+ /**
+ * Asked first by exportNewValue to decide whether already-serialized bytes should be handed over.
+ * @return true if the importer prefers the value to be in serialized form.
+ */
+ boolean prefersNewSerialized();
+
+ /**
+ * Only return true if the importer can use the value before the event that exported it is released.
+ * If false is returned then off-heap values will be copied to the heap for the importer.
+ * @return true if the importer can deal with the value being an unretained OFF_HEAP_REFERENCE.
+ */
+ boolean isUnretainedNewReferenceOk();
+
+ /**
+ * Import a new value that is currently in object form.
+ * @param nv the new value to import; unretained if isUnretainedNewReferenceOk returns true
+ * @param isSerialized true if the imported new value represents data that needs to be serialized; false if the imported new value is a simple sequence of bytes.
+ */
+ void importNewObject(@Unretained(ENTRY_EVENT_NEW_VALUE) Object nv, boolean isSerialized);
+
+ /**
+ * Import a new value that is currently in byte array form.
+ * @param nv the new value to import
+ * @param isSerialized true if the imported new value represents data that needs to be serialized; false if the imported new value is a simple sequence of bytes.
+ */
+ void importNewBytes(byte[] nv, boolean isSerialized);
+ }
+
+ /**
+  * Export the event's new value to the given importer.
+  * Exactly one of the importer's import methods is called. Serialized bytes already held by the
+  * event are used when the importer prefers serialized form; otherwise the raw new value (or the
+  * delta) is handed over according to its type.
+  */
+ public final void exportNewValue(NewValueImporter importer) {
+   final boolean prefersSerialized = importer.prefersNewSerialized();
+   if (prefersSerialized) {
+     if (getCachedSerializedNewValue() != null) {
+       importer.importNewBytes(getCachedSerializedNewValue(), true);
+       return;
+     }
+     if (this.newValueBytes != null && this.newValue instanceof CachedDeserializable) {
+       importer.importNewBytes(this.newValueBytes, true);
+       return;
+     }
+   }
+   @Unretained(ENTRY_EVENT_NEW_VALUE)
+   final Object nv = getRawNewValue();
+   if (nv instanceof StoredObject) {
+     @Unretained(ENTRY_EVENT_NEW_VALUE)
+     final StoredObject so = (StoredObject) nv;
+     final boolean isSerialized = so.isSerialized();
+     if (!(nv instanceof Chunk) || importer.isUnretainedNewReferenceOk()) {
+       // not a Chunk, or the importer accepts an unretained reference: pass it straight through
+       importer.importNewObject(nv, isSerialized);
+     } else if (!isSerialized || prefersSerialized) {
+       final byte[] bytes = so.getValueAsHeapByteArray();
+       importer.importNewBytes(bytes, isSerialized);
+       if (isSerialized) {
+         setCachedSerializedNewValue(bytes);
+       }
+     } else {
+       // TODO OFFHEAP: returns off-heap PdxInstance which is not ok since isUnretainedNewReferenceOk returned false
+       importer.importNewObject(so.getValueAsDeserializedHeapObject(), true);
+     }
+     return;
+   }
+   if (nv instanceof byte[]) {
+     importer.importNewBytes((byte[]) nv, false);
+     return;
+   }
+   if (nv instanceof CachedDeserializable) {
+     final Object cdV = ((CachedDeserializable) nv).getValue();
+     if (cdV instanceof byte[]) {
+       importer.importNewBytes((byte[]) cdV, true);
+       setCachedSerializedNewValue((byte[]) cdV);
+     } else {
+       importer.importNewObject(cdV, true);
+     }
+     return;
+   }
+   importer.importNewObject(nv, true);
+ }
+ /**
+ * Implement this interface if you want to call {@link #exportOldValue}.
+ * exportOldValue queries the preference methods and then hands the old value to
+ * exactly one of the import methods.
+ *
+ * @author darrel
+ *
+ */
+ public interface OldValueImporter {
+ /**
+ * Asked first by exportOldValue to decide whether already-serialized bytes should be handed over.
+ * @return true if the importer prefers the value to be in serialized form.
+ */
+ boolean prefersOldSerialized();
+
+ /**
+ * Only return true if the importer can use the value before the event that exported it is released.
+ * @return true if the importer can deal with the value being an unretained OFF_HEAP_REFERENCE.
+ */
+ boolean isUnretainedOldReferenceOk();
+
+ /**
+ * @return return true if you want the old value to possibly be an instanceof CachedDeserializable; false if you want the value contained in a CachedDeserializable.
+ */
+ boolean isCachedDeserializableValueOk();
+
+ /**
+ * Import an old value that is currently in object form.
+ * @param ov the old value to import; unretained if isUnretainedOldReferenceOk returns true
+ * @param isSerialized true if the imported old value represents data that needs to be serialized; false if the imported old value is a simple sequence of bytes.
+ */
+ void importOldObject(@Unretained(ENTRY_EVENT_OLD_VALUE) Object ov, boolean isSerialized);
+
+ /**
+ * Import an old value that is currently in byte array form.
+ * @param ov the old value to import
+ * @param isSerialized true if the imported old value represents data that needs to be serialized; false if the imported old value is a simple sequence of bytes.
+ */
+ void importOldBytes(byte[] ov, boolean isSerialized);
+ }
+
+ /**
+  * Export the event's old value to the given importer.
+  * Exactly one of the importer's import methods is called. The serialized old value bytes held
+  * by the event are used when the importer prefers serialized form; otherwise the raw old value
+  * is handed over according to its type.
+  */
+ public final void exportOldValue(OldValueImporter importer) {
+   final boolean prefersSerialized = importer.prefersOldSerialized();
+   if (prefersSerialized
+       && this.oldValueBytes != null
+       && this.oldValue instanceof CachedDeserializable) {
+     importer.importOldBytes(this.oldValueBytes, true);
+     return;
+   }
+   @Unretained(ENTRY_EVENT_OLD_VALUE)
+   final Object ov = getRawOldValue();
+   if (ov instanceof StoredObject) {
+     final StoredObject so = (StoredObject) ov;
+     final boolean isSerialized = so.isSerialized();
+     if (!(ov instanceof Chunk) || importer.isUnretainedOldReferenceOk()) {
+       // not a Chunk, or the importer accepts an unretained reference: pass it straight through
+       importer.importOldObject(ov, isSerialized);
+     } else if (!isSerialized || prefersSerialized) {
+       importer.importOldBytes(so.getValueAsHeapByteArray(), isSerialized);
+     } else {
+       // TODO OFFHEAP: returns off-heap PdxInstance which is not ok since isUnretainedOldReferenceOk returned false
+       importer.importOldObject(so.getValueAsDeserializedHeapObject(), true);
+     }
+     return;
+   }
+   if (ov instanceof byte[]) {
+     importer.importOldBytes((byte[]) ov, false);
+     return;
+   }
+   if (!importer.isCachedDeserializableValueOk() && ov instanceof CachedDeserializable) {
+     // the importer wants the contained value rather than the CachedDeserializable itself
+     final Object cdV = ((CachedDeserializable) ov).getValue();
+     if (cdV instanceof byte[]) {
+       importer.importOldBytes((byte[]) cdV, true);
+     } else {
+       importer.importOldObject(cdV, true);
+     }
+     return;
+   }
+   importer.importOldObject(ov, true);
+ }
+
+ /**
+  * Returns the raw new value, optionally applying a pending delta first.
+  * <ul>
+  * <li>If applyDelta is true, first attempt to apply a delta (if we have one) and return the
+  * resulting value; if the delta was applied earlier, return that applied value.</li>
+  * <li>Else if new value is a Delta return it.</li>
+  * <li>Else if new value is off-heap return the StoredObject form (unretained OFF_HEAP_REFERENCE).
+  * Its refcount is not incremented by this call and the returned object can only be safely used
+  * for the lifetime of the EntryEventImpl instance that returned the value.</li>
+  * <li>Else return the raw form.</li>
+  * </ul>
+  */
+ @Unretained(ENTRY_EVENT_NEW_VALUE)
+ public final Object getRawNewValue(boolean applyDelta) {
+   if (!applyDelta) {
+     return getRawNewValue();
+   }
+   final boolean copyOnRead = getRegion().isCopyOnRead();
+   final Object applied = applyDeltaWithCopyOnRead(copyOnRead);
+   if (applied != null) {
+     return applied;
+   }
+   // if applyDelta is true and we have already applied the delta then
+   // just return the applied value instead of the delta object.
+   @Unretained(ENTRY_EVENT_NEW_VALUE)
+   final Object current = basicGetNewValue();
+   if (current != null) {
+     return current;
+   }
+   return getRawNewValue();
+ }
+ /**
+  * Just like getRawNewValue(true), except that a StoredObject is replaced by the result of its
+  * getDeserializedForReading. The result is then passed through AbstractRegion.handleNotAvailable.
+  * Note that in some cases sqlf ignores the request to deserialize.
+  */
+ @Unretained(ENTRY_EVENT_NEW_VALUE)
+ public final Object getNewValueAsOffHeapDeserializedOrRaw() {
+   final Object raw = getRawNewValue(true);
+   final Object readable;
+   if (raw instanceof StoredObject) {
+     readable = ((StoredObject) raw).getDeserializedForReading();
+   } else {
+     readable = raw;
+   }
+   return AbstractRegion.handleNotAvailable(readable); // fixes 49499
+ }
+
+ /**
+  * If the new value is stored off-heap return a retained OFF_HEAP_REFERENCE (caller must release).
+  * @return a retained OFF_HEAP_REFERENCE if the new value is a StoredObject that could be
+  *         retained; otherwise returns null
+  */
+ @Retained(ENTRY_EVENT_NEW_VALUE)
+ public StoredObject getOffHeapNewValue() {
+   final Object tmp = basicGetNewValue();
+   if (!(tmp instanceof StoredObject)) {
+     return null;
+   }
+   final StoredObject candidate = (StoredObject) tmp;
+   return candidate.retain() ? candidate : null;
+ }
+
+ /**
+  * If the old value is stored off-heap return a retained OFF_HEAP_REFERENCE (caller must release).
+  * @return a retained OFF_HEAP_REFERENCE if the old value is a StoredObject that could be
+  *         retained; otherwise returns null
+  */
+ @Retained(ENTRY_EVENT_OLD_VALUE)
+ public StoredObject getOffHeapOldValue() {
+   final Object tmp = basicGetOldValue();
+   if (!(tmp instanceof StoredObject)) {
+     return null;
+   }
+   final StoredObject candidate = (StoredObject) tmp;
+   return candidate.retain() ? candidate : null;
+ }
+
+ /**
+ * Result may be unretained because sqlf getDeserializedForReading returns unretained.
+ */
+ public final Object getDeserializedValue() {
+ if (this.delta == null) {
+ final Object val = basicGetNewValue();
+ if (val instanceof StoredObject) {
+ // TODO OFFHEAP: returns off-heap PdxInstance
+ return ((StoredObject) val).getValueAsDeserializedHeapObject();
+ } else
+ if (val instanceof CachedDeserializable) {
+ return ((CachedDeserializable)val).getDeserializedForReading();
+ }
+ else {
+ return val;
+ }
+ }
+ else {
+ return this.delta;
+ }
+ }
+
+ public final byte[] getSerializedValue() {
+ if (this.newValueBytes == null) {
+ final Object val;
+ if (this.delta == null) {
+ val = basicGetNewValue();
+ if (val instanceof byte[]) {
+ return (byte[])val;
+ }
+ else if (val instanceof CachedDeserializable) {
+ return ((CachedDeserializable)val).getSerializedValue();
+ }
+ }
+ else {
+ val = this.delta;
+ }
+ try {
+ return CacheServerHelper.serialize(val);
+ } catch (IOException ioe) {
+ throw new GemFireIOException("unexpected exception", ioe);
+ }
+ }
+ else {
+ return this.newValueBytes;
+ }
+ }
+
+ /**
+  * Forces this entry's new value to be in serialized form.
+  * Equivalent to calling the private overload with {@code isSynced == false}.
+  * @since 5.0.2
+  */
+ public void makeSerializedNewValue() {
+   final boolean isSynced = false;
+   makeSerializedNewValue(isSynced);
+ }
+
+ /**
+ * @param isSynced true if RegionEntry currently under synchronization
+ */
+ private final void makeSerializedNewValue(boolean isSynced) {
+ Object obj = basicGetNewValue();
+
+ // ezoerner:20080611 In the case where there is an unapplied
+ // delta, do not apply the delta or serialize yet unless entry is
+ // under synchronization (isSynced is true)
+ if (isSynced) {
+ this.setSerializationDeferred(false);
+ }
+ else if (obj == null && this.delta != null) {
+ // defer serialization until setNewValueInRegion
+ this.setSerializationDeferred(true);
+ return;
+ }
+ basicSetNewValue(getCachedDeserializable(obj, this));
+ }
+
+ /**
+  * Same as {@link #getCachedDeserializable(Object, EntryEventImpl)} with no event, so no
+  * serialized bytes are recorded on any event.
+  */
+ public static Object getCachedDeserializable(Object obj) {
+   final EntryEventImpl noEvent = null;
+   return getCachedDeserializable(obj, noEvent);
+ }
+
+ public static Object getCachedDeserializable(Object obj, EntryEventImpl ev) {
+ if (obj instanceof byte[]
+ || obj == null
+ || obj instanceof CachedDeserializable
+ || obj == Token.NOT_AVAILABLE
+ || Token.isInvalidOrRemoved(obj)
+ // don't serialize delta object already serialized
+ || obj instanceof com.gemstone.gemfire.Delta
+ || obj instanceof Delta) { // internal delta
+ return obj;
+ }
+ final CachedDeserializable cd;
+ // avoid unneeded serialization of byte[][] used by SQLFabric that
+ // will end up being deserialized in any case (serialization is cheap
+ // for byte[][] anyways)
+ if (obj instanceof byte[][]) {
+ int objSize = Sizeable.PER_OBJECT_OVERHEAD + 4;
+ for (byte[] bytes : (byte[][])obj) {
+ if (bytes != null) {
+ objSize += CachedDeserializableFactory.getByteSize(bytes);
+ }
+ else {
+ objSize += Sizeable.PER_OBJECT_OVERHEAD;
+ }
+ }
+ cd = CachedDeserializableFactory.create(obj, objSize);
+ }
+ else {
+ final byte[] b = serialize(obj);
+ cd = CachedDeserializableFactory.create(b);
+ if (ev != null) {
+ ev.newValueBytes = b;
+ ev.cachedSerializedNewValue = b;
+ }
+ }
+ return cd;
+ }
+ /**
+  * Stores the given bytes as the cached serialized form of the new value.
+  */
+ public void setCachedSerializedNewValue(byte[] v) {
+   cachedSerializedNewValue = v;
+ }
+ /**
+  * @return the cached serialized form of the new value; may be null
+  */
+ public byte[] getCachedSerializedNewValue() {
+   return cachedSerializedNewValue;
+ }
+
+ public final void setSerializedNewValue(byte[] serializedValue) {
+ Object newVal = null;
+ if (serializedValue != null) {
+ if (CachedDeserializableFactory.preferObject()) {
+ newVal = deserialize(serializedValue);
+ } else {
+ newVal = CachedDeserializableFactory.create(serializedValue);
+ }
+ if (newVal instanceof Delta) {
+ this.delta = (Delta)newVal;
+ newVal = null;
+ // We need the newValueBytes field and the newValue field to be in sync.
+ // In the case of non-null delta set both fields to null.
+ serializedValue = null;
+ }
+ }
+ this.newValueBytes = serializedValue;
+ basicSetNewValue(newVal);
+ this.cachedSerializedNewValue = serializedValue;
+ }
+
+ public void setSerializedOldValue(byte[] serializedOldValue){
+ this.oldValueBytes = serializedOldValue;
+ final Object ov;
+ if (CachedDeserializableFactory.preferObject()) {
+ ov = deserialize(serializedOldValue);
+ }
+ else if (serializedOldValue != null) {
+ ov = CachedDeserializableFactory.create(serializedOldValue);
+ }
+ else {
+ ov = null;
+ }
+ retainAndSetOldValue(ov);
+ }
+
+ /**
+ * If true (the default) then preserve old values in events.
+ * If false then mark non-null values as being NOT_AVAILABLE.
+ * The flag is false only when the system property "gemfire.disable-event-old-value" is set to
+ * true; it is read once, when this static field is initialized.
+ */
+ private static final boolean EVENT_OLD_VALUE = !Boolean.getBoolean("gemfire.disable-event-old-value");
+
+
+ void putExistingEntry(final LocalRegion owner, RegionEntry entry) throws RegionClearedException {
+ putExistingEntry(owner, entry, false, null);
+ }
+
+ /**
+ * Put a newValue into the given, write synced, existing, region entry.
+ * Sets oldValue in event if hasn't been set yet.
+ * @param oldValueForDelta Used by Delta Propagation feature
+ *
+ * @throws RegionClearedException
+ */
+ void putExistingEntry(final LocalRegion owner, final RegionEntry reentry,
+ boolean requireOldValue, Object oldValueForDelta) throws RegionClearedException {
+ makeUpdate();
+ // only set oldValue if it hasn't already been set to something
+ if (this.oldValue == null) {
+ if (!reentry.isInvalidOrRemoved()) {
+ if (requireOldValue ||
+ EVENT_OLD_VALUE
+ || this.region instanceof HARegion // fix for bug 37909
+ || GemFireCacheImpl.sqlfSystem()
+ ) {
+ @Retained Object ov;
+ if (ReferenceCountHelper.trackReferenceCounts()) {
+ ReferenceCountHelper.setReferenceCountOwner(new OldValueOwner());
+ if (GemFireCacheImpl.sqlfSystem()) {
+ ov = reentry.getValueOffHeapOrDiskWithoutFaultIn(this.region);
+ } else {
+ ov = reentry._getValueRetain(owner, true);
+ }
+ ReferenceCountHelper.setReferenceCountOwner(null);
+ } else {
+ if (GemFireCacheImpl.sqlfSystem()) {
+ ov = reentry.getValueOffHeapOrDiskWithoutFaultIn(this.region);
+ } else {
+ ov = reentry._getValueRetain(owner, true);
+ }
+ }
+ if (ov == null) ov = Token.NOT_AVAILABLE;
+ // ov has already been retained so call basicSetOldValue instead of retainAndSetOldValue
+ basicSetOldValue(ov);
+ } else {
+ basicSetOldValue(Token.NOT_AVAILABLE);
+ }
+ }
+ }
+ if (this.oldValue == Token.NOT_AVAILABLE) {
+ FilterProfile fp = this.region.getFilterProfile();
+ if (this.op.guaranteesOldValue() ||
+ (fp != null /* #41532 */&& fp.entryRequiresOldValue(this.getKey()))) {
+ setOldValueForQueryProcessing();
+ }
+ }
+
+ //setNewValueInRegion(null);
+ setNewValueInRegion(owner, reentry, oldValueForDelta);
+ }
+
+ /**
+ * If we are currently a create op then turn us into an update
+ *
+ * @since 5.0
+ */
+ void makeUpdate()
+ {
+ setOperation(this.op.getCorrespondingUpdateOp());
+ }
+
+ /**
+ * If we are currently an update op then turn us into a create
+ *
+ * @since 5.0
+ */
+ void makeCreate()
+ {
+ setOperation(this.op.getCorrespondingCreateOp());
+ }
+
+ /**
+ * Put a newValue into the given, write synced, new, region entry.
+ * @throws RegionClearedException
+ */
+ void putNewEntry(final LocalRegion owner, final RegionEntry reentry)
+ throws RegionClearedException {
+ if (!this.op.guaranteesOldValue()) { // preserves oldValue for CM ops in clients
+ basicSetOldValue(null);
+ }
+ makeCreate();
+ setNewValueInRegion(owner, reentry, null);
+ }
+
+ void setRegionEntry(RegionEntry re) {
+ this.re = re;
+ }
+
+ /**
+  * @return the region entry stored on this event
+  */
+ RegionEntry getRegionEntry() {
+   return re;
+ }
+
+ /**
+  * Stores this event's new value into the given region entry.
+  * In order: applies a pending delta or pending delta bytes, generates the version tag,
+  * substitutes an INVALID/LOCAL_INVALID token for a null value, prepares the value for the
+  * cache, does index maintenance, and finally sets the value on the entry.
+  *
+  * @param owner the region that owns the entry; may be null, in which case this event's region
+  *        is used wherever an owner is needed
+  * @param reentry the region entry that receives the value
+  * @param oldValueForDelta the old value that delta bytes are applied to
+  * @throws RegionClearedException
+  */
+ @Retained(ENTRY_EVENT_NEW_VALUE)
+ private void setNewValueInRegion(final LocalRegion owner,
+     final RegionEntry reentry, Object oldValueForDelta) throws RegionClearedException {
+
+   // remember the entry's state before it is overwritten
+   boolean wasTombstone = reentry.isTombstone();
+
+   // put in newValue
+
+   if (applyDelta(this.op.isCreate())) {
+     if (this.isSerializationDeferred()) {
+       makeSerializedNewValue(true);
+     }
+   }
+
+   // If event contains new value, then it may mean that the delta bytes should
+   // not be applied. This is possible if the event originated locally.
+   if (this.deltaBytes != null && this.newValue == null) {
+     processDeltaBytes(oldValueForDelta);
+   }
+
+   if (owner != null) {
+     owner.generateAndSetVersionTag(this, reentry);
+   } else {
+     this.region.generateAndSetVersionTag(this, reentry);
+   }
+
+   Object v = this.newValue;
+   if (v == null) {
+     v = isLocalInvalid() ? Token.LOCAL_INVALID : Token.INVALID;
+   } else {
+     this.region.regionInvalid = false;
+   }
+
+   reentry.setValueResultOfSearch(this.op.isNetSearch());
+
+   //dsmith:20090524
+   //This is a horrible hack, but we need to get the size of the object
+   //When we store an entry. This code is only used when we do a put
+   //in the primary.
+   if (v instanceof com.gemstone.gemfire.Delta && region.isUsedForPartitionedRegionBucket()) {
+     int vSize;
+     Object ov = basicGetOldValue();
+     if (ov instanceof CachedDeserializable && !GemFireCacheImpl.DELTAS_RECALCULATE_SIZE) {
+       vSize = ((CachedDeserializable) ov).getValueSizeInBytes();
+     } else {
+       vSize = CachedDeserializableFactory.calcMemSize(v, region.getObjectSizer(), false);
+     }
+     v = CachedDeserializableFactory.create(v, vSize);
+     basicSetNewValue(v);
+   }
+
+   Object preparedV = reentry.prepareValueForCache(this.region, v, this, this.hasDelta());
+   if (preparedV != v) {
+     v = preparedV;
+     if (v instanceof Chunk) {
+       if (!((Chunk) v).isCompressed()) { // fix bug 52109
+         // If we put it off heap and it is not compressed then remember that value.
+         // Otherwise we want to remember the decompressed value in the event.
+         basicSetNewValue(v);
+       }
+     }
+   }
+   boolean isTombstone = (v == Token.TOMBSTONE);
+   boolean success = false;
+   try {
+     setNewValueBucketSize(owner, v);
+
+     // ezoerner:20081030
+     // last possible moment to do index maintenance with old value in
+     // RegionEntry before new value is set.
+     // As part of an update, this is a remove operation as prelude to an add that
+     // will come after the new value is set.
+     // If this is an "update" from INVALID state, treat this as a create instead
+     // for the purpose of index maintenance since invalid entries are not
+     // indexed.
+
+     if ((this.op.isUpdate() && !reentry.isInvalid()) || this.op.isInvalidate()) {
+       IndexManager idxManager = IndexUtils.getIndexManager(this.region, false);
+       if (idxManager != null) {
+         try {
+           idxManager.updateIndexes(reentry,
+               IndexManager.REMOVE_ENTRY,
+               this.op.isUpdate() ?
+                   IndexProtocol.BEFORE_UPDATE_OP :
+                   IndexProtocol.OTHER_OP);
+         } catch (QueryException e) {
+           throw new IndexMaintenanceException(e);
+         }
+       }
+     }
+     final IndexUpdater indexUpdater = this.region.getIndexUpdater();
+     if (indexUpdater != null) {
+       final LocalRegion indexRegion;
+       if (owner != null) {
+         indexRegion = owner;
+       } else {
+         indexRegion = this.region;
+       }
+       try {
+         indexUpdater.onEvent(indexRegion, this, reentry);
+         reentry.setValueWithTombstoneCheck(v, this); // already called prepareValueForCache
+         success = true;
+       } finally {
+         indexUpdater.postEvent(indexRegion, this, reentry, success);
+       }
+     } else {
+       reentry.setValueWithTombstoneCheck(v, this); // already called prepareValueForCache
+       success = true;
+     }
+   } finally {
+     // the value never made it into the entry: give the off-heap chunk back
+     if (!success && reentry instanceof OffHeapRegionEntry && v instanceof Chunk) {
+       OffHeapRegionEntryHelper.releaseEntry((OffHeapRegionEntry) reentry, (Chunk) v);
+     }
+   }
+   if (logger.isTraceEnabled()) {
+     if (v instanceof CachedDeserializable) {
+       logger.trace("EntryEventImpl.setNewValueInRegion: put CachedDeserializable({},{})",
+           this.getKey(), ((CachedDeserializable) v).getStringForm());
+     } else {
+       logger.trace("EntryEventImpl.setNewValueInRegion: put({},{})",
+           this.getKey(), StringUtils.forceToString(v));
+     }
+   }
+
+   if (!isTombstone && wasTombstone) {
+     // owner is allowed to be null elsewhere in this method, so fall back to the
+     // event's region here as well instead of dereferencing owner unconditionally
+     final LocalRegion tombstoneRegion = (owner != null) ? owner : this.region;
+     tombstoneRegion.unscheduleTombstone(reentry);
+   }
+ }
+
+ /**
+ * The size the new value contributes to a pr bucket.
+ * Note if this event is not on a pr then this value will be 0.
+ * Assigned by setNewValueBucketSize from the region's calculateValueSize.
+ */
+ private transient int newValueBucketSize;
+ /**
+  * @return the size last recorded by setNewValueBucketSize
+  */
+ public int getNewValueBucketSize() {
+   return newValueBucketSize;
+ }
+ private void setNewValueBucketSize(LocalRegion lr, Object v) {
+ if (lr == null) {
+ lr = this.region;
+ }
+ this.newValueBucketSize = lr.calculateValueSize(v);
+ }
+
+ private void processDeltaBytes(Object oldValueInVM) {
+ if (!this.region.hasSeenEvent(this)) {
+ if (oldValueInVM == null || Token.isInvalidOrRemoved(oldValueInVM)) {
+ this.region.getCachePerfStats().incDeltaFailedUpdates();
+ throw new InvalidDeltaException("Old value not found for key "
+ + this.keyInfo.getKey());
+ }
+ FilterProfile fp = this.region.getFilterProfile();
+ // If compression is enabled then we've already gotten a new copy due to the
+ // serializaion and deserialization that occurs.
+ boolean copy = this.region.getCompressor() == null &&
+ (this.region.isCopyOnRead()
+ || this.region.getCloningEnabled()
+ || (fp != null && fp.getCqCount() > 0));
+ Object value = oldValueInVM;
+ boolean wasCD = false;
+ if (value instanceof CachedDeserializable) {
+ wasCD = true;
+ if (copy) {
+ value = ((CachedDeserializable)value).getDeserializedWritableCopy(this.region, re);
+ } else {
+ value = ((CachedDeserializable)value).getDeserializedValue(
+ this.region, re);
+ }
+ } else {
+ if (copy) {
+ value = CopyHelper.copy(value);
+ }
+ }
+ boolean deltaBytesApplied = false;
+ try {
+ long start = CachePerfStats.getStatTime();
+ ((com.gemstone.gemfire.Delta)value).fromDelta(new DataInputStream(
+ new ByteArrayInputStream(getDeltaBytes())));
+ this.region.getCachePerfStats().endDeltaUpdate(start);
+ deltaBytesApplied = true;
+ } catch (RuntimeException rte) {
+ throw rte;
+ } catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ throw e;
+ } catch (Throwable t) {
+ SystemFailure.checkFailure();
+ throw new DeltaSerializationException(
+ "Exception while deserializing delta bytes.", t);
+ } finally {
+ if (!deltaBytesApplied) {
+ this.region.getCachePerfStats().incDeltaFailedUpdates();
+ }
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Delta has been applied for key {}", getKey());
+ }
+ // assert event.getNewValue() == null;
+ if (wasCD) {
+ CachedDeserializable old = (CachedDeserializable)oldValueInVM;
+ int valueSize;
+ if (GemFireCacheImpl.DELTAS_RECALCULATE_SIZE) {
+ valueSize = CachedDeserializableFactory.calcMemSize(value, region
+ .getObjectSizer(), false);
+ } else {
+ valueSize = old.getValueSizeInBytes();
+ }
+ value = CachedDeserializableFactory.create(value, valueSize);
+ }
+ setNewValue(value);
+ if (this.causedByMessage != null
+ && this.causedByMessage instanceof PutMessage) {
+ ((PutMessage)this.causedByMessage).setDeltaValObj(value);
+ }
+ } else {
+ this.region.getCachePerfStats().incDeltaFailedUpdates();
+ throw new InvalidDeltaException(
+ "Cache encountered replay of event containing delta bytes for key "
+ + this.keyInfo.getKey());
+ }
+ }
+
+ /**
+  * Sets this event's old value from a transactional entry's value.
+  * An invalid or removed token is stored as {@code null}. Any other value is
+  * stored as-is when it is null, when {@code mustBeAvailable} is true, or when
+  * {@code EVENT_OLD_VALUE} is set; otherwise {@code Token.NOT_AVAILABLE} is
+  * stored in its place.
+  *
+  * @param oldVal the candidate old value
+  * @param mustBeAvailable true if the actual old value must be kept
+  */
+ void setTXEntryOldValue(Object oldVal, boolean mustBeAvailable) {
+   Object toStore = oldVal;
+   if (Token.isInvalidOrRemoved(oldVal)) {
+     toStore = null;
+   } else if (!mustBeAvailable && oldVal != null && !EVENT_OLD_VALUE) {
+     toStore = Token.NOT_AVAILABLE;
+   }
+   retainAndSetOldValue(toStore);
+ }
+
+ void putValueTXEntry(final TXEntryState tx) {
+ Object v = basicGetNewValue();
+ if (v == null) {
+ if (deltaBytes != null) {
+ // since newValue is null, and we have deltaBytes
+ // there must be a nearSidePendingValue
+ processDeltaBytes(tx.getNearSidePendingValue());
+ v = basicGetNewValue();
+ } else if (this.delta != null) {
+ v = this.delta;
+ } else {
+ v = isLocalInvalid() ? Token.LOCAL_INVALID : Token.INVALID;
+ }
+ }
+
+ if (this.op != Operation.LOCAL_INVALIDATE
+ && this.op != Operation.LOCAL_DESTROY) {
+ // fix for bug 34387
+ tx.setPendingValue(OffHeapHelper.copyIfNeeded(v)); // TODO OFFHEAP optimize
+ }
+ tx.setCallbackArgument(getCallbackArgument());
+ }
+
+ /**
+  * Looks up this event's key in the region and, if an entry exists, uses the
+  * entry's value as this event's old value.
+  * The value is obtained with a retain and released again once
+  * {@link #setOldValue(Object)} has returned.
+  *
+  * @return false if entry doesn't exist (no region entry, the entry lookup
+  *         throws EntryNotFoundException, or {@code setOldValue} reports that
+  *         the value indicates a non-existent entry)
+  */
+ public boolean setOldValueFromRegion()
+ {
+   try {
+     RegionEntry entry = this.region.getRegionEntry(getKey());
+     if (entry == null) {
+       return false;
+     }
+     final Object v;
+     ReferenceCountHelper.skipRefCountTracking();
+     try {
+       v = entry._getValueRetain(this.region, true);
+     } finally {
+       // always re-enable tracking, even when the retain throws, so the
+       // skip/unskip pair stays balanced (presumably per-thread state -- confirm)
+       ReferenceCountHelper.unskipRefCountTracking();
+     }
+     try {
+       return setOldValue(v);
+     } finally {
+       OffHeapHelper.releaseWithNoTracking(v);
+     }
+   }
+   catch (EntryNotFoundException ex) {
+     return false;
+   }
+ }
+
+ /** Return true if old value is the DESTROYED token */
+ boolean oldValueIsDestroyedToken()
+ {
+ return this.oldValue == Token.DESTROYED || this.oldValue == Token.TOMBSTONE;
+ }
+
+ /**
+  * Sets the old value of this event to the DESTROYED token.
+  */
+ void setOldValueDestroyedToken()
+ {
+   this.basicSetOldValue(Token.DESTROYED);
+ }
+
+ /**
+  * Sets the old value without forcing it to be kept; equivalent to
+  * {@code setOldValue(v, false)}.
+  *
+  * @param v the candidate old value
+  * @return false if value 'v' indicates that entry does not exist
+  */
+ public boolean setOldValue(Object v) {
+   final boolean force = false;
+   return setOldValue(v, force);
+ }
+
+
+ /**
+ * @param force true if the old value should be forcibly set, used
+ * for HARegions, methods like putIfAbsent, etc.,
+ * where the old value must be available.
+ * @return false if value 'v' indicates that entry does not exist
+ */
+ public boolean setOldValue(Object v, boolean force) {
+ if (v == null || Token.isRemoved(v)) {
+ return false;
+ }
+ else {
+ if (Token.isInvalid(v)) {
+ v = null;
+ }
+ else {
+ if (force ||
+ (this.region instanceof HARegion) // fix for bug 37909
+ ) {
+ // set oldValue to "v".
+ } else if (EVENT_OLD_VALUE) {
+ // TODO Rusty add compression support here
+ // set oldValue to "v".
+ } else {
+ v = Token.NOT_AVAILABLE;
+ }
+ }
+ retainAndSetOldValue(v);
+ return true;
+ }
+ }
+
+ /**
+  * Sets the old value for concurrent map operation results received
+  * from a server. A removed token is ignored; an invalid token is stored
+  * as {@code null}.
+  *
+  * @param v the old value received
+  */
+ public void setConcurrentMapOldValue(Object v) {
+   if (Token.isRemoved(v)) {
+     return;
+   }
+   final Object toStore = Token.isInvalid(v) ? null : v;
+   retainAndSetOldValue(toStore);
+ }
+
+ /**
+  * Tells whether a new value is available on this event.
+  *
+  * @return true if the new value is non-null and not NOT_AVAILABLE, or if the
+  *         new value is null but the event has a delta
+  */
+ public boolean hasNewValue() {
+   final Object current = this.newValue;
+   if (current == null) {
+     // ???:ezoerner:20080611 what if applying the delta would produce
+     // null or (strangely) NOT_AVAILABLE.. do we need to apply it here to
+     // find out?
+     return hasDelta();
+   }
+   return current != Token.NOT_AVAILABLE;
+ }
+
+ /**
+  * @return true if the old value is non-null and is not the NOT_AVAILABLE token
+  */
+ public final boolean hasOldValue() {
+   final Object current = this.oldValue;
+   if (current == null) {
+     return false;
+   }
+   return current != Token.NOT_AVAILABLE;
+ }
+ /**
+  * @return true if the old value is an instance of {@code Token}
+  */
+ public final boolean isOldValueAToken() {
+   final Object current = this.oldValue;
+   return current instanceof Token;
+ }
+
+ /**
+  * Tells whether this event carries an internal delta object. This should only
+  * be used in case of internal delta and <B>not for Delta of Delta Propagation
+  * feature</B>.
+  *
+  * @return true if the {@code delta} field is non-null
+  */
+ public boolean hasDelta() {
+   return this.delta != null;
+ }
+
+ /**
+  * Tells whether the old value of this event can be obtained.
+  *
+  * @return false if the event is of remote origin and the region is a proxy,
+  *         or if the old value is the NOT_AVAILABLE token; true otherwise
+  */
+ public boolean isOldValueAvailable() {
+   if (isOriginRemote() && this.region.isProxy()) {
+     return false;
+   }
+   return basicGetOldValue() != Token.NOT_AVAILABLE;
+ }
+
+ /**
+  * Marks the old value of this event as not available by setting it to the
+  * NOT_AVAILABLE token.
+  */
+ public void oldValueNotAvailable() {
+   this.basicSetOldValue(Token.NOT_AVAILABLE);
+ }
+
+ /**
+  * Deserializes the given bytes, passing no version and no reusable input;
+  * equivalent to {@code deserialize(bytes, null, null)}.
+  *
+  * @param bytes the serialized form, may be null
+  * @return the deserialized object, or null if {@code bytes} is null
+  */
+ public static Object deserialize(byte[] bytes) {
+   final Version noVersion = null;
+   final ByteArrayDataInput noInput = null;
+   return deserialize(bytes, noVersion, noInput);
+ }
+
+ public static Object deserialize(byte[] bytes, Version version,
+ ByteArrayDataInput in) {
+ if (bytes == null)
+ return null;
+ try {
+ return BlobHelper.deserializeBlob(bytes, version, in);
+ }
+ catch (IOException e) {
+ throw new SerializationException(LocalizedStrings.EntryEventImpl_AN_IOEXCEPTION_WAS_THROWN_WHILE_DESERIALIZING.toLocalizedString(), e);
+ }
+ catch (ClassNotFoundException e) {
+ // fix for bug 43602
+ throw new SerializationException(LocalizedStrings.EntryEventImpl_A_CLASSNOTFOUNDEXCEPTION_WAS_THROWN_WHILE_TRYING_TO_DESERIALIZE_CACHED_VALUE.toLocalizedString(), e);
+ }
+ }
+
+ /**
+ * If a PdxInstance is returned then it will have an unretained reference
+ * to Chunk's off-heap address.
+ */
+ public static @Unretained Object deserializeChunk(Chunk bytes) {
+ if (bytes == null)
+ return null;
+ try {
+ return BlobHelper.deserializeOffHeapBlob(bytes);
+ }
+ catch (IOException e) {
+ throw new SerializationException(LocalizedStrings.EntryEventImpl_AN_IOEXCEPTION_WAS_THROWN_WHILE_DESERIALIZING.toLocalizedString(), e);
+ }
+ catch (ClassNotFoundException e) {
+ // fix for bug 43602
+ throw new SerializationException(LocalizedStrings.EntryEventImpl_A_CLASSNOTFOUNDEXCEPTION_WAS_THROWN_WHILE_TRYING_TO_DESERIALIZE_CACHED_VALUE.toLocalizedString(), e);
+ }
+ }
+
+ /**
+  * Serialize an object into a <code>byte[]</code>, passing no version;
+  * equivalent to {@code serialize(obj, null)}.
+  *
+  * @param obj the object to serialize
+  * @return the serialized form of {@code obj}
+  * @throws IllegalArgumentException
+  *           If <code>obj</code> should not be serialized
+  */
+ public static byte[] serialize(Object obj) {
+   final Version noVersion = null;
+   return serialize(obj, noVersion);
+ }
+
+ /**
+ * Serialize an object into a <code>byte[]</code>
+ *
+ * @throws IllegalArgumentException
+ * If <code>obj</code> should not be serialized
+ */
+ public static byte[] serialize(Object obj, Version version)
+ {
+ if (obj == null || obj == Token.NOT_AVAILABLE
+ || Token.isInvalidOrRemoved(obj))
+ throw new IllegalArgumentException(LocalizedStrings.EntryEventImpl_MUST_NOT_SERIALIZE_0_IN_THIS_CONTEXT.toLocalizedString(obj));
+ try {
+ return BlobHelper.serializeToBlob(obj, version);
+ }
+ catch (IOException e) {
+ throw new SerializationException(LocalizedStrings.EntryEventImpl_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING.toLocalizedString(), e);
+ }
+ }
+
+
+ /**
+ * Serialize an object into a <code>byte[]</code> . If the byte array
+ * provided by the wrapper is sufficient to hold the data, it is used
+ * otherwise a new byte array gets created & its reference is stored in the
+ * wrapper. The User Bit is also appropriately set as Serialized
+ *
+ * @param wrapper
+ * Object of type BytesAndBitsForCompactor which is used to fetch
+ * the serialized data. The byte array of the wrapper is used
+ * if possible else a the new byte array containing the data is
+ * set in the wrapper.
+ * @throws IllegalArgumentException
+ * If <code>obj</code> should not be serialized
+ */
+ public static void fillSerializedValue(BytesAndBitsForCompactor wrapper,
+ Object obj, byte userBits) {
+ if (obj == null || obj == Token.NOT_AVAILABLE
+ || Token.isInvalidOrRemoved(obj))
+ throw new IllegalArgumentException(
+ LocalizedStrings.EntryEvents_MUST_NOT_SERIALIZE_0_IN_THIS_CONTEXT.toLocalizedString(obj));
+ try {
+ HeapDataOutputStream hdos = null;
+ if (wrapper.getBytes().length < 32) {
+ hdos = new HeapDataOutputStream(Version.CURRENT);
+ }
+ else {
+ hdos = new HeapDataOutputStream(wrapper.getBytes());
+ }
+ DataSerializer.writeObject(obj, hdos);
+ // return hdos.toByteArray();
+ hdos.sendTo(wrapper, userBits);
+ }
+ catch (IOException e) {
+ RuntimeException e2 = new IllegalArgumentException(
+ LocalizedStrings.EntryEventImpl_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING.toLocalizedString());
+ e2.initCause(e);
+ throw e2;
+ }
+ }
+
+ protected String getShortClassName() {
+ String cname = getClass().getName();
+ return cname.substring(getClass().getPackage().getName().length()+1);
+ }
+
+ /**
+  * Builds a one-line description of this event: the short class name followed
+  * by a bracketed, semicolon-separated list of its properties. Optional
+  * properties are included only when set. If reading the old or new value
+  * throws IllegalStateException, {@code OFFHEAP_VALUE_FREED} is printed in
+  * its place.
+  */
+ @Override
+ public String toString() {
+   final StringBuilder sb = new StringBuilder();
+   sb.append(getShortClassName()).append("[op=").append(getOperation());
+   sb.append(";key=").append(this.getKey());
+
+   sb.append(";oldValue=");
+   try {
+     ArrayUtils.objectStringNonRecursive(basicGetOldValue(), sb);
+   } catch (IllegalStateException ex) {
+     sb.append("OFFHEAP_VALUE_FREED");
+   }
+
+   sb.append(";newValue=");
+   try {
+     ArrayUtils.objectStringNonRecursive(basicGetNewValue(), sb);
+   } catch (IllegalStateException ex) {
+     sb.append("OFFHEAP_VALUE_FREED");
+   }
+
+   sb.append(";callbackArg=").append(this.getRawCallbackArgument());
+   sb.append(";originRemote=").append(isOriginRemote());
+   sb.append(";originMember=").append(getDistributedMember());
+
+   if (this.isPossibleDuplicate()) {
+     sb.append(";posDup");
+   }
+   if (callbacksInvoked()) {
+     sb.append(";callbacksInvoked");
+   }
+   if (this.versionTag != null) {
+     sb.append(";version=").append(this.versionTag);
+   }
+   if (getContext() != null) {
+     sb.append(";context=").append(getContext());
+   }
+   if (this.eventID != null) {
+     sb.append(";id=").append(this.eventID);
+   }
+   if (this.deltaBytes != null) {
+     sb.append(";[").append(this.deltaBytes.length).append(" deltaBytes]");
+   }
+   if (this.filterInfo != null) {
+     sb.append(";routing=").append(this.filterInfo);
+   }
+   if (this.isFromServer()) {
+     sb.append(";isFromServer");
+   }
+   if (this.isConcurrencyConflict()) {
+     sb.append(";isInConflict");
+   }
+   if (this.getInhibitDistribution()) {
+     sb.append(";inhibitDistribution");
+   }
+   return sb.append("]").toString();
+ }
+
+ /**
+  * @return the {@code ENTRY_EVENT} constant, the id under which this class is
+  *         serialized
+  */
+ public int getDSFID() {
+   final int dsfid = ENTRY_EVENT;
+   return dsfid;
+ }
+
+ public void toData(DataOutput out) throws IOException
+ {
+ DataSerializer.writeObject(this.eventID, out);
+ DataSerializer.writeObject(this.getKey(), out);
+ DataSerializer.writeObject(this.keyInfo.getValue(), out);
+ out.writeByte(this.op.ordinal);
+ out.writeShort(this.eventFlags & EventFlags.FLAG_TRANSIENT_MASK);
+ DataSerializer.writeObject(this.getRawCallbackArgument(), out);
+ DataSerializer.writeObject(this.txId, out);
+
+ {
+ boolean isDelta = this.delta != null;
+ out.writeBoolean(isDelta);
+ if (isDelta) {
+ DataSerializer.writeObject(this.delta, out);
+ }
+ else {
+ Object nv = basicGetNewValue();
+ boolean newValueSerialized = nv instanceof CachedDeserializable;
+ if (newValueSerialized) {
+ if (nv instanceof StoredObject) {
+ newValueSerialized = ((StoredObject) nv).isSerialized();
+ }
+ }
+ out.writeBoolean(newValueSerialized);
+ if (newValueSerialized) {
+ if (this.newValueBytes != null) {
+ DataSerializer.writeByteArray(this.newValueBytes, out);
+ } else if (this.cachedSerializedNewValue != null) {
+ DataSerializer.writeByteArray(this.cachedSerializedNewValue, out);
+ } else {
+ CachedDeserializable cd = (CachedDeserializable)nv;
+ DataSerializer.writeObjectAsByteArray(cd.getValue(), out);
+ }
+ }
+ else {
+ DataSerializer.writeObject(nv, out);
+ }
+ }
+ }
+
+ {
+ Object ov = basicGetOldValue();
+ boolean oldValueSerialized = ov instanceof CachedDeserializable;
+ if (oldValueSerialized) {
+ if (ov instanceof StoredObject) {
+ oldValueSerialized = ((StoredObject) ov).isSerialized();
+ }
+ }
+ out.writeBoolean(oldValueSerialized);
+ if (oldValueSerialized) {
+ if (this.oldValueBytes != null) {
+ DataSerializer.writeByteArray(this.oldValueBytes, out);
+ }
+ else {
+ CachedDeserializable cd = (CachedDeserializable)ov;
+ DataSerializer.writeObjectAsByteArray(cd.getValue(), out);
+ }
+ }
+ else {
+ DataSerializer.writeObject(ov, out);
+ }
+ }
+ InternalDataSerializer.invokeToData((InternalDistributedMember)this.distributedMember, out);
+ DataSerializer.writeObject(getContext(), out);
+ DataSerializer.writeLong(tailKey, out);
+ }
+
+ private static abstract class EventFlags
+ {
+ private static final short FLAG_ORIGIN_REMOTE = 0x01;
+ // localInvalid: true if a null new value should be treated as a local
+
<TRUNCATED>