You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/22 11:42:46 UTC

[44/50] [abbrv] incubator-ignite git commit: # ignite-41

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f537940c/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 0000000,4fc7140..a126460
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@@ -1,0 -1,3179 +1,3285 @@@
+ /* @java.file.header */
+ 
+ /*  _________        _____ __________________        _____
+  *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+  *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+  *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+  *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+  */
+ 
+ package org.gridgain.grid.kernal.processors.cache.transactions;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.plugin.security.*;
+ import org.apache.ignite.portables.*;
+ import org.apache.ignite.transactions.*;
+ import org.gridgain.grid.cache.*;
+ import org.gridgain.grid.kernal.processors.cache.*;
+ import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
+ import org.gridgain.grid.kernal.processors.cache.dr.*;
+ import org.gridgain.grid.kernal.processors.dr.*;
+ import org.gridgain.grid.util.*;
+ import org.gridgain.grid.util.future.*;
+ import org.gridgain.grid.util.lang.*;
+ import org.gridgain.grid.util.tostring.*;
+ import org.gridgain.grid.util.typedef.*;
+ import org.gridgain.grid.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+ 
++import javax.cache.expiry.*;
+ import java.io.*;
+ import java.util.*;
+ import java.util.concurrent.atomic.*;
+ 
+ import static org.apache.ignite.events.IgniteEventType.*;
+ import static org.apache.ignite.transactions.IgniteTxState.*;
+ import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*;
+ import static org.gridgain.grid.kernal.processors.dr.GridDrType.*;
+ 
+ /**
+  * Transaction adapter for cache transactions.
+  */
+ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
+     implements IgniteTxLocalEx<K, V> {
+     /** */
+     private static final long serialVersionUID = 0L;
+ 
+     /** Per-transaction map of all entries (backs both the read view and the write view). */
+     @GridToStringExclude
+     protected Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> txMap;
+ 
+     /** Read view on transaction map. */
+     @GridToStringExclude
+     protected IgniteTxMap<K, V> readView;
+ 
+     /** Write view on transaction map. */
+     @GridToStringExclude
+     protected IgniteTxMap<K, V> writeView;
+ 
+     /** Minimal version encountered (either explicit lock or XID of this transaction). */
+     protected GridCacheVersion minVer;
+ 
+     /** Flag indicating whether TM commit happened. */
+     protected AtomicBoolean doneFlag = new AtomicBoolean(false);
+ 
+     /** Committed versions, relative to base. */
+     private Collection<GridCacheVersion> committedVers = Collections.emptyList();
+ 
+     /** Rolled back versions, relative to base. */
+     private Collection<GridCacheVersion> rolledbackVers = Collections.emptyList();
+ 
+     /** Base for completed versions. */
+     private GridCacheVersion completedBase;
+ 
+     /** Flag indicating partition lock in group lock transaction. */
+     private boolean partLock;
+ 
+     /** Flag indicating that transformed values should be sent to remote nodes. */
+     private boolean sndTransformedVals;
+ 
+     /** Commit error. */
+     protected AtomicReference<Throwable> commitErr = new AtomicReference<>();
+ 
+     /** Active cache IDs. */
+     protected Set<Integer> activeCacheIds = new HashSet<>();
+ 
+     /**
+      * Empty constructor required for {@link Externalizable}.
+      * <p>
+      * The constructor body is a no-op; no state is set up here beyond the field initializers
+      * declared on this class.
+      */
+     protected IgniteTxLocalAdapter() {
+         // No-op.
+     }
+ 
+     /**
+      * Creates a local transaction. Every argument except {@code partLock} is forwarded to the
+      * superclass constructor (with the {@code local} flag fixed to {@code true}); the minimal
+      * version is initialized to {@code xidVer}.
+      *
+      * @param cctx Cache registry.
+      * @param xidVer Transaction ID.
+      * @param implicit {@code True} if transaction was implicitly started by the system,
+      *      {@code false} if it was started explicitly by user.
+      * @param implicitSingle {@code True} if transaction is implicit with only one key.
+      * @param sys System flag.
+      * @param concurrency Concurrency.
+      * @param isolation Isolation.
+      * @param timeout Timeout.
+      * @param invalidate Invalidation flag (forwarded to the superclass constructor).
+      * @param storeEnabled Store enabled flag (forwarded to the superclass constructor).
+      * @param txSize Expected transaction size.
+      * @param grpLockKey Group lock key if this is a group-lock transaction.
+      * @param partLock {@code True} if this is a group-lock transaction and lock is acquired for whole partition.
+      *      Asserted to be {@code false} whenever {@code grpLockKey} is {@code null}.
+      * @param subjId Subject ID, may be {@code null} (forwarded to the superclass constructor).
+      * @param taskNameHash Task name hash (forwarded to the superclass constructor).
+      */
+     protected IgniteTxLocalAdapter(
+         GridCacheSharedContext<K, V> cctx,
+         GridCacheVersion xidVer,
+         boolean implicit,
+         boolean implicitSingle,
+         boolean sys,
+         IgniteTxConcurrency concurrency,
+         IgniteTxIsolation isolation,
+         long timeout,
+         boolean invalidate,
+         boolean storeEnabled,
+         int txSize,
+         @Nullable IgniteTxKey grpLockKey,
+         boolean partLock,
+         @Nullable UUID subjId,
+         int taskNameHash
+     ) {
+         super(cctx, xidVer, implicit, implicitSingle, /*local*/true, sys, concurrency, isolation, timeout, invalidate,
+             storeEnabled, txSize, grpLockKey, subjId, taskNameHash);
+ 
+         assert !partLock || grpLockKey != null;
+ 
+         this.partLock = partLock;
+ 
+         minVer = xidVer;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public UUID eventNodeId() {
+         return cctx.localNodeId();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public UUID originatingNodeId() {
+         return cctx.localNodeId();
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * A transaction whose map has not been created yet (see {@link #isStarted()}) is reported
+      * as empty rather than failing with a {@link NullPointerException}. This matches the other
+      * accessors of this class, which all tolerate a {@code null} transaction map.
+      */
+     @Override public boolean empty() {
+         return txMap == null || txMap.isEmpty();
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * This implementation returns a singleton collection holding this transaction's node ID.
+      */
+     @Override public Collection<UUID> masterNodeIds() {
+         return Collections.singleton(this.nodeId);
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Returns the partition lock flag passed to the constructor.
+      */
+     @Override public boolean partitionLock() {
+         return this.partLock;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public Throwable commitError() {
+         return commitErr.get();
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Only the first error is retained: the value is stored with a compare-and-set against
+      * {@code null}, so later calls do not overwrite an already recorded error.
+      */
+     @Override public void commitError(Throwable e) {
+         this.commitErr.compareAndSet(null, e);
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Not expected to be invoked on this class: the body trips an assertion when assertions
+      * are enabled and otherwise returns {@code false}.
+      */
+     @Override public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner) {
+         assert false;
+         return false;
+     }
+ 
+     /**
+      * Returns the IDs of caches that are active in this transaction.
+      * <p>
+      * The backing set itself is returned, not a copy.
+      *
+      * @return Collection of active cache IDs.
+      */
+     @Override public Collection<Integer> activeCacheIds() {
+         return this.activeCacheIds;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean isStarted() {
+         return txMap != null;
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Returns {@code false} when the write view has not been created yet instead of failing
+      * with a {@link NullPointerException}. This matches {@link #writeMap()} and
+      * {@link #writeEntries()}, which also tolerate a {@code null} write view.
+      */
+     @Override public boolean hasWriteKey(IgniteTxKey<K> key) {
+         return writeView != null && writeView.containsKey(key);
+     }
+ 
+     /**
+      * Gets keys of the read view.
+      *
+      * @return Transaction read set, or an empty set if the transaction map has not been created.
+      */
+     @Override public Set<IgniteTxKey<K>> readSet() {
+         if (txMap == null)
+             return Collections.<IgniteTxKey<K>>emptySet();
+ 
+         return readView.keySet();
+     }
+ 
+     /**
+      * Gets keys of the write view.
+      *
+      * @return Transaction write set, or an empty set if the transaction map has not been created.
+      */
+     @Override public Set<IgniteTxKey<K>> writeSet() {
+         if (txMap == null)
+             return Collections.<IgniteTxKey<K>>emptySet();
+ 
+         return writeView.keySet();
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * A key counts as removed only if the transaction map exists, contains an entry for the key
+      * and that entry's operation is {@code DELETE}.
+      */
+     @Override public boolean removed(IgniteTxKey<K> key) {
+         IgniteTxEntry<K, V> txEntry = txMap == null ? null : txMap.get(key);
+ 
+         if (txEntry == null)
+             return false;
+ 
+         return txEntry.op() == DELETE;
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Returns the read view itself, or an empty map if the read view has not been created.
+      */
+     @Override public Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> readMap() {
+         if (readView == null)
+             return Collections.<IgniteTxKey<K>, IgniteTxEntry<K, V>>emptyMap();
+ 
+         return readView;
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Returns the write view itself, or an empty map if the write view has not been created.
+      */
+     @Override public Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> writeMap() {
+         if (writeView == null)
+             return Collections.<IgniteTxKey<K>, IgniteTxEntry<K, V>>emptyMap();
+ 
+         return writeView;
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Returns the values of the transaction map, or an empty set if the map has not been created.
+      */
+     @Override public Collection<IgniteTxEntry<K, V>> allEntries() {
+         if (txMap == null)
+             return Collections.<IgniteTxEntry<K, V>>emptySet();
+ 
+         return txMap.values();
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Returns the values of the read view, or an empty list if the read view has not been created.
+      */
+     @Override public Collection<IgniteTxEntry<K, V>> readEntries() {
+         if (readView == null)
+             return Collections.<IgniteTxEntry<K, V>>emptyList();
+ 
+         return readView.values();
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Returns the values of the write view, or an empty list if the write view has not been created.
+      */
+     @Override public Collection<IgniteTxEntry<K, V>> writeEntries() {
+         if (writeView == null)
+             return Collections.<IgniteTxEntry<K, V>>emptyList();
+ 
+         return writeView.values();
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Looks the key up in the transaction map; yields {@code null} if the map has not been
+      * created or holds no entry for the key.
+      */
+     @Nullable @Override public IgniteTxEntry<K, V> entry(IgniteTxKey<K> key) {
+         if (txMap == null)
+             return null;
+ 
+         return txMap.get(key);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void seal() {
+         if (readView != null)
+             readView.seal();
+ 
+         if (writeView != null)
+             writeView.seal();
+     }
+ 
+     /**
+      * Sets the flag controlling whether transformed values are sent to remote nodes.
+      *
+      * @param snd {@code True} if values in tx entries should be replaced with transformed values and sent
+      *      to remote nodes.
+      */
+     public void sendTransformedValues(boolean snd) {
+         this.sndTransformedVals = snd;
+     }
+ 
+     /**
+      * Tells whether the transaction should be committed right after its lock is acquired.
+      * This holds for implicit transactions that are either not DHT or are colocated.
+      *
+      * @return {@code True} if should be committed after lock is acquired.
+      */
+     protected boolean commitAfterLock() {
+         if (!implicit())
+             return false;
+ 
+         return !dht() || colocated();
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Looks at the previous value of the transaction entry enlisted for the key, if any.
+      * When the filter rejects the cached entry, the previous value is passed through
+      * {@code CU.failed(failFast, ...)} before being wrapped into the result tuple.
+      */
+     @SuppressWarnings({"RedundantTypeArguments"})
+     @Nullable @Override public GridTuple<V> peek(
+         GridCacheContext<K, V> cacheCtx,
+         boolean failFast,
+         K key,
+         IgnitePredicate<GridCacheEntry<K, V>>[] filter
+     ) throws GridCacheFilterFailedException {
+         if (txMap == null)
+             return null;
+ 
+         IgniteTxEntry<K, V> txEntry = txMap.get(cacheCtx.txKey(key));
+ 
+         if (txEntry == null)
+             return null;
+ 
+         // We should look at tx entry previous value. If this is a user peek then previous
+         // value is the same as value. If this is a filter evaluation peek then previous value holds
+         // value visible to filter while value contains value enlisted for write.
+         boolean pass = F.isAll(txEntry.cached().wrap(false), filter);
+ 
+         if (!txEntry.hasPreviousValue())
+             return null;
+ 
+         if (pass)
+             return F.t(txEntry.previousValue());
+ 
+         return F.t(CU.<V>failed(failFast, txEntry.previousValue()));
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Delegates to {@code loadAllFromStore} of the cache store manager. In asynchronous mode the call
+      * is submitted through the closure processor; otherwise it runs in the calling thread and a
+      * finished future holding either the result or the thrown {@link IgniteCheckedException} is returned.
+      */
+     @Override public IgniteFuture<Boolean> loadMissing(
+         final GridCacheContext<K, V> cacheCtx,
+         boolean async,
+         final Collection<? extends K> keys,
+         boolean deserializePortable,
+         final IgniteBiInClosure<K, V> c
+     ) {
+         if (async) {
+             return cctx.kernalContext().closure().callLocalSafe(
+                 new GPC<Boolean>() {
+                     @Override public Boolean call() throws Exception {
+                         return cacheCtx.store().loadAllFromStore(IgniteTxLocalAdapter.this, keys, c);
+                     }
+                 },
+                 true);
+         }
+ 
+         // Synchronous path: load in the calling thread and wrap the outcome.
+         try {
+             return new GridFinishedFuture<>(cctx.kernalContext(),
+                 cacheCtx.store().loadAllFromStore(this, keys, c));
+         }
+         catch (IgniteCheckedException err) {
+             return new GridFinishedFuture<>(cctx.kernalContext(), err);
+         }
+     }
+ 
+     /**
+      * Gets the minimal version encountered by this transaction (initialized to the
+      * transaction's own XID version in the constructor).
+      *
+      * @return Minimum version.
+      */
+     @Override public GridCacheVersion minVersion() {
+         return this.minVer;
+     }
+ 
+     /**
+      * @throws IgniteCheckedException If prepare step failed.
+      */
+     @SuppressWarnings({"CatchGenericClass"})
+     public void userPrepare() throws IgniteCheckedException {
+         if (state() != PREPARING) {
+             if (timedOut())
+                 throw new IgniteTxTimeoutException("Transaction timed out: " + this);
+ 
+             IgniteTxState state = state();
+ 
+             setRollbackOnly();
+ 
+             throw new IgniteCheckedException("Invalid transaction state for prepare [state=" + state + ", tx=" + this + ']');
+         }
+ 
+         checkValid();
+ 
+         try {
+             cctx.tm().prepareTx(this);
+         }
+         catch (IgniteCheckedException e) {
+             throw e;
+         }
+         catch (Throwable e) {
+             setRollbackOnly();
+ 
+             throw new IgniteCheckedException("Transaction validation produced a runtime exception: " + this, e);
+         }
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Blocks on the future returned by {@code commitAsync()}. The transaction context held by
+      * the transaction manager is reset afterwards, whether or not the commit succeeded.
+      */
+     @Override public void commit() throws IgniteCheckedException {
+         try {
+             commitAsync().get();
+         }
+         finally {
+             cctx.tm().txContextReset();
+         }
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Blocks on the future returned by {@code prepareAsync()}.
+      */
+     @Override public void prepare() throws IgniteCheckedException {
+         prepareAsync().get();
+     }
+ 
+     /**
+      * Checks that locks are in proper state for commit.
+      * <p>
+      * The check is an assertion only: it asserts that {@code ownsLockUnsafe(entry)} holds and
+      * therefore has no effect when assertions are disabled.
+      *
+      * @param entry Cache entry to check.
+      */
+     private void checkCommitLocks(GridCacheEntryEx<K, V> entry) {
+         assert ownsLockUnsafe(entry) : "Lock is not owned for commit in PESSIMISTIC mode [entry=" + entry +
+             ", tx=" + this + ']';
+     }
+ 
+     /**
+      * Reverts a commit: invalidates the cached entry of every write entry whose operation is not
+      * {@code NOOP} and then asks the transaction manager to uncommit this transaction.
+      * <p>
+      * Invalidation stops at the first failure, which is logged; the transaction manager is
+      * notified regardless.
+      */
+     @SuppressWarnings({"CatchGenericClass"})
+     private void uncommit() {
+         for (IgniteTxEntry<K, V> txEntry : writeMap().values()) {
+             try {
+                 GridCacheEntryEx<K, V> cached = txEntry.cached();
+ 
+                 if (txEntry.op() == NOOP)
+                     continue;
+ 
+                 cached.invalidate(null, xidVer);
+             }
+             catch (Throwable err) {
+                 U.error(log, "Failed to invalidate transaction entries while reverting a commit.", err);
+ 
+                 break;
+             }
+         }
+ 
+         cctx.tm().uncommitTx(this);
+     }
+ 
+     /**
+      * Gets cache entry for given key.
+      * <p>
+      * Delegates to {@code entryEx} of the cache belonging to {@code cacheCtx}, passing the
+      * user key extracted from the transaction key.
+      *
+      * @param cacheCtx Cache context whose cache is queried.
+      * @param key Key.
+      * @return Cache entry.
+      */
+     protected GridCacheEntryEx<K, V> entryEx(GridCacheContext<K, V> cacheCtx, IgniteTxKey<K> key) {
+         return cacheCtx.cache().entryEx(key.key());
+     }
+ 
+     /**
+      * Gets cache entry for given key and topology version.
+      * <p>
+      * Delegates to {@code entryEx} of the cache belonging to {@code cacheCtx}, passing the
+      * user key extracted from the transaction key together with the topology version.
+      *
+      * @param cacheCtx Cache context whose cache is queried.
+      * @param key Key.
+      * @param topVer Topology version.
+      * @return Cache entry.
+      */
+     protected GridCacheEntryEx<K, V> entryEx(GridCacheContext<K, V> cacheCtx, IgniteTxKey<K> key, long topVer) {
+         return cacheCtx.cache().entryEx(key.key(), topVer);
+     }
+ 
+     /**
+      * Performs batch database operations. This method must be called
+      * before {@link #userCommit()}. This way if there is a DB failure,
+      * cache transaction can still be rolled back.
+      * <p>
+      * Nothing is done unless a store is present, the store is enabled for this transaction, the
+      * transaction is either not internal or is a group-lock transaction, and either this is a near
+      * transaction or the store reports {@code writeToStoreFromDht()}.
+      * <p>
+      * Entries are processed in iteration order. Puts ({@code CREATE}/{@code UPDATE}) and removes
+      * ({@code DELETE}) are accumulated into batches; a pending batch of one kind is flushed to the
+      * store as soon as an entry of the other kind is encountered, and whatever batch remains is
+      * flushed after the loop. When a cache interceptor is configured it is consulted first, and an
+      * entry is skipped if {@code onBeforePut} returns {@code null} or the remove is cancelled.
+      * Finally {@code store.txEnd(this, true)} is invoked.
+      * <p>
+      * On any failure the error is recorded via {@link #commitError(Throwable)}, the transaction is
+      * marked rollback-only and removed from the committed transaction list before the exception
+      * is propagated (non-checked throwables are wrapped into {@link IgniteCheckedException}).
+      *
+      * @param writeEntries Transaction write set; if {@code null}, only {@code txEnd} is invoked.
+      * @throws IgniteCheckedException If batch update failed.
+      */
+     @SuppressWarnings({"CatchGenericClass"})
+     protected void batchStoreCommit(Iterable<IgniteTxEntry<K, V>> writeEntries) throws IgniteCheckedException {
+         GridCacheStoreManager<K, V> store = store();
+ 
+         if (store != null && storeEnabled() && (!internal() || groupLock()) && (near() || store.writeToStoreFromDht())) {
+             try {
+                 if (writeEntries != null) {
+                     Map<K, IgniteBiTuple<V, GridCacheVersion>> putMap = null;
+                     List<K> rmvCol = null;
+ 
+                     boolean skipNear = near() && store.writeToStoreFromDht();
+ 
+                     for (IgniteTxEntry<K, V> e : writeEntries) {
+                         if (skipNear && e.cached().isNear())
+                             continue;
+ 
+                         boolean intercept = e.context().config().getInterceptor() != null;
+ 
+                         if (intercept || !F.isEmpty(e.transformClosures()))
+                             e.cached().unswap(true, false);
+ 
+                         GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(e, false);
+ 
+                         GridCacheContext<K, V> cacheCtx = e.context();
+ 
+                         GridCacheOperation op = res.get1();
+                         K key = e.key();
+                         V val = res.get2();
+                         GridCacheVersion ver = writeVersion();
+ 
+                         if (op == CREATE || op == UPDATE) {
+                             // Batch-process all removes if needed.
+                             if (rmvCol != null && !rmvCol.isEmpty()) {
+                                 store.removeAllFromStore(this, rmvCol);
+ 
+                                 // Reset.
+                                 rmvCol.clear();
+                             }
+ 
+                             if (intercept) {
+                                 V old = e.cached().rawGetOrUnmarshal(true);
+ 
+                                 val = (V)cacheCtx.config().getInterceptor().onBeforePut(key, old, val);
+ 
+                                 if (val == null)
+                                     continue;
+ 
+                                 val = cacheCtx.unwrapTemporary(val);
+                             }
+ 
+                             if (putMap == null)
+                                 putMap = new LinkedHashMap<>(writeMap().size(), 1.0f);
+ 
+                             putMap.put(key, F.t(val, ver));
+                         }
+                         else if (op == DELETE) {
+                             // Batch-process all puts if needed.
+                             if (putMap != null && !putMap.isEmpty()) {
+                                 store.putAllToStore(this, putMap);
+ 
+                                 // Reset.
+                                 putMap.clear();
+                             }
+ 
+                             if (intercept) {
+                                 V old = e.cached().rawGetOrUnmarshal(true);
+ 
+                                 IgniteBiTuple<Boolean, V> t = cacheCtx.config().<K, V>getInterceptor()
+                                     .onBeforeRemove(key, old);
+ 
+                                 if (cacheCtx.cancelRemove(t))
+                                     continue;
+                             }
+ 
+                             if (rmvCol == null)
+                                 rmvCol = new LinkedList<>();
+ 
+                             rmvCol.add(key);
+                         }
+                         else if (log.isDebugEnabled())
+                             log.debug("Ignoring NOOP entry for batch store commit: " + e);
+                     }
+ 
+                     if (putMap != null && !putMap.isEmpty()) {
+                         assert rmvCol == null || rmvCol.isEmpty();
+ 
+                         // Batch put at the end of transaction.
+                         store.putAllToStore(this, putMap);
+                     }
+ 
+                     if (rmvCol != null && !rmvCol.isEmpty()) {
+                         assert putMap == null || putMap.isEmpty();
+ 
+                         // Batch remove at the end of transaction.
+                         store.removeAllFromStore(this, rmvCol);
+                     }
+                 }
+ 
+                 // Commit while locks are held.
+                 store.txEnd(this, true);
+             }
+             catch (IgniteCheckedException ex) {
+                 commitError(ex);
+ 
+                 setRollbackOnly();
+ 
+                 // Safe to remove transaction from committed tx list because nothing was committed yet.
+                 cctx.tm().removeCommittedTx(this);
+ 
+                 throw ex;
+             }
+             catch (Throwable ex) {
+                 commitError(ex);
+ 
+                 setRollbackOnly();
+ 
+                 // Safe to remove transaction from committed tx list because nothing was committed yet.
+                 cctx.tm().removeCommittedTx(this);
+ 
+                 throw new IgniteCheckedException("Failed to commit transaction to database: " + this, ex);
+             }
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings({"CatchGenericClass"})
+     @Override public void userCommit() throws IgniteCheckedException {
+         IgniteTxState state = state();
+ 
+         if (state != COMMITTING) {
+             if (timedOut())
+                 throw new IgniteTxTimeoutException("Transaction timed out: " + this);
+ 
+             setRollbackOnly();
+ 
+             throw new IgniteCheckedException("Invalid transaction state for commit [state=" + state + ", tx=" + this + ']');
+         }
+ 
+         checkValid();
+ 
+         boolean empty = F.isEmpty(near() ? txMap : writeMap());
+ 
+         // Register this transaction as completed prior to write-phase to
+         // ensure proper lock ordering for removed entries.
+         // We add colocated transaction to committed set even if it is empty to correctly order
+         // locks on backup nodes.
+         if (!empty || colocated())
+             cctx.tm().addCommittedTx(this);
+ 
+         if (groupLock())
+             addGroupTxMapping(writeSet());
+ 
+         if (!empty) {
+             // We are holding transaction-level locks for entries here, so we can get next write version.
+             writeVersion(cctx.versions().next(topologyVersion()));
+ 
+             batchStoreCommit(writeMap().values());
+ 
+             try {
+                 cctx.tm().txContext(this);
+ 
+                 long topVer = topologyVersion();
+ 
+                 /*
+                  * Commit to cache. Note that for 'near' transaction we loop through all the entries.
+                  */
+                 for (IgniteTxEntry<K, V> txEntry : (near() ? allEntries() : writeEntries())) {
+                     GridCacheContext<K, V> cacheCtx = txEntry.context();
+ 
+                     GridDrType drType = cacheCtx.isDrEnabled() ? DR_PRIMARY : DR_NONE;
+ 
+                     UUID nodeId = txEntry.nodeId() == null ? this.nodeId : txEntry.nodeId();
+ 
+                     try {
+                         while (true) {
+                             try {
+                                 GridCacheEntryEx<K, V> cached = txEntry.cached();
+ 
+                                 // Must try to evict near entries before committing from
+                                 // transaction manager to make sure locks are held.
+                                 if (!evictNearEntry(txEntry, false)) {
+                                     if (cacheCtx.isNear() && cacheCtx.dr().receiveEnabled()) {
+                                         cached.markObsolete(xidVer);
+ 
+                                         break;
+                                     }
+ 
+                                     if (cached.detached())
+                                         break;
+ 
+                                     GridCacheEntryEx<K, V> nearCached = null;
+ 
+                                     boolean metrics = true;
+ 
+                                     if (updateNearCache(cacheCtx, txEntry.key(), topVer))
+                                         nearCached = cacheCtx.dht().near().peekEx(txEntry.key());
+                                     else if (cacheCtx.isNear() && txEntry.locallyMapped())
+                                         metrics = false;
+ 
+                                     boolean evt = !isNearLocallyMapped(txEntry, false);
+ 
+                                     // For near local transactions we must record DHT version
+                                     // in order to keep near entries on backup nodes until
+                                     // backup remote transaction completes.
+                                     if (cacheCtx.isNear())
+                                         ((GridNearCacheEntry<K, V>)cached).recordDhtVersion(txEntry.dhtVersion());
+ 
+                                     if (!F.isEmpty(txEntry.transformClosures()) || !F.isEmpty(txEntry.filters()))
+                                         txEntry.cached().unswap(true, false);
+ 
+                                     GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(txEntry,
+                                         true);
+ 
+                                     GridCacheOperation op = res.get1();
+                                     V val = res.get2();
+                                     byte[] valBytes = res.get3();
+ 
 -                                    // Preserve TTL if needed.
 -                                    if (txEntry.ttl() < 0)
 -                                        txEntry.ttl(cached.ttl());
 -
+                                     // Deal with DR conflicts.
+                                     GridCacheVersion explicitVer = txEntry.drVersion() != null ?
+                                         txEntry.drVersion() : writeVersion();
+ 
++                                    if (op == CREATE || op == UPDATE && txEntry.drExpireTime() == -1L) {
++                                        ExpiryPolicy expiry = txEntry.expiry();
++
++                                        if (expiry == null)
++                                            expiry = cacheCtx.expiry();
++
++                                        if (expiry != null) {
++                                            Duration duration = cached.hasValue() ?
++                                                expiry.getExpiryForUpdate() : expiry.getExpiryForCreation();
++
++                                            txEntry.ttl(GridCacheUtils.toTtl(duration));
++                                        }
++                                    }
++
+                                     GridDrResolveResult<V> drRes = cacheCtx.dr().resolveTx(cached,
+                                         txEntry,
+                                         explicitVer,
+                                         op,
+                                         val,
+                                         valBytes,
+                                         txEntry.ttl(),
+                                         txEntry.drExpireTime());
+ 
+                                     if (drRes != null) {
+                                         op = drRes.operation();
+                                         val = drRes.value();
+                                         valBytes = drRes.valueBytes();
+ 
+                                         if (drRes.isMerge())
+                                             explicitVer = writeVersion();
++                                        else if (op == NOOP)
++                                            txEntry.ttl(-1L);
+                                     }
+                                     else
+                                         // Nullify explicit version so that innerSet/innerRemove will work as usual.
+                                         explicitVer = null;
+ 
+                                     if (sndTransformedVals || (drRes != null)) {
+                                         assert sndTransformedVals && cacheCtx.isReplicated() || (drRes != null);
+ 
+                                         txEntry.value(val, true, false);
+                                         txEntry.valueBytes(valBytes);
+                                         txEntry.op(op);
+                                         txEntry.transformClosures(null);
+                                         txEntry.drVersion(explicitVer);
+                                     }
+ 
+                                     if (op == CREATE || op == UPDATE) {
+                                         GridCacheUpdateTxResult<V> updRes = cached.innerSet(
+                                             this,
+                                             eventNodeId(),
+                                             txEntry.nodeId(),
+                                             val,
+                                             valBytes,
+                                             false,
+                                             false,
+                                             txEntry.ttl(),
+                                             evt,
+                                             metrics,
+                                             topVer,
+                                             txEntry.filters(),
+                                             cached.detached() ? DR_NONE : drType,
+                                             txEntry.drExpireTime(),
+                                             cached.isNear() ? null : explicitVer,
+                                             CU.subjectId(this, cctx),
+                                             resolveTaskName());
+ 
+                                         if (nearCached != null && updRes.success())
+                                             nearCached.innerSet(
+                                                 null,
+                                                 eventNodeId(),
+                                                 nodeId,
+                                                 val,
+                                                 valBytes,
+                                                 false,
+                                                 false,
+                                                 txEntry.ttl(),
+                                                 false,
+                                                 metrics,
+                                                 topVer,
+                                                 CU.<K, V>empty(),
+                                                 DR_NONE,
+                                                 txEntry.drExpireTime(),
+                                                 null,
+                                                 CU.subjectId(this, cctx),
+                                                 resolveTaskName());
+                                     }
+                                     else if (op == DELETE) {
+                                         GridCacheUpdateTxResult<V> updRes = cached.innerRemove(
+                                             this,
+                                             eventNodeId(),
+                                             txEntry.nodeId(),
+                                             false,
+                                             false,
+                                             evt,
+                                             metrics,
+                                             topVer,
+                                             txEntry.filters(),
+                                             cached.detached()  ? DR_NONE : drType,
+                                             cached.isNear() ? null : explicitVer,
+                                             CU.subjectId(this, cctx),
+                                             resolveTaskName());
+ 
+                                         if (nearCached != null && updRes.success())
+                                             nearCached.innerRemove(
+                                                 null,
+                                                 eventNodeId(),
+                                                 nodeId,
+                                                 false,
+                                                 false,
+                                                 false,
+                                                 metrics,
+                                                 topVer,
+                                                 CU.<K, V>empty(),
+                                                 DR_NONE,
+                                                 null,
+                                                 CU.subjectId(this, cctx),
+                                                 resolveTaskName());
+                                     }
+                                     else if (op == RELOAD) {
+                                         cached.innerReload(CU.<K, V>empty());
+ 
+                                         if (nearCached != null)
+                                             nearCached.innerReload(CU.<K, V>empty());
+                                     }
+                                     else if (op == READ) {
++                                        Duration duration = expiryForAccess(txEntry);
++
++                                        if (duration != null)
++                                            cached.updateTtl(null, GridCacheUtils.toTtl(duration));
++
+                                         if (log.isDebugEnabled())
+                                             log.debug("Ignoring READ entry when committing: " + txEntry);
+                                     }
+                                     else {
+                                         assert !groupLock() || txEntry.groupLockEntry() || ownsLock(txEntry.cached()):
+                                             "Transaction does not own lock for group lock entry during  commit [tx=" +
+                                                 this + ", txEntry=" + txEntry + ']';
+ 
++                                        if (txEntry.ttl() != -1L)
++                                            cached.updateTtl(null, txEntry.ttl());
++
+                                         if (log.isDebugEnabled())
+                                             log.debug("Ignoring NOOP entry when committing: " + txEntry);
+                                     }
+                                 }
+ 
+                                 // Check commit locks after set, to make sure that
+                                 // we are not changing obsolete entries.
+                                 // (innerSet and innerRemove will throw an exception
+                                 // if an entry is obsolete).
+                                 if (txEntry.op() != READ && !txEntry.groupLockEntry())
+                                     checkCommitLocks(cached);
+ 
+                                 // Break out of while loop.
+                                 break;
+                             }
+                             // If entry cached within transaction got removed.
+                             catch (GridCacheEntryRemovedException ignored) {
+                                 if (log.isDebugEnabled())
+                                     log.debug("Got removed entry during transaction commit (will retry): " + txEntry);
+ 
+                                 txEntry.cached(entryEx(cacheCtx, txEntry.txKey()), txEntry.keyBytes());
+                             }
+                         }
+                     }
+                     catch (Throwable ex) {
+                         // We are about to initiate transaction rollback when tx has already started committing.
+                         // Need to remove version from committed list.
+                         cctx.tm().removeCommittedTx(this);
+ 
+                         if (X.hasCause(ex, GridCacheIndexUpdateException.class) && cacheCtx.cache().isMongoDataCache()) {
+                             if (log.isDebugEnabled())
+                                 log.debug("Failed to update mongo document index (transaction entry will " +
+                                     "be ignored): " + txEntry);
+ 
+                             // Set operation to NOOP.
+                             txEntry.op(NOOP);
+ 
+                             setRollbackOnly();
+ 
+                             throw ex;
+                         }
+                         else {
+                             IgniteCheckedException err = new IgniteTxHeuristicException("Failed to locally write to cache " +
+                                 "(all transaction entries will be invalidated, however there was a window when " +
+                                 "entries for this transaction were visible to others): " + this, ex);
+ 
+                             U.error(log, "Heuristic transaction failure.", err);
+ 
+                             commitErr.compareAndSet(null, err);
+ 
+                             state(UNKNOWN);
+ 
+                             try {
+                                 // Courtesy to minimize damage.
+                                 uncommit();
+                             }
+                             catch (Throwable ex1) {
+                                 U.error(log, "Failed to uncommit transaction: " + this, ex1);
+                             }
+ 
+                             throw err;
+                         }
+                     }
+                 }
++
++                if (!near()) {
++                    for (GridCacheTxEntry<K, V> txEntry : readEntries()) {
++                        Duration duration = expiryForAccess(txEntry);
++
++                        if (duration != null)
++                            txEntry.cached().updateTtl(null, GridCacheUtils.toTtl(duration));
++                    }
++                }
+             }
+             finally {
+                 cctx.tm().txContextReset();
+             }
+         }
+         else {
++            for (GridCacheTxEntry<K, V> txEntry : readEntries()) {
++                Duration duration = expiryForAccess(txEntry);
++
++                if (duration != null)
++                    txEntry.cached().updateTtl(null, GridCacheUtils.toTtl(duration));
++            }
++
+             GridCacheStoreManager<K, V> store = store();
+ 
+             if (store != null && (!internal() || groupLock())) {
+                 try {
+                     store.txEnd(this, true);
+                 }
+                 catch (IgniteCheckedException e) {
+                     commitError(e);
+ 
+                     setRollbackOnly();
+ 
+                     cctx.tm().removeCommittedTx(this);
+ 
+                     throw e;
+                 }
+             }
+         }
+ 
+         // Do not unlock transaction entries if one-phase commit.
+         if (!onePhaseCommit()) {
+             if (doneFlag.compareAndSet(false, true)) {
+                 // Unlock all locks.
+                 cctx.tm().commitTx(this);
+ 
+                 boolean needsCompletedVersions = needsCompletedVersions();
+ 
+                 assert !needsCompletedVersions || completedBase != null;
+                 assert !needsCompletedVersions || committedVers != null;
+                 assert !needsCompletedVersions || rolledbackVers != null;
+             }
+         }
+     }
+ 
+     /**
++     * Resolves the access-expiry duration for the given transaction entry.
++     * <p>
++     * The expiry policy set on the entry itself is used when present; otherwise the policy
++     * of the entry's cache context is used.
++     * <p>
++     * NOTE(review): the parameter is declared as {@code GridCacheTxEntry} while the other methods
++     * in this part of the file declare their entries as {@code IgniteTxEntry} - confirm that both
++     * names still resolve after this merge.
++     *
++     * @param txEntry Tx entry.
++     * @return Value of {@code getExpiryForAccess()} of the resolved policy, or {@code null} if neither
++     *      the entry nor its cache context provides a policy.
++     */
++    @Nullable private Duration expiryForAccess(GridCacheTxEntry<K, V> txEntry) {
++        ExpiryPolicy expiry = txEntry.expiry();
++
++        if (expiry == null)
++            expiry = txEntry.context().expiry();
++
++        return expiry != null ? expiry.getExpiryForAccess() : null;
++    }
++
++    /**
+      * Commits transaction to transaction manager. Used for one-phase commit transactions only.
+      * <p>
+      * The body is guarded by an atomic {@code false -> true} switch of {@code doneFlag}, so a call
+      * made after the flag is already set does nothing. The call that switches the flag passes this
+      * transaction to {@code cctx.tm().commitTx(..)} and then sets the state to {@code COMMITTED}.
+      * <p>
+      * With assertions enabled the method additionally checks that the transaction is one-phase commit
+      * and, when {@code needsCompletedVersions()} is {@code true}, that the completed base, committed
+      * versions and rolled back versions are all non-null.
+      */
+     public void tmCommit() {
+         assert onePhaseCommit();
+ 
+         if (doneFlag.compareAndSet(false, true)) {
+             // Unlock all locks.
+             cctx.tm().commitTx(this);
+ 
+             state(COMMITTED);
+ 
+             boolean needsCompletedVersions = needsCompletedVersions();
+ 
+             assert !needsCompletedVersions || completedBase != null;
+             assert !needsCompletedVersions || committedVers != null;
+             assert !needsCompletedVersions || rolledbackVers != null;
+         }
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * This implementation only stores the three arguments in the corresponding fields;
+      * the passed collections are kept by reference and are not copied.
+      */
+     @Override public void completedVersions(
+         GridCacheVersion completedBase,
+         Collection<GridCacheVersion> committedVers,
+         Collection<GridCacheVersion> rolledbackVers) {
+         this.completedBase = completedBase;
+         this.committedVers = committedVers;
+         this.rolledbackVers = rolledbackVers;
+     }
+ 
+     /**
+      * Returns the {@code completedBase} field, which is assigned by
+      * {@link #completedVersions(GridCacheVersion, Collection, Collection)}.
+      *
+      * @return Completed base for ordering.
+      */
+     public GridCacheVersion completedBase() {
+         return completedBase;
+     }
+ 
+     /**
+      * Returns the {@code committedVers} field, which is assigned by
+      * {@link #completedVersions(GridCacheVersion, Collection, Collection)}.
+      * The stored collection is returned as is, without a defensive copy.
+      *
+      * @return Committed versions.
+      */
+     public Collection<GridCacheVersion> committedVersions() {
+         return committedVers;
+     }
+ 
+     /**
+      * Returns the {@code rolledbackVers} field, which is assigned by
+      * {@link #completedVersions(GridCacheVersion, Collection, Collection)}.
+      * The stored collection is returned as is, without a defensive copy.
+      *
+      * @return Rolledback versions.
+      */
+     public Collection<GridCacheVersion> rolledbackVersions() {
+         return rolledbackVers;
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * The transaction must already be in {@code ROLLING_BACK} or {@code ROLLED_BACK} state. In any
+      * other state the transaction is marked rollback-only and an {@link IgniteCheckedException} is
+      * thrown whose cause is the current value of {@code commitErr}.
+      * <p>
+      * The rollback work is guarded by an atomic {@code false -> true} switch of {@code doneFlag},
+      * so it is skipped when the flag is already set. The work consists of:
+      * <ol>
+      *     <li>for a near transaction, calling {@code evictNearEntry(e, false)} for every entry;</li>
+      *     <li>calling {@code cctx.tm().rollbackTx(this)};</li>
+      *     <li>calling {@code store.txEnd(this, false)} when a store is present, the transaction is near
+      *     or {@code store.writeToStoreFromDht()} is {@code true}, and the transaction is either
+      *     not internal or is a group-lock transaction.</li>
+      * </ol>
+      * Any {@link Error}, {@link IgniteCheckedException} or {@link RuntimeException} raised by these
+      * steps is passed to {@code U.addLastCause(..)} together with the current {@code commitErr}
+      * value and is then rethrown.
+      */
+     @Override public void userRollback() throws IgniteCheckedException {
+         IgniteTxState state = state();
+ 
+         if (state != ROLLING_BACK && state != ROLLED_BACK) {
+             setRollbackOnly();
+ 
+             throw new IgniteCheckedException("Invalid transaction state for rollback [state=" + state + ", tx=" + this + ']',
+                 commitErr.get());
+         }
+ 
+         if (doneFlag.compareAndSet(false, true)) {
+             try {
+                 if (near())
+                     // Must evict near entries before rolling back from
+                     // transaction manager, so they will be removed from cache.
+                     for (IgniteTxEntry<K, V> e : allEntries())
+                         evictNearEntry(e, false);
+ 
+                 cctx.tm().rollbackTx(this);
+ 
+                 GridCacheStoreManager<K, V> store = store();
+ 
+                 if (store != null && (near() || store.writeToStoreFromDht())) {
+                     if (!internal() || groupLock())
+                         store.txEnd(this, false);
+                 }
+             }
+             catch (Error | IgniteCheckedException | RuntimeException e) {
+                 U.addLastCause(e, commitErr.get(), log);
+ 
+                 throw e;
+             }
+         }
+     }
+ 
+     /**
+      * Checks if there is a cached or swapped value for
+      * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, IgnitePredicate[])} method.
+      * <p>
+      * For every non-null key:
+      * <ul>
+      *     <li>If the transaction already has an entry for the key and the entry passes the filter, the
+      *     value is taken from the transaction entry (with its transform closures applied) or, when the
+      *     transaction entry has no value, read from the cached entry. A found value is put into
+      *     {@code map}; a {@code null} read is recorded in {@code missed} with the entry version.</li>
+      *     <li>Otherwise the key is accessed for the first time: it is added to the returned collection,
+      *     the value is read from cache unless the transaction is pessimistic, not read-committed and
+      *     not group-lock (in that case the key is only recorded in {@code missed}), and, unless the
+      *     transaction is read-committed, a {@code READ} transaction entry is added.</li>
+      * </ul>
+      * A filter failure reported by the cache read ends processing of that key in both branches:
+      * the key is put neither into {@code map} nor into {@code missed}.
+      *
+      * @param cacheCtx Cache context.
+      * @param keys Key to enlist.
+      * @param cached Cached entry, if called from entry wrapper.
+      * @param map Return map.
+      * @param missed Map of missed keys.
+      * @param keysCnt Keys count (to avoid call to {@code Collection.size()}).
+      * @param deserializePortable Deserialize portable flag.
+      * @param filter Filter to test.
+      * @throws IgniteCheckedException If failed.
+      * @return Enlisted keys: the keys accessed for the first time within this transaction (the passed
+      *      {@code keys} collection itself when {@code keysCnt} is 1), or an empty list if there are none.
+      */
+     @SuppressWarnings({"RedundantTypeArguments"})
+     private Collection<K> enlistRead(
+         final GridCacheContext<K, V> cacheCtx,
+         Collection<? extends K> keys,
+         @Nullable GridCacheEntryEx<K, V> cached,
+         Map<K, V> map,
+         Map<K, GridCacheVersion> missed,
+         int keysCnt,
+         boolean deserializePortable,
+         IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
+         assert !F.isEmpty(keys);
+         assert keysCnt == keys.size();
+         assert cached == null || F.first(keys).equals(cached.key());
+ 
+         cacheCtx.checkSecurity(GridSecurityPermission.CACHE_READ);
+ 
+         groupLockSanityCheck(cacheCtx, keys);
+ 
+         boolean single = keysCnt == 1;
+ 
+         Collection<K> lockKeys = null;
+ 
+         long topVer = topologyVersion();
+ 
+         // In this loop we cover only read-committed or optimistic transactions.
+         // Transactions that are pessimistic and not read-committed are covered
+         // outside of this loop.
+         for (K key : keys) {
+             if (key == null)
+                 continue;
+ 
+             if (pessimistic() && !readCommitted())
+                 addActiveCache(cacheCtx);
+ 
+             IgniteTxKey<K> txKey = cacheCtx.txKey(key);
+ 
+             // Check write map (always check writes first).
+             IgniteTxEntry<K, V> txEntry = entry(txKey);
+ 
+             // Either non-read-committed or there was a previous write.
+             if (txEntry != null) {
+                 if (cacheCtx.isAll(txEntry.cached(), filter)) {
+                     V val = txEntry.value();
+ 
+                     // Read value from locked entry in group-lock transaction as well.
+                     if (txEntry.hasValue()) {
+                         if (!F.isEmpty(txEntry.transformClosures())) {
+                             for (IgniteClosure<V, V> clos : txEntry.transformClosures())
+                                 val = clos.apply(val);
+                         }
+ 
+                         if (val != null) {
+                             V val0 = val;
+ 
+                             if (cacheCtx.portableEnabled())
+                                 val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
+ 
+                             map.put(key, val0);
+                         }
+                     }
+                     else {
+                         assert txEntry.op() == TRANSFORM || (groupLock() && !txEntry.groupLockEntry());
+ 
+                         while (true) {
+                             try {
+                                 Object transformClo =
+                                     (txEntry.op() == TRANSFORM  && cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
+                                         F.first(txEntry.transformClosures()) : null;
+ 
+                                 val = txEntry.cached().innerGet(this,
+                                     /*swap*/true,
+                                     /*read-through*/false,
+                                     /*fail fast*/true,
+                                     /*unmarshal*/true,
+                                     /*metrics*/true,
+                                     /*event*/true,
+                                     /*temporary*/false,
+                                     CU.subjectId(this, cctx),
+                                     transformClo,
+                                     resolveTaskName(),
 -                                    filter);
++                                    filter,
++                                    null);
+ 
+                                 if (val != null) {
+                                     if (!readCommitted())
+                                         txEntry.readValue(val);
+ 
+                                     if (!F.isEmpty(txEntry.transformClosures())) {
+                                         for (IgniteClosure<V, V> clos : txEntry.transformClosures())
+                                             val = clos.apply(val);
+                                     }
+ 
+                                     V val0 = val;
+ 
+                                     if (cacheCtx.portableEnabled())
+                                         val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
+ 
+                                     map.put(key, val0);
+                                 }
+                                 else
+                                     missed.put(key, txEntry.cached().version());
+ 
+                                 break;
+                             }
+                             catch (GridCacheFilterFailedException e) {
+                                 if (log.isDebugEnabled())
+                                     log.debug("Filter validation failed for entry: " + txEntry);
+ 
+                                 if (!readCommitted())
+                                     txEntry.readValue(e.<V>value());
+ 
+                                 // Do not retry on filter failure: nothing here changes the entry, so looping
+                                 // would only repeat the same failing read. Mirrors the first-access branch.
+                                 break;
+                             }
+                             catch (GridCacheEntryRemovedException ignored) {
+                                 txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer), txEntry.keyBytes());
+                             }
+                         }
+                     }
+                 }
+             }
+             // First time access within transaction.
+             else {
+                 if (lockKeys == null)
+                     lockKeys = single ? (Collection<K>)keys : new ArrayList<K>(keysCnt);
+ 
+                 if (!single)
+                     lockKeys.add(key);
+ 
+                 while (true) {
+                     GridCacheEntryEx<K, V> entry;
+ 
+                     if (cached != null) {
+                         entry = cached;
+ 
+                         cached = null;
+                     }
+                     else
+                         entry = entryEx(cacheCtx, txKey, topVer);
+ 
+                     try {
+                         GridCacheVersion ver = entry.version();
+ 
+                         V val = null;
+ 
+                         if (!pessimistic() || readCommitted() || groupLock()) {
+                             // This call will check for filter.
+                             val = entry.innerGet(this,
+                                 /*swap*/true,
+                                 /*no read-through*/false,
+                                 /*fail-fast*/true,
+                                 /*unmarshal*/true,
+                                 /*metrics*/true,
+                                 /*event*/true,
+                                 /*temporary*/false,
+                                 CU.subjectId(this, cctx),
+                                 null,
+                                 resolveTaskName(),
 -                                filter);
++                                filter,
++                                null);
+ 
+                             if (val != null) {
+                                 V val0 = val;
+ 
+                                 if (cacheCtx.portableEnabled())
+                                     val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
+ 
+                                 map.put(key, val0);
+                             }
+                             else
+                                 missed.put(key, ver);
+                         }
+                         else
+                             // We must wait for the lock in pessimistic mode.
+                             missed.put(key, ver);
+ 
+                         if (!readCommitted()) {
 -                            txEntry = addEntry(READ, val, null, entry, -1, filter, true, -1L, -1L, null);
++                            txEntry = addEntry(READ, val, null, entry, null, filter, true, -1L, -1L, null);
+ 
+                             if (groupLock())
+                                 txEntry.groupLockEntry(true);
+ 
+                             // As optimization, mark as checked immediately
+                             // for non-pessimistic if value is not null.
+                             if (val != null && !pessimistic())
+                                 txEntry.markValid();
+                         }
+ 
+                         break; // While.
+                     }
+                     catch (GridCacheEntryRemovedException ignored) {
+                         if (log.isDebugEnabled())
+                             log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key);
+                     }
+                     catch (GridCacheFilterFailedException e) {
+                         if (log.isDebugEnabled())
+                             log.debug("Filter validation failed for entry: " + entry);
+ 
+                         if (!readCommitted()) {
+                             // Value for which failure occurred.
+                             V val = e.<V>value();
+ 
 -                            txEntry = addEntry(READ, val, null, entry, -1, CU.<K, V>empty(), false, -1L, -1L, null);
++                            txEntry = addEntry(READ, val, null, entry, null, CU.<K, V>empty(), false, -1L, -1L, null);
+ 
+                             // Mark as checked immediately for non-pessimistic.
+                             if (val != null && !pessimistic())
+                                 txEntry.markValid();
+                         }
+ 
+                         break; // While loop.
+                     }
+                 }
+             }
+         }
+ 
+         return lockKeys != null ? lockKeys : Collections.<K>emptyList();
+     }
+ 
+     /**
+      * Adds skipped key.
+      * <p>
+      * When {@code skipped} is {@code null} a new {@code GridLeanSet} is created to hold the key,
+      * so callers should continue with the returned set rather than the argument.
+      * The added key is written to the debug log when debug logging is enabled.
+      *
+      * @param skipped Skipped set (possibly {@code null}).
+      * @param key Key to add.
+      * @return Skipped set: the passed set, or the newly created one if {@code null} was passed.
+      */
+     private Set<K> skip(Set<K> skipped, K key) {
+         if (skipped == null)
+             skipped = new GridLeanSet<>();
+ 
+         skipped.add(key);
+ 
+         if (log.isDebugEnabled())
+             log.debug("Added key to skipped set: " + key);
+ 
+         return skipped;
+     }
+ 
+     /**
+      * Loads all missed keys for
+      * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, IgnitePredicate[])} method.
+      * <p>
+      * The keys of {@code missedMap} are passed to {@code loadMissing(..)}. For every loaded key-value
+      * pair the value is recorded in the matching transaction entry (if any), the entry's transform
+      * closures are applied to obtain the visible value and, when the filter passes, the loaded value
+      * is offered to the cache entry through {@code versionedValue(..)}. Pairs are ignored when the
+      * transaction is rollback-only or when the key is not present in {@code missedMap}.
+      * <p>
+      * After loading completes, a load failure marks the transaction rollback-only and is rethrown
+      * wrapped into {@code GridClosureException}. Otherwise entries are marked valid when nothing was
+      * loaded from store (non read-committed only), and in read-committed mode entries of keys that
+      * were not loaded are touched in the eviction manager.
+      *
+      * @param cacheCtx Cache context.
+      * @param map Return map.
+      * @param missedMap Missed keys.
+      * @param redos Keys to retry.
+      * @param deserializePortable Deserialize portable flag.
+      * @param filter Filter.
+      * @return Loaded key-value pairs: a future that completes with the passed {@code map} instance.
+      */
+     private IgniteFuture<Map<K, V>> checkMissed(
+         final GridCacheContext<K, V> cacheCtx,
+         final Map<K, V> map,
+         final Map<K, GridCacheVersion> missedMap,
+         @Nullable final Collection<K> redos,
+         final boolean deserializePortable,
+         final IgnitePredicate<GridCacheEntry<K, V>>[] filter
+     ) {
+         assert redos != null || pessimistic();
+ 
+         if (log.isDebugEnabled())
+             log.debug("Loading missed values for missed map: " + missedMap);
+ 
+         final Collection<K> loaded = new HashSet<>();
+ 
+         return new GridEmbeddedFuture<>(cctx.kernalContext(),
+             loadMissing(
+                 cacheCtx,
+                 false, missedMap.keySet(), deserializePortable, new CI2<K, V>() {
+                 /** */
+                 private GridCacheVersion nextVer;
+ 
+                 @Override public void apply(K key, V val) {
+                     if (isRollbackOnly()) {
+                         if (log.isDebugEnabled())
+                             log.debug("Ignoring loaded value for read because transaction was rolled back: " +
+                                 IgniteTxLocalAdapter.this);
+ 
+                         return;
+                     }
+ 
+                     GridCacheVersion ver = missedMap.get(key);
+ 
+                     if (ver == null) {
+                         if (log.isDebugEnabled())
+                             log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']');
+ 
+                         return;
+                     }
+ 
+                     V visibleVal = val;
+ 
+                     IgniteTxKey<K> txKey = cacheCtx.txKey(key);
+ 
+                     IgniteTxEntry<K, V> txEntry = entry(txKey);
+ 
+                     if (txEntry != null) {
+                         if (!readCommitted())
+                             txEntry.readValue(val);
+ 
+                         if (!F.isEmpty(txEntry.transformClosures())) {
+                             for (IgniteClosure<V, V> clos : txEntry.transformClosures())
+                                 visibleVal = clos.apply(visibleVal);
+                         }
+                     }
+ 
+                     // In pessimistic mode we hold the lock, so filter validation
+                     // should always be valid.
+                     if (pessimistic())
+                         ver = null;
+ 
+                     // Initialize next version.
+                     if (nextVer == null)
+                         nextVer = cctx.versions().next(topologyVersion());
+ 
+                     while (true) {
+                         assert txEntry != null || readCommitted() || groupLock();
+ 
+                         GridCacheEntryEx<K, V> e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached();
+ 
+                         try {
+                             boolean pass = cacheCtx.isAll(e, filter);
+ 
+                             // Must initialize to true since even if filter didn't pass,
+                             // we still record the transaction value.
+                             boolean set = true;
+ 
+                             if (pass) {
+                                 try {
+                                     set = e.versionedValue(val, ver, nextVer);
+                                 }
+                                 catch (GridCacheEntryRemovedException ignore) {
+                                     if (log.isDebugEnabled())
+                                         log.debug("Got removed entry in transaction getAll method " +
+                                             "(will try again): " + e);
+ 
+                                     if (pessimistic() && !readCommitted() && !isRollbackOnly() &&
+                                         (!groupLock() || F.eq(e.key(), groupLockKey()))) {
+                                         U.error(log, "Inconsistent transaction state (entry got removed while " +
+                                             "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]");
+ 
+                                         setRollbackOnly();
+ 
+                                         return;
+                                     }
+ 
+                                     if (txEntry != null)
+                                         txEntry.cached(entryEx(cacheCtx, txKey), txEntry.keyBytes());
+ 
+                                     continue; // While loop.
+                                 }
+                             }
+ 
+                             // In pessimistic mode, we should always be able to set.
+                             assert set || !pessimistic();
+ 
+                             if (readCommitted() || groupLock()) {
+                                 cacheCtx.evicts().touch(e, topologyVersion());
+ 
+                                 if (pass && visibleVal != null)
+                                     map.put(key, visibleVal);
+                             }
+                             else {
+                                 assert txEntry != null;
+ 
+                                 if (set || F.isEmptyOrNulls(filter)) {
+                                     txEntry.setAndMarkValid(val);
+ 
+                                     if (pass && visibleVal != null)
+                                         map.put(key, visibleVal);
+                                 }
+                                 else {
+                                     // Qualified 'this' is required: inside this closure plain 'this'
+                                     // is the closure itself, not the transaction.
+                                     assert !pessimistic() : "Pessimistic transaction should not have to redo gets: " +
+                                         IgniteTxLocalAdapter.this;
+ 
+                                     if (log.isDebugEnabled())
+                                         log.debug("Failed to set versioned value for entry (will redo): " + e);
+ 
+                                     redos.add(key);
+                                 }
+                             }
+ 
+                             loaded.add(key);
+ 
+                             if (log.isDebugEnabled())
+                                 log.debug("Set value loaded from store into entry from transaction [set=" + set +
+                                     ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']');
+ 
+                             break; // While loop.
+                         }
+                         catch (IgniteCheckedException ex) {
+                             throw new IgniteException("Failed to put value for cache entry: " + e, ex);
+                         }
+                     }
+                 }
+             }),
+             new C2<Boolean, Exception, Map<K, V>>() {
+                 @Override public Map<K, V> apply(Boolean b, Exception e) {
+                     if (e != null) {
+                         setRollbackOnly();
+ 
+                         throw new GridClosureException(e);
+                     }
+ 
+                     if (!b && !readCommitted()) {
+                         // There is no store - we must mark the entries.
+                         for (K key : missedMap.keySet()) {
+                             IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(key));
+ 
+                             if (txEntry != null)
+                                 txEntry.markValid();
+                         }
+                     }
+ 
+                     if (readCommitted()) {
+                         Collection<K> notFound = new HashSet<>(missedMap.keySet());
+ 
+                         notFound.removeAll(loaded);
+ 
+                         // In read-committed mode touch entries that have just been read.
+                         for (K key : notFound) {
+                             IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(key));
+ 
+                             GridCacheEntryEx<K, V> entry = txEntry == null ? cacheCtx.cache().peekEx(key) :
+                                 txEntry.cached();
+ 
+                             if (entry != null)
+                                 cacheCtx.evicts().touch(entry, topologyVersion());
+                         }
+                     }
+ 
+                     return map;
+                 }
+             });
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Flow as implemented below: an empty {@code keys} collection yields a finished future with an empty map.
+      * Otherwise the keys are handed to {@code enlistRead(..)} together with {@code retMap} and {@code missed};
+      * the collection it returns is used as the set of keys to lock. Pessimistic transactions that are neither
+      * read-committed nor group-lock acquire locks through {@code txLockAsync(..)} and re-read the entries in
+      * the post-lock closure; all other modes load {@code missed} keys through {@code checkMissed(..)} and
+      * call this method again for the keys collected in {@code redos}.
+      * <p>
+      * An {@link IgniteCheckedException} thrown while setting this up marks the transaction rollback-only and
+      * is returned inside a finished future instead of being thrown.
+      */
+     @Override public IgniteFuture<Map<K, V>> getAllAsync(
+         final GridCacheContext<K, V> cacheCtx,
+         Collection<? extends K> keys,
+         @Nullable GridCacheEntryEx<K, V> cached, final boolean deserializePortable,
+         final IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+         if (F.isEmpty(keys))
+             return new GridFinishedFuture<>(cctx.kernalContext(), Collections.<K, V>emptyMap());
+ 
+         init();
+ 
+         int keysCnt = keys.size();
+ 
+         boolean single = keysCnt == 1;
+ 
+         try {
+             checkValid();
+ 
+             final Map<K, V> retMap = new GridLeanMap<>(keysCnt);
+ 
+             final Map<K, GridCacheVersion> missed = new GridLeanMap<>(pessimistic() ? keysCnt : 0);
+ 
+             final Collection<K> lockKeys = enlistRead(cacheCtx, keys, cached, retMap, missed, keysCnt,
+                 deserializePortable, filter);
+ 
+             if (single && missed.isEmpty()) // Single key, nothing missed: skip locking and loading below.
+                 return new GridFinishedFuture<>(cctx.kernalContext(), retMap);
+ 
+             // Handle locks.
+             if (pessimistic() && !readCommitted() && !groupLock()) {
+                 IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys, lockTimeout(), this, true, true,
+                     isolation, isInvalidate(), CU.<K, V>empty());
+ 
+                 PLC2<Map<K, V>> plc2 = new PLC2<Map<K, V>>() {
+                     @Override public IgniteFuture<Map<K, V>> postLock() throws IgniteCheckedException {
+                         if (log.isDebugEnabled())
+                             log.debug("Acquired transaction lock for read on keys: " + lockKeys);
+ 
+                         // Load keys only after the locks have been acquired.
+                         for (K key : lockKeys) {
+                             if (retMap.containsKey(key))
+                                 // We already have a return value.
+                                 continue;
+ 
+                             IgniteTxKey<K> txKey = cacheCtx.txKey(key);
+ 
+                             IgniteTxEntry<K, V> txEntry = entry(txKey);
+ 
+                             assert txEntry != null;
+ 
+                             // Check if there is cached value.
+                             while (true) {
+                                 GridCacheEntryEx<K, V> cached = txEntry.cached();
+ 
+                                 try {
+                                     Object transformClo =
+                                         (!F.isEmpty(txEntry.transformClosures()) &&
+                                             cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
+                                             F.first(txEntry.transformClosures()) : null;
+ 
+                                     V val = cached.innerGet(IgniteTxLocalAdapter.this,
+                                         cacheCtx.isSwapOrOffheapEnabled(),
+                                         /*read-through*/false,
+                                         /*fail-fast*/true,
+                                         /*unmarshal*/true,
+                                         /*metrics*/true,
+                                         /*events*/true,
+                                         /*temporary*/true,
+                                         CU.subjectId(IgniteTxLocalAdapter.this, cctx),
+                                         transformClo,
+                                         resolveTaskName(),
 -                                        filter);
++                                        filter,
++                                        null);
+ 
+                                     // If value is in cache and passed the filter.
+                                     if (val != null) {
+                                         missed.remove(key);
+ 
+                                         txEntry.setAndMarkValid(val);
+ 
+                                         if (!F.isEmpty(txEntry.transformClosures())) {
+                                             for (IgniteClosure<V, V> clos : txEntry.transformClosures())
+                                                 val = clos.apply(val);
+                                         }
+ 
+                                         if (cacheCtx.portableEnabled())
+                                             val = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
+ 
+                                         retMap.put(key, val);
+                                     }
+ 
+                                     // Even though we bring the value back from lock acquisition,
+                                     // we still need to recheck primary node for consistent values
+                                     // in case of concurrent transactional locks.
+ 
+                                     break; // While.
+                                 }
+                                 catch (GridCacheEntryRemovedException ignore) { // No break: the while loop retries.
+                                     if (log.isDebugEnabled())
+                                         log.debug("Got removed exception in get postLock (will retry): " +
+                                             cached);
+ 
+                                     txEntry.cached(entryEx(cacheCtx, txKey), txEntry.keyBytes());
+                                 }
+                                 catch (GridCacheFilterFailedException e) {
+                                     // Failed value for the filter.
+                                     V val = e.value();
+ 
+                                     if (val != null) {
+                                         // If filter fails after lock is acquired, we don't reload,
+                                         // regardless if value is null or not.
+                                         missed.remove(key);
+ 
+                                         txEntry.setAndMarkValid(val);
+                                     }
+ 
+                                     break; // While.
+                                 }
+                             }
+                         }
+ 
+                         // Still-missed keys are loaded at this point only for replicated or local caches.
+                         if (!missed.isEmpty() && (cacheCtx.isReplicated() || cacheCtx.isLocal()))
+                             return checkMissed(cacheCtx, retMap, missed, null, deserializePortable, filter);
+ 
+                         return new GridFinishedFuture<>(cctx.kernalContext(), Collections.<K, V>emptyMap());
+                     }
+                 };
+ 
+                 FinishClosure<Map<K, V>> finClos = new FinishClosure<Map<K, V>>() {
+                     @Override Map<K, V> finish(Map<K, V> loaded) {
+                         retMap.putAll(loaded); // Merge whatever the post-lock step produced into the result.
+ 
+                         return retMap;
+                     }
+                 };
+ 
+                 if (fut.isDone()) { // Lock future already completed: run the post-lock step inline.
+                     try {
+                         IgniteFuture<Map<K, V>> fut1 = plc2.apply(fut.get(), null);
+ 
+                         return fut1.isDone() ?
+                             new GridFinishedFutureEx<>(finClos.apply(fut1.get(), null)) :
+                             new GridEmbeddedFuture<>(cctx.kernalContext(), fut1, finClos);
+                     }
+                     catch (GridClosureException e) {
+                         return new GridFinishedFuture<>(cctx.kernalContext(), e.unwrap());
+                     }
+                     catch (IgniteCheckedException e) {
+                         try {
+                             return plc2.apply(false, e); // Hand the failure to the post-lock closure.
+                         }
+                         catch (Exception e1) {
+                             return new GridFinishedFuture<>(cctx.kernalContext(), e1);
+                         }
+                     }
+                 }
+                 else {
+                     return new GridEmbeddedFuture<>(
+                         cctx.kernalContext(),
+                         fut,
+                         plc2,
+                         finClos);
+                 }
+             }
+             else {
+                 assert optimistic() || readCommitted() || groupLock();
+ 
+                 final Collection<K> redos = new LinkedList<>();
+ 
+                 if (!missed.isEmpty()) {
+                     if (!readCommitted()) // Drop missed keys that already have a value in retMap.
+                         for (Iterator<K> it = missed.keySet().iterator(); it.hasNext(); )
+                             if (retMap.containsKey(it.next()))
+                                 it.remove();
+ 
+                     if (missed.isEmpty())
+                         return new GridFinishedFuture<>(cctx.kernalContext(), retMap);
+ 
+                     return new GridEmbeddedFuture<>(
+                         cctx.kernalContext(),
+                         // First future.
+                         checkMissed(cacheCtx, retMap, missed, redos, deserializePortable, filter),
+                         // Closure that returns another future, based on result from first.
+                         new PMC<Map<K, V>>() {
+                             @Override public IgniteFuture<Map<K, V>> postMiss(Map<K, V> map) {
+                                 if (redos.isEmpty())
+                                     return new GridFinishedFuture<>(cctx.kernalContext(),
+                                         Collections.<K, V>emptyMap());
+ 
+                                 if (log.isDebugEnabled())
+                                     log.debug("Starting to future-recursively get values for keys: " + redos);
+ 
+                                 // Future recursion.
+                                 return getAllAsync(cacheCtx, redos, null, deserializePortable, filter);
+                             }
+                         },
+                         // Finalize.
+                         new FinishClosure<Map<K, V>>() {
+                             @Override Map<K, V> finish(Map<K, V> loaded) {
+                                 for (Map.Entry<K, V> entry : loaded.entrySet()) {
+                                     IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(entry.getKey()));
+ 
+                                     V val = entry.getValue();
+ 
+                                     if (!readCommitted())
+                                         txEntry.readValue(val);
+ 
+                                     if (!F.isEmpty(txEntry.transformClosures())) {
+                                         for (IgniteClosure<V, V> clos : txEntry.transformClosures())
+                                             val = clos.apply(val);
+                                     }
+ 
+                                     retMap.put(entry.getKey(), val);
+                                 }
+ 
+                                 return retMap;
+                             }
+                         }
+                     );
+                 }
+ 
+                 return new GridFinishedFuture<>(cctx.kernalContext(), retMap);
+             }
+         }
+         catch (IgniteCheckedException e) {
+             setRollbackOnly();
+ 
+             return new GridFinishedFuture<>(cctx.kernalContext(), e);
+         }
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Delegates to {@code putAllAsync0(..)} with {@code map} as the values to store and {@code null} in the
+      * two argument positions that follow it (the positions in which {@code transformAllAsync(..)} and
+      * {@code putAllDrAsync(..)} of this class pass their closure map and DR map). {@code retval},
+      * {@code cached}, {@code ttl} and {@code filter} are forwarded unchanged.
+      */
+     @Override public IgniteFuture<GridCacheReturn<V>> putAllAsync(
+         GridCacheContext<K, V> cacheCtx,
+         Map<? extends K, ? extends V> map,
+         boolean retval,
+         @Nullable GridCacheEntryEx<K, V> cached,
+         long ttl,
+         IgnitePredicate<GridCacheEntry<K, V>>[] filter
+     ) {
+         return putAllAsync0(cacheCtx, map, null, null, retval, cached, ttl, filter);
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Delegates to {@code putAllAsync0(..)} supplying only {@code drMap}: the value map and closure map
+      * positions are {@code null}, no return value is requested ({@code false}), no cached entry or filter
+      * is given, and {@code -1} is passed in the position where {@code putAllAsync(..)} passes its TTL.
+      */
+     @Override public IgniteFuture<?> putAllDrAsync(
+         GridCacheContext<K, V> cacheCtx,
+         Map<? extends K, GridCacheDrInfo<V>> drMap
+     ) {
+         return putAllAsync0(cacheCtx, null, null, drMap, false, null, -1, null);
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Delegates to {@code putAllAsync0(..)} with {@code map} as the transform closures and no value map,
+      * DR map or filter.
+      * <p>
+      * NOTE(review): the {@code cached} and {@code ttl} parameters are not forwarded; {@code null} and
+      * {@code -1} are passed in their place. Confirm whether this is intentional.
+      */
+     @Override public IgniteFuture<GridCacheReturn<V>> transformAllAsync(
+         GridCacheContext<K, V> cacheCtx,
+         @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> map,
+         boolean retval,
+         @Nullable GridCacheEntryEx<K, V> cached,
+         long ttl
+     ) {
+         return putAllAsync0(cacheCtx, null, map, null, retval, null, -1, null);
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Delegates to {@code removeAllAsync0(..)} supplying only the cache context and {@code drMap}; every
+      * other argument is {@code null} or {@code false}.
+      */
+     @Override public IgniteFuture<?> removeAllDrAsync(
+         GridCacheContext<K, V> cacheCtx,
+         Map<? extends K, GridCacheVersion> drMap
+     ) {
+         return removeAllAsync0(cacheCtx, null, drMap, null, false, null);
+     }
+ 
+     /**
+      * Checks filter for non-pessimistic transactions.
+      * <p>
+      * For a pessimistic transaction the predicate array is not evaluated here at all: the expression
+      * short-circuits to {@code true}.
+      *
+      * @param cached Cached entry.
+      * @param filter Filter to check.
+      * @return {@code True} if the transaction is pessimistic or the entry passed {@code isAll(..)}.
+      * @throws IgniteCheckedException If failed.
+      */
+     private boolean filter(GridCacheEntryEx<K, V> cached,
+         IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
+         return pessimistic() || cached.context().isAll(cached, filter);
+     }
+ 
+     /**
+      * Internal routine for <tt>putAll(..)</tt>: enlists the given keys into this transaction as write
+      * entries (or as no-op entries when {@code lockOnly} is set).
+      * <p>
+      * {@code null} keys are ignored. A key is reported as skipped when a non-remove operation has neither
+      * a value nor a transform closure for it, or when the {@code filter(..)} check of this class rejects its
+      * entry (that check always passes for pessimistic transactions). An {@link IgniteCheckedException}
+      * raised while enlisting is returned inside a finished future rather than thrown.
+      *
++     * @param cacheCtx Cache context.
+      * @param keys Keys to enlist.
+      * @param cached Cached entry.
 -     * @param ttl Time to live for entry. If negative, leave unchanged.
++     * @param expiryPlc Explicitly specified expiry policy for entry.
+      * @param implicit Implicit flag.
+      * @param lookup Value lookup map ({@code null} for remove).
+      * @param transformMap Map with transform closures if this is a transform operation.
+      * @param retval Flag indicating whether a value should be returned.
+      * @param lockOnly If {@code true}, then entry will be enlisted as noop.
+      * @param filter User filters.
+      * @param ret Return value.
+      * @param enlisted Collection of keys enlisted into this transaction.
+      * @param drPutMap DR put map (optional).
+      * @param drRmvMap DR remove map (optional).
+      * @return Future with skipped keys (keys without a value to write, or keys whose entry did not pass the
+      *      filter in a non-pessimistic transaction). The set is {@code null} when no key was skipped; when a
+      *      missing value has to be loaded for {@code retval}, the future completes with an empty set instead.
+      */
+     protected IgniteFuture<Set<K>> enlistWrite(
+         GridCacheContext<K, V> cacheCtx,
+         Collection<? extends K> keys,
+         @Nullable GridCacheEntryEx<K, V> cached,
 -        long ttl,
++        @Nullable ExpiryPolicy expiryPlc,
+         boolean implicit,
+         @Nullable Map<? extends K, ? extends V> lookup,
+         @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> transformMap,
+         boolean retval,
+         boolean lockOnly,
+         IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+         final GridCacheReturn<V> ret,
+         Collection<K> enlisted,
+         @Nullable Map<? extends K, GridCacheDrInfo<V>> drPutMap,
+         @Nullable Map<? extends K, GridCacheVersion> drRmvMap
+     ) {
+         assert cached == null || keys.size() == 1;
+         assert cached == null || F.first(keys).equals(cached.key());
+ 
+         try {
+             addActiveCache(cacheCtx);
+         }
+         catch (IgniteCheckedException e) {
+             return new GridFinishedFuture<>(cctx.kernalContext(), e);
+         }
+ 
+         Set<K> skipped = null;
+ 
+         boolean rmv = lookup == null && transformMap == null; // Neither values nor closures: remove operation.
+ 
+         try {
+             // Set transform flag for transaction.
+             if (transformMap != null)
+                 transform = true;
+ 
+             groupLockSanityCheck(cacheCtx, keys);
+ 
+             for (K key : keys) {
+                 V val = rmv || lookup == null ? null : lookup.get(key);
+                 IgniteClosure<V, V> transformClo = transformMap == null ? null : transformMap.get(key);
+ 
+                 GridCacheVersion drVer;
+                 long drTtl;
+                 long drExpireTime;
+ 
+                 if (drPutMap != null) {
+                     GridCacheDrInfo<V> info = drPutMap.get(key);
+ 
+                     assert info != null;
+ 
+                     drVer = info.version();
+                     drTtl = info.ttl();
+                     drExpireTime = info.expireTime();
+                 }
+                 else if (drRmvMap != null) {
+                     assert drRmvMap.get(key) != null;
+ 
+                     drVer = drRmvMap.get(key);
+                     drTtl = -1L;
+                     drExpireTime = -1L;
+                 }
+                 else {
+                     drVer = null;
+                     drTtl = -1L;
+                     drExpireTime = -1L;
+                 }
+ 
+                 if (key == null) // NOTE(review): checked only after the DR map lookups above have run.
+                     continue;
+ 
+                 if (!rmv && val == null && transformClo == null) {
+                     skipped = skip(skipped, key);
+ 
+                     continue;
+                 }
+ 
+                 if (cacheCtx.portableEnabled())
+                     key = (K)cacheCtx.marshalToPortable(key);
+ 
+                 IgniteTxKey<K> txKey = cacheCtx.txKey(key);
+ 
+                 IgniteTxEntry<K, V> txEntry = entry(txKey);
+ 
+                 // First time access.
+                 if (txEntry == null) {
+                     while (true) {
+                         GridCacheEntryEx<K, V> entry;
+ 
+                         if (cached != null) {
+                             entry = cached;
+ 
+                             cached = null; // Use the supplied entry once; a retry resolves a fresh one below.
+                         }
+                         else {
+                             entry = entryEx(cacheCtx, txKey, topologyVersion());
+ 
+                             entry.unswap(true, false);
+                         }
+ 
+                         try {
+                             // Check if lock is being explicitly acquired by the same thread.
+                             if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() &&
+                                 entry.lockedByThread(threadId, xidVer))
+                                 throw new IgniteCheckedException("Cannot access key within transaction if lock is " +
+                                     "externally held [key=" + key + ", entry=" + entry + ", xidVer=" + xidVer +
+                                     ", threadId=" + threadId +
+                                     ", locNodeId=" + cctx.localNodeId() + ']');
+ 
+                             V old = null;
+ 
+                             boolean readThrough = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+ 
+                             if (optimistic()) {
+                                 try {
 -                                    //Should read through if filter is specified.
++                                    // Should read through if filter is specified.
+                                     old = entry.innerGet(this,
+                                         /*swap*/false,
+                                         /*read-through*/readThrough,
+                                         /*fail-fast*/false,
+                                         /*unmarshal*/retval,
+                                         /*metrics*/retval,
+                                         /*events*/retval,
+                                         /*temporary*/false,
+                                         CU.subjectId(this, cctx),
+                                         transformClo,
+                                         resolveTaskName(),
 -                                        CU.<K, V>empty());
++                                        CU.<K, V>empty(),
++                                        null);
+                                 }
+                                 catch (GridCacheFilterFailedException e) {
+                                     e.printStackTrace(); // NOTE(review): writes to stderr instead of using 'log'.
+ 
+                                     assert false : "Empty filter failed: " + e;
+                                 }
+                             }
+                             else
+                                 old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
+ 
+                             if (!filter(entry, filter)) {
+                                 skipped = skip(skipped, key);
+ 
+                                 ret.set(old, false);
+ 
+                                 if (!readCommitted() && old != null) {
+                                     // Enlist failed filters as reads for non-read-committed mode,
+                                     // so future ops will get the same values.
 -                                    txEntry = addEntry(READ, old, null, entry, -1, CU.<K, V>empty(), false, -1L, -1L,
++                                    txEntry = addEntry(READ, old, null, entry, null, CU.<K, V>empty(), false, -1L, -1L,
+                                         null);
+ 
+                                     txEntry.markValid();
+                                 }
+ 
+                                 if (readCommitted() || old == null)
+                                     cacheCtx.evicts().touch(entry, topologyVersion());
+ 
+                                 break; // While.
+                             }
+ 
+                             txEntry = addEntry(lockOnly ? NOOP : rmv ? DELETE : transformClo != null ? TRANSFORM :
 -                                old != null ? UPDATE : CREATE, val, transformClo, entry, ttl, filter, true, drTtl,
++                                old != null ? UPDATE : CREATE, val, transformClo, entry, expiryPlc, filter, true, drTtl,
+                                 drExpireTime, drVer);
+ 
+                             if (!implicit() && readCommitted())
+                                 cacheCtx.evicts().touch(entry, topologyVersion());
+ 
+                             if (groupLock() && !lockOnly)
+                                 txEntry.groupLockEntry(true);
+ 
+                             enlisted.add(key);
+ 
+                             if (!pessimistic() || (groupLock() && !lockOnly)) {
+                                 txEntry.markValid();
+ 
+                                 if (old == null) {
+                                     if (retval && !readThrough) {
+                                         // If return value is required, then we know for sure that there is only
+                                         // one key in the keys collection.
+                                         assert keys.size() == 1;
+ 
+                                         IgniteFuture<Boolean> fut = loadMissing(
+                                             cacheCtx,
+                                             true,
+                                             F.asList(key),
+                                             deserializePortables(cacheCtx),
+                                             new CI2<K, V>() {
+                                                 @Override public void apply(K k, V v) {
+                                                     if (log.isDebugEnabled())
+                                                         log.debug("Loaded value from remote node [key=" + k + ", val=" +
+                                                             v + ']');
+ 
+                                                     ret.set(v, true);
+                                                 }
+                                             });
+ 
+                                         return new GridEmbeddedFuture<>( // Returns from the method: no skipped keys.
+                                             cctx.kernalContext(),
+                                             fut,
+                                             new C2<Boolean, Exception, Set<K>>() {
+                                                 @Override public Set<K> apply(Boolean b, Exception e) {
+                                                     if (e != null)
+                                                         throw new GridClosureException(e);
+ 
+                                                     return Collections.emptySet();
+                                                 }
+                                             }
+                                         );
+                                     }
+                                     else
+                                         ret.set(null, true);
+                                 }
+                                 else
+                                     ret.set(old, true);
+                             }
+                             // Pessimistic.
+                             else
+                                 ret.set(old, true);
+ 
+                             break; // While.
+                         }
+                         catch (GridCacheEntryRemovedException ignore) { // No break: the while loop retries.
+                             if (log.isDebugEnabled())
+                                 log.debug("Got removed entry in transaction putAll0 method: " + entry);
+                         }
+                     }
+                 }
+                 else {
+                     if (transformClo == null && txEntry.op() == TRANSFORM)
+                         throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
+                             "transaction after transform closure is applied): " + key);
+ 
+                     GridCacheEntryEx<K, V> entry = txEntry.cached();
+ 
+                     V v = txEntry.value();
+ 
+                     boolean del = txEntry.op() == DELETE && rmv; // Already enlisted as DELETE and removed again.
+ 
+                     if (!del) {
+                         if (!filter(entry, filter)) {
+                             skipped = skip(skipped, key);
+ 
+                             ret.set(v, false);
+ 
+                             continue;
+                         }
+ 
+                         txEntry = addEntry(rmv ? DELETE : transformClo != null ? TRANSFORM :
 -                            v != null ? UPDATE : CREATE, val, transformClo, entry, ttl, filter, true, drTtl,
++                            v != null ? UPDATE : CREATE, val, transformClo, entry, expiryPlc, filter, true, drTtl,
+                             drExpireTime, drVer);
+ 
+                         enlisted.add(key);
+                     }
+ 
+                     if (!pessimistic()) {
+                         txEntry.markValid();
+ 
+                         // Set tx entry and return values.
+                         ret.set(v, true);
+                     }
+                 }
+             }
+         }
+         catch (IgniteCheckedException e) {
+             return new GridFinishedFuture<>(cctx.kernalContext(), e);
+         }
+ 
+         return new GridFinishedFuture<>(cctx.kernalContext(), skipped);
+     }
+ 
+     /**
+      * Post lock processing for put or remove.
+      *
+      * @param keys Keys.
+      * @param failed Collection of potentially failed keys (need to populate in this method).
+      * @param transformed Output map where transformed values will be placed.
+      * @param transformMap Transform map.
+      * @param ret Return value.
+      * @param rmv {@code True} if remove.
+      * @param retval Flag to return value or not.
+      * @param filter Filter to check entries.
+      * @return Failed keys.
+      * @throws IgniteCheckedException If error.
+      */
+  

<TRUNCATED>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f537940c/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/handlers/cache/GridCacheCommandHandler.java
----------------------------------------------------------------------