You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/23 09:23:02 UTC
[26/38] incubator-ignite git commit: # ignite-41
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f537940c/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java
index 0000000,1cf0c33..0cec41d
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java
@@@ -1,0 -1,1060 +1,1106 @@@
+ /* @java.file.header */
+
+ /* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+ package org.gridgain.grid.kernal.processors.cache.transactions;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.marshaller.optimized.*;
+ import org.gridgain.grid.cache.*;
+ import org.gridgain.grid.kernal.processors.cache.*;
++import org.gridgain.grid.kernal.processors.cache.distributed.*;
+ import org.gridgain.grid.util.lang.*;
+ import org.gridgain.grid.util.tostring.*;
+ import org.gridgain.grid.util.typedef.*;
+ import org.gridgain.grid.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+
++import javax.cache.expiry.*;
+ import java.io.*;
+ import java.util.*;
+ import java.util.concurrent.atomic.*;
+
+ import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*;
+
+ /**
+ * Transaction entry. Note that it is essential that this class does not override
+ * {@link #equals(Object)} method, as transaction entries should use referential
+ * equality.
+ */
+ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, IgniteOptimizedMarshallable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "AbbreviationUsage", "UnusedDeclaration"})
+ private static Object GG_CLASS_ID;
+
+ /** Owning transaction. */
+ @GridToStringExclude
+ private IgniteTxEx<K, V> tx;
+
+ /** Cache key. */
+ @GridToStringInclude
+ private K key;
+
+ /** Key bytes. */
+ private byte[] keyBytes;
+
+ /** Cache ID. */
+ private int cacheId;
+
+ /** Transient tx key. */
+ private IgniteTxKey<K> txKey;
+
+ /** Cache value. */
+ @GridToStringInclude
+ private TxEntryValueHolder<K, V> val = new TxEntryValueHolder<>();
+
+ /** Visible value for peek. */
+ @GridToStringInclude
+ private TxEntryValueHolder<K, V> prevVal = new TxEntryValueHolder<>();
+
+ /** Filter bytes. */
+ private byte[] filterBytes;
+
+ /** Transform. */
+ @GridToStringInclude
+ private Collection<IgniteClosure<V, V>> transformClosCol;
+
+ /** Transform closure bytes. */
+ @GridToStringExclude
+ private byte[] transformClosBytes;
+
+ /** Time to live. */
+ private long ttl;
+
+ /** DR expire time (explicit) */
+ private long drExpireTime = -1L;
+
+ /** Explicit lock version if there is one. */
+ @GridToStringInclude
+ private GridCacheVersion explicitVer;
+
+ /** DHT version. */
+ private transient volatile GridCacheVersion dhtVer;
+
+ /** Put filters. */
+ @GridToStringInclude
+ private IgnitePredicate<GridCacheEntry<K, V>>[] filters;
+
+ /** Flag indicating whether filters passed. Used for fast-commit transactions. */
+ private boolean filtersPassed;
+
+ /** Flag indicating that filter is set and can not be replaced. */
+ private transient boolean filtersSet;
+
+ /** Underlying cache entry. */
+ private transient volatile GridCacheEntryEx<K, V> entry;
+
+ /** Cache registry. */
+ private transient GridCacheContext<K, V> ctx;
+
+ /** Prepared flag to prevent multiple candidate add. */
+ @SuppressWarnings({"TransientFieldNotInitialized"})
+ private transient AtomicBoolean prepared = new AtomicBoolean();
+
+ /** Lock flag for colocated cache. */
+ private transient boolean locked;
+
+ /** Assigned node ID (required only for partitioned cache). */
+ private transient UUID nodeId;
+
+ /** Flag if this node is a back up node. */
+ private boolean locMapped;
+
+ /** Group lock entry flag. */
+ private boolean grpLock;
+
+ /** Flag indicating if this entry should be transferred to remote node. */
+ private boolean transferRequired;
+
+ /** Deployment enabled flag. */
+ private boolean depEnabled;
+
+ /** Data center replication version. */
+ private GridCacheVersion drVer;
+
++ /** Expiry policy. */
++ private ExpiryPolicy expiryPlc;
++
++ /** */
++ private boolean transferExpiryPlc;
++
+ /**
+ * Required by {@link Externalizable}
+ */
+ public IgniteTxEntry() {
+ /* No-op. */
+ }
+
+ /**
+ * This constructor is meant for remote transactions.
+ *
+ * @param ctx Cache registry.
+ * @param tx Owning transaction.
+ * @param op Operation.
+ * @param val Value.
+ * @param ttl Time to live.
+ * @param drExpireTime DR expire time.
+ * @param entry Cache entry.
+ * @param drVer Data center replication version.
+ */
- public IgniteTxEntry(GridCacheContext<K, V> ctx, IgniteTxEx<K, V> tx, GridCacheOperation op, V val,
- long ttl, long drExpireTime, GridCacheEntryEx<K, V> entry, @Nullable GridCacheVersion drVer) {
++ public IgniteTxEntry(GridCacheContext<K, V> ctx,
++ IgniteTxEx<K, V> tx,
++ GridCacheOperation op,
++ V val,
++ long ttl,
++ long drExpireTime,
++ GridCacheEntryEx<K, V> entry,
++ @Nullable GridCacheVersion drVer) {
+ assert ctx != null;
+ assert tx != null;
+ assert op != null;
+ assert entry != null;
+
+ this.ctx = ctx;
+ this.tx = tx;
+ this.val.value(op, val, false, false);
+ this.entry = entry;
+ this.ttl = ttl;
+ this.drExpireTime = drExpireTime;
+ this.drVer = drVer;
+
+ key = entry.key();
+ keyBytes = entry.keyBytes();
+
+ cacheId = entry.context().cacheId();
+
+ depEnabled = ctx.gridDeploy().enabled();
+ }
+
+ /**
+ * This constructor is meant for local transactions.
+ *
+ * @param ctx Cache registry.
+ * @param tx Owning transaction.
+ * @param op Operation.
+ * @param val Value.
+ * @param transformClos Transform closure.
+ * @param ttl Time to live.
+ * @param entry Cache entry.
+ * @param filters Put filters.
+ * @param drVer Data center replication version.
+ */
- public IgniteTxEntry(GridCacheContext<K, V> ctx, IgniteTxEx<K, V> tx, GridCacheOperation op,
- V val, IgniteClosure<V, V> transformClos, long ttl, GridCacheEntryEx<K, V> entry,
- IgnitePredicate<GridCacheEntry<K, V>>[] filters, GridCacheVersion drVer) {
++ public IgniteTxEntry(GridCacheContext<K, V> ctx,
++ IgniteTxEx<K, V> tx,
++ GridCacheOperation op,
++ V val,
++ IgniteClosure<V, V> transformClos,
++ long ttl,
++ GridCacheEntryEx<K,V> entry,
++ IgnitePredicate<GridCacheEntry<K, V>>[] filters,
++ GridCacheVersion drVer) {
+ assert ctx != null;
+ assert tx != null;
+ assert op != null;
+ assert entry != null;
+
+ this.ctx = ctx;
+ this.tx = tx;
+ this.val.value(op, val, false, false);
+ this.entry = entry;
+ this.ttl = ttl;
+ this.filters = filters;
+ this.drVer = drVer;
+
+ if (transformClos != null)
+ addTransformClosure(transformClos);
+
+ key = entry.key();
+ keyBytes = entry.keyBytes();
+
+ cacheId = entry.context().cacheId();
+
+ depEnabled = ctx.gridDeploy().enabled();
+ }
+
+ /**
+ * @return Cache context for this tx entry.
+ */
+ public GridCacheContext<K, V> context() {
+ return ctx;
+ }
+
+ /**
+ * @return Flag indicating if this entry is affinity mapped to the same node.
+ */
+ public boolean locallyMapped() {
+ return locMapped;
+ }
+
+ /**
+ * @param locMapped Flag indicating if this entry is affinity mapped to the same node.
+ */
+ public void locallyMapped(boolean locMapped) {
+ this.locMapped = locMapped;
+ }
+
+ /**
+ * @return {@code True} if this entry was added in group lock transaction and
+ * this is not a group lock entry.
+ */
+ public boolean groupLockEntry() {
+ return grpLock;
+ }
+
+ /**
+ * @param grpLock {@code True} if this entry was added in group lock transaction and
+ * this is not a group lock entry.
+ */
+ public void groupLockEntry(boolean grpLock) {
+ this.grpLock = grpLock;
+ }
+
+ /**
+ * @param transferRequired Sets flag indicating that transfer is required to remote node.
+ */
+ public void transferRequired(boolean transferRequired) {
+ this.transferRequired = transferRequired;
+ }
+
+ /**
+ * @return Flag indicating whether transfer is required to remote nodes.
+ */
+ public boolean transferRequired() {
+ return transferRequired;
+ }
+
+ /**
+ * @param ctx Context.
+ * @return Clean copy of this entry.
+ */
+ public IgniteTxEntry<K, V> cleanCopy(GridCacheContext<K, V> ctx) {
+ IgniteTxEntry<K, V> cp = new IgniteTxEntry<>();
+
+ cp.key = key;
+ cp.cacheId = cacheId;
+ cp.ctx = ctx;
+
+ cp.val = new TxEntryValueHolder<>();
+
+ cp.keyBytes = keyBytes;
+ cp.filters = filters;
+ cp.val.value(val.op(), val.value(), val.hasWriteValue(), val.hasReadValue());
+ cp.val.valueBytes(val.valueBytes());
+ cp.transformClosCol = transformClosCol;
+ cp.ttl = ttl;
+ cp.drExpireTime = drExpireTime;
+ cp.explicitVer = explicitVer;
+ cp.grpLock = grpLock;
+ cp.depEnabled = depEnabled;
+ cp.drVer = drVer;
++ cp.expiryPlc = expiryPlc;
+
+ return cp;
+ }
+
+ /**
+ * @return Node ID.
+ */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @param nodeId Node ID.
+ */
+ public void nodeId(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * @return DHT version.
+ */
+ public GridCacheVersion dhtVersion() {
+ return dhtVer;
+ }
+
+ /**
+ * @param dhtVer DHT version.
+ */
+ public void dhtVersion(GridCacheVersion dhtVer) {
+ this.dhtVer = dhtVer;
+ }
+
+ /**
+ * @return {@code True} if tx entry was marked as locked.
+ */
+ public boolean locked() {
+ return locked;
+ }
+
+ /**
+ * Marks tx entry as locked.
+ */
+ public void markLocked() {
+ locked = true;
+ }
+
+ /**
+ * @param val Value to set.
+ */
+ void setAndMarkValid(V val) {
+ setAndMarkValid(op(), val, this.val.hasWriteValue(), this.val.hasReadValue());
+ }
+
+ /**
+ * @param op Operation.
+ * @param val Value to set.
+ */
+ void setAndMarkValid(GridCacheOperation op, V val) {
+ setAndMarkValid(op, val, this.val.hasWriteValue(), this.val.hasReadValue());
+ }
+
+ /**
+ * @param op Operation.
+ * @param val Value to set.
+ * @param hasReadVal Has read value flag.
+ * @param hasWriteVal Has write value flag.
+ */
+ void setAndMarkValid(GridCacheOperation op, V val, boolean hasWriteVal, boolean hasReadVal) {
+ this.val.value(op, val, hasWriteVal, hasReadVal);
+
+ markValid();
+ }
+
+ /**
+ * Marks this entry as value-has-bean-read. Effectively, makes values enlisted to transaction visible
+ * to further peek operations.
+ */
+ void markValid() {
+ prevVal.value(val.op(), val.value(), val.hasWriteValue(), val.hasReadValue());
+ }
+
+ /**
+ * Marks entry as prepared.
+ *
+ * @return True if entry was marked prepared by this call.
+ */
+ boolean markPrepared() {
+ return prepared.compareAndSet(false, true);
+ }
+
+ /**
+ * @return Entry key.
+ */
+ public K key() {
+ return key;
+ }
+
+ /**
+ * @return Cache ID.
+ */
+ public int cacheId() {
+ return cacheId;
+ }
+
+ /**
+ * @return Tx key.
+ */
+ public IgniteTxKey<K> txKey() {
+ if (txKey == null)
+ txKey = new IgniteTxKey<>(key, cacheId);
+
+ return txKey;
+ }
+
+ /**
+ *
+ * @return Key bytes.
+ */
+ @Nullable public byte[] keyBytes() {
+ byte[] bytes = keyBytes;
+
+ if (bytes == null && entry != null) {
+ bytes = entry.keyBytes();
+
+ keyBytes = bytes;
+ }
+
+ return bytes;
+ }
+
+ /**
+ * @param keyBytes Key bytes.
+ */
+ public void keyBytes(byte[] keyBytes) {
+ initKeyBytes(keyBytes);
+ }
+
+ /**
+ * @return Underlying cache entry.
+ */
+ public GridCacheEntryEx<K, V> cached() {
+ return entry;
+ }
+
+ /**
+ * @param entry Cache entry.
+ * @param keyBytes Key bytes, possibly {@code null}.
+ */
+ public void cached(GridCacheEntryEx<K,V> entry, @Nullable byte[] keyBytes) {
+ assert entry != null;
+
+ assert entry.context() == ctx : "Invalid entry assigned to tx entry [txEntry=" + this +
+ ", entry=" + entry + ", ctxNear=" + ctx.isNear() + ", ctxDht=" + ctx.isDht() + ']';
+
+ this.entry = entry;
+
+ initKeyBytes(keyBytes);
+ }
+
+ /**
+ * Initialized key bytes locally and on the underlying entry.
+ *
+ * @param bytes Key bytes to initialize.
+ */
+ private void initKeyBytes(@Nullable byte[] bytes) {
+ if (bytes != null) {
+ keyBytes = bytes;
+
+ while (true) {
+ try {
+ if (entry != null)
+ entry.keyBytes(bytes);
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ entry = ctx.cache().entryEx(key);
+ }
+ }
+ }
+ else if (entry != null) {
+ bytes = entry.keyBytes();
+
+ if (bytes != null)
+ keyBytes = bytes;
+ }
+ }
+
+ /**
+ * @return Entry value.
+ */
+ @Nullable public V value() {
+ return val.value();
+ }
+
+ /**
+ * @return {@code True} if has value explicitly set.
+ */
+ public boolean hasValue() {
+ return val.hasValue();
+ }
+
+ /**
+ * @return {@code True} if has write value set.
+ */
+ public boolean hasWriteValue() {
+ return val.hasWriteValue();
+ }
+
+ /**
+ * @return {@code True} if has read value set.
+ */
+ public boolean hasReadValue() {
+ return val.hasReadValue();
+ }
+
+ /**
+ * @return Value visible for peek.
+ */
+ @Nullable public V previousValue() {
+ return prevVal.value();
+ }
+
+ /**
+ * @return {@code True} if has previous value explicitly set.
+ */
+ boolean hasPreviousValue() {
+ return prevVal.hasValue();
+ }
+
+ /**
+ * @return Previous operation to revert entry in case of filter failure.
+ */
+ @Nullable public GridCacheOperation previousOperation() {
+ return prevVal.op();
+ }
+
+ /**
+ * @return Value bytes.
+ */
+ @Nullable public byte[] valueBytes() {
+ return val.valueBytes();
+ }
+
+ /**
+ * @param valBytes Value bytes.
+ */
+ public void valueBytes(@Nullable byte[] valBytes) {
+ val.valueBytes(valBytes);
+ }
+
+ /**
+ * @return Time to live.
+ */
+ public long ttl() {
+ return ttl;
+ }
+
+ /**
+ * @param ttl Time to live.
+ */
+ public void ttl(long ttl) {
+ this.ttl = ttl;
+ }
+
+ /**
+ * @return DR expire time.
+ */
+ public long drExpireTime() {
+ return drExpireTime;
+ }
+
+ /**
+ * @param drExpireTime DR expire time.
+ */
+ public void drExpireTime(long drExpireTime) {
+ this.drExpireTime = drExpireTime;
+ }
+
+ /**
+ * @param val Entry value.
+ * @param writeVal Write value flag.
+ * @param readVal Read value flag.
+ */
+ public void value(@Nullable V val, boolean writeVal, boolean readVal) {
+ this.val.value(this.val.op(), val, writeVal, readVal);
+ }
+
+ /**
+ * Sets read value if this tx entrty does not have write value yet.
+ *
+ * @param val Read value to set.
+ */
+ public void readValue(@Nullable V val) {
+ this.val.value(this.val.op(), val, false, true);
+ }
+
+ /**
+ * @param transformClos Transform closure.
+ */
+ public void addTransformClosure(IgniteClosure<V, V> transformClos) {
+ if (transformClosCol == null)
+ transformClosCol = new LinkedList<>();
+
+ transformClosCol.add(transformClos);
+
+ // Must clear transform closure bytes since collection has changed.
+ transformClosBytes = null;
+
+ val.op(TRANSFORM);
+ }
+
+ /**
+ * @return Collection of transform closures.
+ */
+ public Collection<IgniteClosure<V, V>> transformClosures() {
+ return transformClosCol;
+ }
+
+ /**
+ * @param transformClosCol Collection of transform closures.
+ */
+ public void transformClosures(@Nullable Collection<IgniteClosure<V, V>> transformClosCol) {
+ this.transformClosCol = transformClosCol;
+
+ // Must clear transform closure bytes since collection has changed.
+ transformClosBytes = null;
+ }
+
+ /**
+ * @return Cache operation.
+ */
+ public GridCacheOperation op() {
+ return val.op();
+ }
+
+ /**
+ * @param op Cache operation.
+ */
+ public void op(GridCacheOperation op) {
+ val.op(op);
+ }
+
+ /**
+ * @return {@code True} if read entry.
+ */
+ public boolean isRead() {
+ return op() == READ;
+ }
+
+ /**
+ * @param explicitVer Explicit version.
+ */
+ public void explicitVersion(GridCacheVersion explicitVer) {
+ this.explicitVer = explicitVer;
+ }
+
+ /**
+ * @return Explicit version.
+ */
+ public GridCacheVersion explicitVersion() {
+ return explicitVer;
+ }
+
+ /**
+ * @return DR version.
+ */
+ @Nullable public GridCacheVersion drVersion() {
+ return drVer;
+ }
+
+ /**
+ * @param drVer DR version.
+ */
+ public void drVersion(@Nullable GridCacheVersion drVer) {
+ this.drVer = drVer;
+ }
+
+ /**
+ * @return Put filters.
+ */
+ public IgnitePredicate<GridCacheEntry<K, V>>[] filters() {
+ return filters;
+ }
+
+ /**
+ * @param filters Put filters.
+ */
+ public void filters(IgnitePredicate<GridCacheEntry<K, V>>[] filters) {
+ filterBytes = null;
+
+ this.filters = filters;
+ }
+
+ /**
+ * @return {@code True} if filters passed for fast-commit transactions.
+ */
+ public boolean filtersPassed() {
+ return filtersPassed;
+ }
+
+ /**
+ * @param filtersPassed {@code True} if filters passed for fast-commit transactions.
+ */
+ public void filtersPassed(boolean filtersPassed) {
+ this.filtersPassed = filtersPassed;
+ }
+
+ /**
+ * @return {@code True} if filters are set.
+ */
+ public boolean filtersSet() {
+ return filtersSet;
+ }
+
+ /**
+ * @param filtersSet {@code True} if filters are set and should not be replaced.
+ */
+ public void filtersSet(boolean filtersSet) {
+ this.filtersSet = filtersSet;
+ }
+
+ /**
++ * Marks expiry policy for transfer if it explicitly set and differs from default one.
++ */
++ public void transferExpiryPolicyIfNeeded() {
++ transferExpiryPlc = expiryPlc != null && expiryPlc != ctx.expiry();
++ }
++
++ /**
+ * @param ctx Context.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void marshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ // Do not serialize filters if they are null.
+ if (depEnabled) {
+ if (keyBytes == null)
+ keyBytes = entry.getOrMarshalKeyBytes();
+
+ if (transformClosBytes == null && transformClosCol != null)
+ transformClosBytes = CU.marshal(ctx, transformClosCol);
+
+ if (F.isEmptyOrNulls(filters))
+ filterBytes = null;
+ else if (filterBytes == null)
+ filterBytes = CU.marshal(ctx, filters);
+ }
+
+ val.marshal(ctx, context(), depEnabled);
+ }
+
+ /**
+ * Unmarshalls entry.
+ *
+ * @param ctx Cache context.
+ * @param clsLdr Class loader.
+ * @throws IgniteCheckedException If un-marshalling failed.
+ */
+ public void unmarshal(GridCacheSharedContext<K, V> ctx, boolean near, ClassLoader clsLdr) throws IgniteCheckedException {
+ if (this.ctx == null) {
+ GridCacheContext<K, V> cacheCtx = ctx.cacheContext(cacheId);
+
+ if (cacheCtx.isNear() && !near)
+ cacheCtx = cacheCtx.near().dht().context();
+ else if (!cacheCtx.isNear() && near)
+ cacheCtx = cacheCtx.dht().near().context();
+
+ this.ctx = cacheCtx;
+ }
+
+ if (depEnabled) {
+ // Don't unmarshal more than once by checking key for null.
+ if (key == null)
+ key = ctx.marshaller().unmarshal(keyBytes, clsLdr);
+
+ // Unmarshal transform closure anyway if it exists.
+ if (transformClosBytes != null && transformClosCol == null)
+ transformClosCol = ctx.marshaller().unmarshal(transformClosBytes, clsLdr);
+
+ if (filters == null && filterBytes != null) {
+ filters = ctx.marshaller().unmarshal(filterBytes, clsLdr);
+
+ if (filters == null)
+ filters = CU.empty();
+ }
+ }
+
+ val.unmarshal(this.ctx, clsLdr, depEnabled);
+ }
+
++ /**
++ * @param expiryPlc Expiry policy.
++ */
++ public void expiry(@Nullable ExpiryPolicy expiryPlc) {
++ this.expiryPlc = expiryPlc;
++ }
++
++ /**
++ * @return Expiry policy.
++ */
++ @Nullable public ExpiryPolicy expiry() {
++ return expiryPlc;
++ }
++
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeBoolean(depEnabled);
+
+ if (depEnabled) {
+ U.writeByteArray(out, keyBytes);
+ U.writeByteArray(out, transformClosBytes);
+ U.writeByteArray(out, filterBytes);
+ }
+ else {
+ out.writeObject(key);
+ U.writeCollection(out, transformClosCol);
+ U.writeArray(out, filters);
+ }
+
+ out.writeInt(cacheId);
+
+ val.writeTo(out);
+
+ out.writeLong(ttl);
+ out.writeLong(drExpireTime);
+
+ CU.writeVersion(out, explicitVer);
+ out.writeBoolean(grpLock);
+ CU.writeVersion(out, drVer);
++
++ out.writeObject(transferExpiryPlc ? new GridCacheExternalizableExpiryPolicy(expiryPlc) : null);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"unchecked"})
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ depEnabled = in.readBoolean();
+
+ if (depEnabled) {
+ keyBytes = U.readByteArray(in);
+ transformClosBytes = U.readByteArray(in);
+ filterBytes = U.readByteArray(in);
+ }
+ else {
+ key = (K)in.readObject();
+ transformClosCol = U.readCollection(in);
+ filters = U.readEntryFilterArray(in);
+ }
+
+ cacheId = in.readInt();
+
+ val.readFrom(in);
+
+ ttl = in.readLong();
+ drExpireTime = in.readLong();
+
+ explicitVer = CU.readVersion(in);
+ grpLock = in.readBoolean();
+ drVer = CU.readVersion(in);
++
++ expiryPlc = (ExpiryPolicy)in.readObject();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object ggClassId() {
+ return GG_CLASS_ID;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Class<?> deployClass() {
+ ClassLoader clsLdr = getClass().getClassLoader();
+
+ V val = value();
+
+ // First of all check classes that may be loaded by class loader other than application one.
+ return key != null && !clsLdr.equals(key.getClass().getClassLoader()) ?
+ key.getClass() : val != null ? val.getClass() : getClass();
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClassLoader classLoader() {
+ return deployClass().getClassLoader();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return GridToStringBuilder.toString(IgniteTxEntry.class, this,
+ "keyBytesSize", keyBytes == null ? "null" : Integer.toString(keyBytes.length),
+ "xidVer", tx == null ? "null" : tx.xidVersion());
+ }
+
+ /**
+ * Auxiliary class to hold value, value-has-been-set flag, value update operation, value bytes.
+ */
+ private static class TxEntryValueHolder<K, V> {
+ /** */
+ @GridToStringInclude
+ private V val;
+
+ /** */
+ @GridToStringExclude
+ private byte[] valBytes;
+
+ /** */
+ @GridToStringInclude
+ private GridCacheOperation op = NOOP;
+
+ /** Flag indicating that value has been set for write. */
+ private boolean hasWriteVal;
+
+ /** Flag indicating that value has been set for read. */
+ private boolean hasReadVal;
+
+ /** Flag indicating that bytes were sent. */
+ private boolean valBytesSent;
+
+ /**
+ * @param op Cache operation.
+ * @param val Value.
+ * @param hasWriteVal Write value presence flag.
+ * @param hasReadVal Read value presence flag.
+ */
+ public void value(GridCacheOperation op, V val, boolean hasWriteVal, boolean hasReadVal) {
+ if (hasReadVal && this.hasWriteVal)
+ return;
+
+ boolean clean = this.val != null;
+
+ this.op = op;
+ this.val = val;
+
+ if (clean)
+ valBytes = null;
+
+ this.hasWriteVal = hasWriteVal || op == CREATE || op == UPDATE || op == DELETE;
+ this.hasReadVal = hasReadVal || op == READ;
+ }
+
+ /**
+ * @return {@code True} if has read or write value.
+ */
+ public boolean hasValue() {
+ return hasWriteVal || hasReadVal;
+ }
+
+ /**
+ * Gets stored value.
+ *
+ * @return Value.
+ */
+ public V value() {
+ return val;
+ }
+
+ /**
+ * @param val Stored value.
+ */
+ public void value(@Nullable V val) {
+ boolean clean = this.val != null;
+
+ this.val = val;
+
+ if (clean)
+ valBytes = null;
+ }
+
+ /**
+ * Gets cache operation.
+ *
+ * @return Cache operation.
+ */
+ public GridCacheOperation op() {
+ return op;
+ }
+
+ /**
+ * Sets cache operation.
+ *
+ * @param op Cache operation.
+ */
+ public void op(GridCacheOperation op) {
+ this.op = op;
+ }
+
+ /**
+ * @return {@code True} if write value was set.
+ */
+ public boolean hasWriteValue() {
+ return hasWriteVal;
+ }
+
+ /**
+ * @return {@code True} if read value was set.
+ */
+ public boolean hasReadValue() {
+ return hasReadVal;
+ }
+
+ /**
+ * Sets value bytes.
+ *
+ * @param valBytes Value bytes to set.
+ */
+ public void valueBytes(@Nullable byte[] valBytes) {
+ this.valBytes = valBytes;
+ }
+
+ /**
+ * Gets value bytes.
+ *
+ * @return Value bytes.
+ */
+ public byte[] valueBytes() {
+ return valBytes;
+ }
+
+ /**
+ * @param ctx Cache context.
+ * @param depEnabled Deployment enabled flag.
+ * @throws IgniteCheckedException If marshaling failed.
+ */
+ public void marshal(GridCacheSharedContext<K, V> sharedCtx, GridCacheContext<K, V> ctx, boolean depEnabled)
+ throws IgniteCheckedException {
+ boolean valIsByteArr = val != null && val instanceof byte[];
+
+ // Do not send write values to remote nodes.
+ if (hasWriteVal && val != null && !valIsByteArr && valBytes == null &&
+ (depEnabled || !ctx.isUnmarshalValues()))
+ valBytes = CU.marshal(sharedCtx, val);
+
+ valBytesSent = hasWriteVal && !valIsByteArr && valBytes != null && (depEnabled || !ctx.isUnmarshalValues());
+ }
+
+ /**
+ * @param ctx Cache context.
+ * @param ldr Class loader.
+ * @param depEnabled Deployment enabled flag.
+ * @throws IgniteCheckedException If unmarshalling failed.
+ */
+ public void unmarshal(GridCacheContext<K, V> ctx, ClassLoader ldr, boolean depEnabled) throws IgniteCheckedException {
+ if (valBytes != null && val == null && (ctx.isUnmarshalValues() || op == TRANSFORM || depEnabled))
+ val = ctx.marshaller().unmarshal(valBytes, ldr);
+ }
+
+ /**
+ * @param out Data output.
+ * @throws IOException If failed.
+ */
+ public void writeTo(ObjectOutput out) throws IOException {
+ out.writeBoolean(hasWriteVal);
+ out.writeBoolean(valBytesSent);
+
+ if (hasWriteVal) {
+ if (valBytesSent)
+ U.writeByteArray(out, valBytes);
+ else {
+ if (val != null && val instanceof byte[]) {
+ out.writeBoolean(true);
+
+ U.writeByteArray(out, (byte[])val);
+ }
+ else {
+ out.writeBoolean(false);
+
+ out.writeObject(val);
+ }
+ }
+ }
+
+ out.writeInt(op.ordinal());
+ }
+
+ /**
+ * @param in Data input.
+ * @throws IOException If failed.
+ * @throws ClassNotFoundException If failed.
+ */
+ public void readFrom(ObjectInput in) throws IOException, ClassNotFoundException {
+ hasWriteVal = in.readBoolean();
+ valBytesSent = in.readBoolean();
+
+ if (hasWriteVal) {
+ if (valBytesSent)
+ valBytes = U.readByteArray(in);
+ else
+ val = in.readBoolean() ? (V)U.readByteArray(in) : (V)in.readObject();
+ }
+
+ op = fromOrdinal(in.readInt());
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "[op=" + op +", val=" + val + ", valBytesLen=" + (valBytes == null ? 0 : valBytes.length) + ']';
+ }
+ }
+ }