You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/23 09:23:09 UTC

[33/38] incubator-ignite git commit: # ignite-41

# ignite-41


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5b142ddd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5b142ddd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5b142ddd

Branch: refs/heads/ignite-1
Commit: 5b142ddd2d54172510035a7a6a72adad7e133b40
Parents: 575dbae
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 22 16:50:58 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 22 23:00:55 2014 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  26 +++--
 .../cache/IgniteCacheExpiryPolicy.java          |   7 +-
 .../GridDistributedCacheAdapter.java            |  18 ++-
 .../GridDistributedTxRemoteAdapter.java         |  12 +-
 .../distributed/dht/GridDhtLockFuture.java      |  18 ++-
 .../distributed/dht/GridDhtLockRequest.java     |  50 +++++++-
 .../dht/GridDhtTransactionalCacheAdapter.java   |  42 +++++--
 .../distributed/dht/GridDhtTxLocalAdapter.java  |  16 ++-
 .../cache/distributed/dht/GridDhtTxRemote.java  |   6 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  19 ++-
 .../dht/colocated/GridDhtColocatedCache.java    |  87 +++++++++++---
 .../colocated/GridDhtColocatedLockFuture.java   |  23 +++-
 .../distributed/near/GridNearAtomicCache.java   |   1 +
 .../distributed/near/GridNearCacheAdapter.java  |   3 +-
 .../distributed/near/GridNearLockFuture.java    |  11 +-
 .../distributed/near/GridNearLockRequest.java   |  35 +++++-
 .../near/GridNearTransactionalCache.java        |  28 +++--
 .../cache/distributed/near/GridNearTxLocal.java | 116 +++++++++++++++++--
 .../processors/cache/local/GridLocalCache.java  |  11 +-
 .../processors/cache/local/GridLocalTx.java     |   1 +
 .../local/atomic/GridLocalAtomicCache.java      |   4 +-
 .../cache/transactions/IgniteTxHandler.java     |  11 +-
 .../transactions/IgniteTxLocalAdapter.java      |  82 ++++++++++---
 .../IgniteCacheExpiryPolicyAbstractTest.java    |  43 +++++--
 .../GridCacheInterceptorAbstractSelfTest.java   |   1 +
 25 files changed, 545 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index 3780cbb..5295f20 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -533,6 +533,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
      * @param retval Flag to return value.
      * @param isolation Transaction isolation.
      * @param invalidate Invalidate flag.
+     * @param filter TTL for read operation.
      * @param filter Optional filter.
      * @return Locks future.
      */
@@ -544,6 +545,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
         boolean retval,
         IgniteTxIsolation isolation,
         boolean invalidate,
+        long accessTtl,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter);
 
     /**
@@ -1743,11 +1745,6 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
 
         subjId = ctx.subjectIdPerCall(subjId, prj);
 
-        ExpiryPolicy expiryPlc = prj != null ? prj.expiry() : null;
-
-        if (expiryPlc == null)
-            expiryPlc = ctx.expiry();
-
         return getAllAsync(keys,
             entry,
             !skipTx,
@@ -1755,7 +1752,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
             taskName,
             deserializePortable,
             forcePrimary,
-            GetExpiryPolicy.forPolicy(expiryPlc),
+            accessExpiryPolicy(prj != null ? prj.expiry() : null),
             filter);
     }
 
@@ -4552,6 +4549,17 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
     }
 
     /**
+     * @param plc Explicitly specified expiry policy for cache operation.
+     * @return Expiry policy wrapper.
+     */
+    @Nullable public GetExpiryPolicy accessExpiryPolicy(@Nullable ExpiryPolicy plc) {
+        if (plc == null)
+            plc = ctx.expiry();
+
+        return GetExpiryPolicy.forPolicy(plc);
+    }
+
+    /**
      * Cache operation.
      */
     private abstract class SyncOp<T> {
@@ -4901,10 +4909,8 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
             return -1L;
         }
 
-        /**
-         *
-         */
-        public synchronized void reset() {
+        /** {@inheritDoc} */
+        @Override public synchronized void reset() {
             if (entries != null)
                 entries.clear();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheExpiryPolicy.java
index 59cd937..d603b35 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheExpiryPolicy.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheExpiryPolicy.java
@@ -36,7 +36,7 @@ public interface IgniteCacheExpiryPolicy {
     public long forAccess();
 
     /**
-     * Callback when entry's ttl is updated on access.
+     * Callback for ttl update on entry access.
      *
      * @param key Entry key.
      * @param keyBytes Entry key bytes.
@@ -49,6 +49,11 @@ public interface IgniteCacheExpiryPolicy {
        @Nullable Collection<UUID> rdrs);
 
     /**
+     * Clears information about updated entries.
+     */
+    public void reset();
+
+    /**
      * @return Entries with TTL updated on access.
      */
     @Nullable public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java
index e19fe92..2d01a82 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -59,11 +59,12 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
         boolean retval,
         IgniteTxIsolation isolation,
         boolean isInvalidate,
+        long accessTtl,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter
     ) {
         assert tx != null;
 
-        return lockAllAsync(keys, timeout, tx, isInvalidate, isRead, retval, isolation, filter);
+        return lockAllAsync(keys, timeout, tx, isInvalidate, isRead, retval, isolation, accessTtl, filter);
     }
 
     /** {@inheritDoc} */
@@ -72,7 +73,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
         IgniteTxLocalEx<K, V> tx = ctx.tm().userTxx();
 
         // Return value flag is true because we choose to bring values for explicit locks.
-        return lockAllAsync(keys, timeout, tx, false, false, /*retval*/true, null, filter);
+        return lockAllAsync(keys, timeout, tx, false, false, /*retval*/true, null, -1L, filter);
     }
 
     /**
@@ -83,12 +84,19 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
      * @param isRead Indicates whether value is read or written.
      * @param retval Flag to return value.
      * @param isolation Transaction isolation.
+     * @param accessTtl TTL for read operation.
      * @param filter Optional filter.
      * @return Future for locks.
      */
-    protected abstract IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout,
-        @Nullable IgniteTxLocalEx<K, V> tx, boolean isInvalidate, boolean isRead, boolean retval,
-        @Nullable IgniteTxIsolation isolation, IgnitePredicate<GridCacheEntry<K, V>>[] filter);
+    protected abstract IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys,
+        long timeout,
+        @Nullable IgniteTxLocalEx<K, V> tx,
+        boolean isInvalidate,
+        boolean isRead,
+        boolean retval,
+        @Nullable IgniteTxIsolation isolation,
+        long accessTtl,
+        IgnitePredicate<GridCacheEntry<K, V>>[] filter);
 
     /**
      * @param key Key to remove.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 1abf714..2e87441 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -578,17 +578,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V>
                                         }
                                     }
                                     else if (op == READ) {
-                                        ExpiryPolicy expiry = txEntry.expiry();
-
-                                        if (expiry == null)
-                                            expiry = cacheCtx.expiry();
-
-                                        if (expiry != null) {
-                                            Duration duration = expiry.getExpiryForAccess();
-
-                                            if (duration != null)
-                                                cached.updateTtl(null, CU.toTtl(duration));
-                                        }
+                                        assert near();
 
                                         if (log.isDebugEnabled())
                                             log.debug("Ignoring READ entry when committing: " + txEntry);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 5a9ee72..2970568 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -121,6 +121,9 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
     /** Pending locks. */
     private final Collection<K> pendingLocks = new GridConcurrentHashSet<>();
 
+    /** TTL for read operation. */
+    private long accessTtl;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -138,6 +141,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
      * @param timeout Lock acquisition timeout.
      * @param tx Transaction.
      * @param threadId Thread ID.
+     * @param accessTtl TTL for read operation.
      * @param filter Filter.
      */
     public GridDhtLockFuture(
@@ -150,10 +154,10 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
         long timeout,
         GridDhtTxLocalAdapter<K, V> tx,
         long threadId,
+        long accessTtl,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
         super(cctx.kernalContext(), CU.boolReducer());
 
-        assert cctx != null;
         assert nearNodeId != null;
         assert nearLockVer != null;
         assert topVer > 0;
@@ -166,6 +170,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
         this.timeout = timeout;
         this.filter = filter;
         this.tx = tx;
+        this.accessTtl = accessTtl;
 
         if (tx != null)
             tx.topologyVersion(topVer);
@@ -202,6 +207,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
     }
 
     /**
+     * @param cacheCtx Cache context.
      * @param invalidPart Partition to retry.
      */
     void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int invalidPart) {
@@ -827,7 +833,8 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
                         inTx() ? tx.groupLockKey() : null,
                         inTx() && tx.partitionLock(),
                         inTx() ? tx.subjectId() : null,
-                        inTx() ? tx.taskNameHash() : 0);
+                        inTx() ? tx.taskNameHash() : 0,
+                        read ? accessTtl : -1L);
 
                     try {
                         for (ListIterator<GridDhtCacheEntry<K, V>> it = dhtMapping.listIterator(); it.hasNext();) {
@@ -838,11 +845,13 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
 
                             boolean invalidateRdr = e.readerId(n.id()) != null;
 
+                            IgniteTxEntry<K, V> entry = tx != null ? tx.entry(e.txKey()) : null;
+
                             req.addDhtKey(
                                 e.key(),
                                 e.getOrMarshalKeyBytes(),
                                 tx != null ? tx.writeMap().get(e.txKey()) : null,
-                                tx != null ? tx.entry(e.txKey()).drVersion() : null,
+                                entry != null ? entry.drVersion() : null,
                                 invalidateRdr,
                                 cctx);
 
@@ -906,7 +915,8 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
                         inTx() ? tx.groupLockKey() : null,
                         inTx() && tx.partitionLock(),
                         inTx() ? tx.subjectId() : null,
-                        inTx() ? tx.taskNameHash() : 0);
+                        inTx() ? tx.taskNameHash() : 0,
+                        read ? accessTtl : -1L);
 
                     try {
                         for (ListIterator<GridDhtCacheEntry<K, V>> it = nearMapping.listIterator(); it.hasNext();) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockRequest.java
index cb4ef69..f72f921 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockRequest.java
@@ -73,6 +73,9 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
     @GridDirectVersion(3)
     private BitSet preloadKeys;
 
+    /** TTL for read operation. */
+    private long accessTtl;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -81,6 +84,7 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
     }
 
     /**
+     * @param cacheId Cache ID.
      * @param nodeId Node ID.
      * @param nearXidVer Near transaction ID.
      * @param threadId Thread ID.
@@ -98,6 +102,9 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
      * @param txSize Expected transaction size.
      * @param grpLockKey Group lock key.
      * @param partLock {@code True} if partition lock.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     * @param accessTtl TTL for read operation.
      */
     public GridDhtLockRequest(
         int cacheId,
@@ -119,10 +126,24 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
         @Nullable IgniteTxKey grpLockKey,
         boolean partLock,
         @Nullable UUID subjId,
-        int taskNameHash
+        int taskNameHash,
+        long accessTtl
     ) {
-        super(cacheId, nodeId, nearXidVer, threadId, futId, lockVer, isInTx, isRead, isolation, isInvalidate, timeout,
-            dhtCnt == 0 ? nearCnt : dhtCnt, txSize, grpLockKey, partLock);
+        super(cacheId,
+            nodeId,
+            nearXidVer,
+            threadId,
+            futId,
+            lockVer,
+            isInTx,
+            isRead,
+            isolation,
+            isInvalidate,
+            timeout,
+            dhtCnt == 0 ? nearCnt : dhtCnt,
+            txSize,
+            grpLockKey,
+            partLock);
 
         this.topVer = topVer;
 
@@ -135,6 +156,7 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
         this.miniId = miniId;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
+        this.accessTtl = accessTtl;
     }
 
     /** {@inheritDoc} */
@@ -239,6 +261,7 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
     }
 
     /**
+     * @param idx Key index.
      * @return {@code True} if need to preload key with given index.
      */
     public boolean needPreloadKey(int idx) {
@@ -282,6 +305,13 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
         return miniId;
     }
 
+    /**
+     * @return TTL for read operation.
+     */
+    public long accessTtl() {
+        return accessTtl;
+    }
+
     /** {@inheritDoc}
      * @param ctx*/
     @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
@@ -330,6 +360,7 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
         _clone.subjId = subjId;
         _clone.taskNameHash = taskNameHash;
         _clone.preloadKeys = preloadKeys;
+        _clone.accessTtl = accessTtl;
     }
 
     /** {@inheritDoc} */
@@ -417,6 +448,12 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
 
                 commState.idx++;
 
+            case 32:
+                if (!commState.putLong(accessTtl))
+                    return false;
+
+                commState.idx++;
+
         }
 
         return true;
@@ -526,6 +563,13 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
 
                 commState.idx++;
 
+            case 32:
+                if (buf.remaining() < 8)
+                    return false;
+
+                accessTtl = commState.getLong();
+
+                commState.idx++;
         }
 
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 67c01b3..dc8ddcb 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -210,13 +210,14 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
                             tx.addWrite(
                                 ctx,
-                                writeEntry == null ? (req.txRead() ? READ : NOOP) : writeEntry.op(),
+                                writeEntry == null ? NOOP : writeEntry.op(),
                                 txKey,
                                 req.keyBytes() != null ? req.keyBytes().get(i) : null,
                                 writeEntry == null ? null : writeEntry.value(),
                                 writeEntry == null ? null : writeEntry.valueBytes(),
                                 writeEntry == null ? null : writeEntry.transformClosures(),
-                                drVer);
+                                drVer,
+                                req.accessTtl());
 
                             if (req.groupLock())
                                 tx.groupLockKey(txKey);
@@ -547,8 +548,17 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
         boolean isRead,
         boolean retval,
         IgniteTxIsolation isolation,
+        long accessTtl,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
-        return lockAllAsyncInternal(keys, timeout, txx, isInvalidate, isRead, retval, isolation, filter);
+        return lockAllAsyncInternal(keys,
+            timeout,
+            txx,
+            isInvalidate,
+            isRead,
+            retval,
+            isolation,
+            accessTtl,
+            filter);
     }
 
     /**
@@ -561,6 +571,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
      * @param isRead Read flag.
      * @param retval Return value flag.
      * @param isolation Transaction isolation.
+     * @param accessTtl TTL for read operation.
      * @param filter Optional filter.
      * @return Lock future.
      */
@@ -571,6 +582,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
         boolean isRead,
         boolean retval,
         IgniteTxIsolation isolation,
+        long accessTtl,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
         if (keys == null || keys.isEmpty())
             return new GridDhtFinishedFuture<>(ctx.kernalContext(), true);
@@ -589,6 +601,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
             timeout,
             tx,
             tx.threadId(),
+            accessTtl,
             filter);
 
         for (K key : keys) {
@@ -637,6 +650,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
     }
 
     /**
+     * @param cacheCtx Cache context.
      * @param nearNode Near node.
      * @param req Request.
      * @param filter0 Filter.
@@ -705,6 +719,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                                 req.timeout(),
                                 tx,
                                 req.threadId(),
+                                req.accessTtl(),
                                 filter);
 
                             // Add before mapping.
@@ -815,15 +830,16 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                                 req.drVersions(),
                                 req.messageId(),
                                 req.implicitTx(),
-                                req.txRead());
+                                req.txRead(),
+                                req.accessTtl());
 
                             final GridDhtTxLocal<K, V> t = tx;
 
                             return new GridDhtEmbeddedFuture<>(
                                 txFut,
                                 new C2<GridCacheReturn<V>, Exception, IgniteFuture<GridNearLockResponse<K, V>>>() {
-                                    @Override public IgniteFuture<GridNearLockResponse<K, V>> apply(GridCacheReturn<V> o,
-                                                                                                  Exception e) {
+                                    @Override public IgniteFuture<GridNearLockResponse<K, V>> apply(
+                                        GridCacheReturn<V> o, Exception e) {
                                         if (e != null)
                                             e = U.unwrap(e);
 
@@ -831,7 +847,11 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
                                         // Create response while holding locks.
                                         final GridNearLockResponse<K, V> resp = createLockReply(nearNode,
-                                            entries, req, t, t.xidVersion(), e);
+                                            entries,
+                                            req,
+                                            t,
+                                            t.xidVersion(),
+                                            e);
 
                                         if (resp.error() == null && t.onePhaseCommit()) {
                                             assert t.implicit();
@@ -880,8 +900,12 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                                         else if (!b)
                                             e = new GridCacheLockTimeoutException(req.version());
 
-                                        GridNearLockResponse<K, V> res = createLockReply(nearNode, entries, req,
-                                            null, mappedVer, e);
+                                        GridNearLockResponse<K, V> res = createLockReply(nearNode,
+                                            entries,
+                                            req,
+                                            null,
+                                            mappedVer,
+                                            e);
 
                                         sendLockReply(nearNode, null, req, res);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index d20a4a3..b752178 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -461,12 +461,15 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
     }
 
     /**
+     * @param cacheCtx Cache context.
      * @param entries Entries to lock.
      * @param writeEntries Write entries for implicit transactions mapped to one node.
+     * @param onePhaseCommit One phase commit flag.
      * @param drVers DR versions.
      * @param msgId Message ID.
      * @param implicit Implicit flag.
      * @param read Read flag.
+     * @param accessTtl TTL for read operation.
      * @return Lock future.
      */
     IgniteFuture<GridCacheReturn<V>> lockAllAsync(
@@ -477,7 +480,8 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
         GridCacheVersion[] drVers,
         long msgId,
         boolean implicit,
-        final boolean read
+        final boolean read,
+        long accessTtl
     ) {
         try {
             checkValid();
@@ -547,6 +551,8 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
                         txEntry.drExpireTime(w.drExpireTime());
                         txEntry.expiry(w.expiry());
                     }
+                    else if (read)
+                        txEntry.ttl(accessTtl);
 
                     txEntry.cached(cached, txEntry.keyBytes());
 
@@ -573,7 +579,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
             if (log.isDebugEnabled())
                 log.debug("Lock keys: " + passedKeys);
 
-            return obtainLockAsync(cacheCtx, ret, passedKeys, read, skipped, null);
+            return obtainLockAsync(cacheCtx, ret, passedKeys, read, skipped, accessTtl, null);
         }
         catch (IgniteCheckedException e) {
             setRollbackOnly();
@@ -588,6 +594,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
      * @param passedKeys Passed keys.
      * @param read {@code True} if read.
      * @param skipped Skipped keys.
+     * @param accessTtl TTL for read operation.
      * @param filter Entry write filter.
      * @return Future for lock acquisition.
      */
@@ -597,6 +604,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
         final Collection<? extends K> passedKeys,
         final boolean read,
         final Set<K> skipped,
+        final long accessTtl,
         @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
         if (log.isDebugEnabled())
             log.debug("Before acquiring transaction lock on keys [passedKeys=" + passedKeys + ", skipped=" +
@@ -614,6 +622,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
             read,
             /*retval*/false,
             isolation,
+            accessTtl,
             CU.<K, V>empty());
 
         return new GridEmbeddedFuture<>(
@@ -631,7 +640,8 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
                         ret,
                         /*remove*/false,
                         /*retval*/false,
-                        /*read*/true,
+                        /*read*/read,
+                        accessTtl,
                         filter == null ? CU.<K, V>empty() : filter);
 
                     return ret;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
index a545bc4..97ec1af 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -281,6 +281,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
      * @param valBytes Value bytes.
      * @param drVer Data center replication version.
      * @param clos Transform closures.
+     * @param ttl TTL.
      */
     public void addWrite(GridCacheContext<K, V> cacheCtx,
         GridCacheOperation op,
@@ -289,7 +290,8 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
         @Nullable V val,
         @Nullable byte[] valBytes,
         @Nullable Collection<IgniteClosure<V, V>> clos,
-        @Nullable GridCacheVersion drVer) {
+        @Nullable GridCacheVersion drVer,
+        long ttl) {
         checkInternal(key);
 
         if (isSystemInvalidate())
@@ -301,7 +303,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
             this,
             op,
             val,
-            -1L,
+            ttl,
             -1L,
             cached,
             drVer);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 65099ef..0b5d638 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -576,6 +576,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         boolean isRead,
         boolean retval,
         @Nullable IgniteTxIsolation isolation,
+        long accessTtl,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
         return new FinishedLockFuture(new UnsupportedOperationException("Locks are not supported for " +
             "GridCacheAtomicityMode.ATOMIC mode (use GridCacheAtomicityMode.TRANSACTIONAL instead)"));
@@ -734,8 +735,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         long topVer = ctx.affinity().affinityTopologyVersion();
 
-        final GetExpiryPolicy expiry =
-            GetExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry());
+        final GetExpiryPolicy expiry = accessExpiryPolicy(expiryPlc);
 
         // Optimisation: try to resolve value locally and escape 'get future' creation.
         if (!reload && !forcePrimary) {
@@ -2741,9 +2741,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         /** {@inheritDoc} */
         @Override public void ttlUpdated(Object key,
-                                         byte[] keyBytes,
-                                         GridCacheVersion ver,
-                                         @Nullable Collection<UUID> rdrs) {
+            byte[] keyBytes,
+            GridCacheVersion ver,
+            @Nullable Collection<UUID> rdrs) {
             if (entries == null)
                 entries = new HashMap<>();
 
@@ -2767,6 +2767,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         }
 
         /** {@inheritDoc} */
+        @Override public void reset() {
+            if (entries != null)
+                entries.clear();
+
+            if (rdrsMap != null)
+                rdrsMap.clear();
+        }
+
+        /** {@inheritDoc} */
         @Nullable @Override public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() {
             return entries;
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 7534fd5..9878768 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -184,7 +184,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             taskName,
             deserializePortable,
             filter,
-            prj != null ? prj.expiry() : null);
+            accessExpiryPolicy(prj != null ? prj.expiry() : null));
     }
 
     /** {@inheritDoc} */
@@ -236,15 +236,15 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         String taskName,
         boolean deserializePortable,
         @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
-        @Nullable ExpiryPolicy expiryPlc) {
+        @Nullable IgniteCacheExpiryPolicy expiryPlc) {
         if (keys == null || keys.isEmpty())
             return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
 
         if (keyCheck)
             validateCacheKeys(keys);
 
-        final GetExpiryPolicy expiry =
-            GetExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry());
+        if (expiryPlc == null)
+            expiryPlc = accessExpiryPolicy(ctx.expiry());
 
         // Optimisation: try to resolve value locally and escape 'get future' creation.
         if (!reload && !forcePrimary) {
@@ -276,7 +276,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                                 null,
                                 taskName,
                                 filter,
-                                expiry);
+                                expiryPlc);
 
                             // Entry was not in memory or in swap, so we remove it from cache.
                             if (v == null) {
@@ -325,14 +325,14 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             }
 
             if (success) {
-                sendTtlUpdateRequest(expiry);
+                sendTtlUpdateRequest(expiryPlc);
 
                 return ctx.wrapCloneMap(new GridFinishedFuture<>(ctx.kernalContext(), locVals));
             }
         }
 
-        if (expiry != null)
-            expiry.reset();
+        if (expiryPlc != null)
+            expiryPlc.reset();
 
         // Either reload or not all values are available locally.
         GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx,
@@ -344,7 +344,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             subjId,
             taskName,
             deserializePortable,
-            expiry);
+            expiryPlc);
 
         fut.init();
 
@@ -363,13 +363,20 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         boolean isRead,
         boolean retval,
         @Nullable IgniteTxIsolation isolation,
+        long accessTtl,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
         assert tx == null || tx instanceof GridNearTxLocal;
 
         GridNearTxLocal<K, V> txx = (GridNearTxLocal<K, V>)tx;
 
-        GridDhtColocatedLockFuture<K, V> fut = new GridDhtColocatedLockFuture<>(ctx, keys, txx, isRead, retval,
-            timeout, filter);
+        GridDhtColocatedLockFuture<K, V> fut = new GridDhtColocatedLockFuture<>(ctx,
+            keys,
+            txx,
+            isRead,
+            retval,
+            timeout,
+            accessTtl,
+            filter);
 
         // Future will be added to mvcc only if it was mapped to remote nodes.
         fut.map();
@@ -570,6 +577,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
     }
 
     /**
+     * @param cacheCtx Cache context.
      * @param tx Started colocated transaction (if any).
      * @param threadId Thread ID.
      * @param ver Lock version.
@@ -577,6 +585,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @param keys Mapped keys.
      * @param txRead Tx read.
      * @param timeout Lock timeout.
+     * @param accessTtl TTL for read operation.
      * @param filter filter Optional filter.
      * @return Lock future.
      */
@@ -589,6 +598,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         final Collection<K> keys,
         final boolean txRead,
         final long timeout,
+        final long accessTtl,
         @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter
     ) {
         assert keys != null;
@@ -601,7 +611,16 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                 // Check for exception.
                 keyFut.get();
 
-                return lockAllAsync0(cacheCtx, tx, threadId, ver, topVer, keys, txRead, timeout, filter);
+                return lockAllAsync0(cacheCtx,
+                    tx,
+                    threadId,
+                    ver,
+                    topVer,
+                    keys,
+                    txRead,
+                    timeout,
+                    accessTtl,
+                    filter);
             }
             catch (IgniteCheckedException e) {
                 return new GridFinishedFuture<>(ctx.kernalContext(), e);
@@ -614,7 +633,16 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                         if (exx != null)
                             return new GridDhtFinishedFuture<>(ctx.kernalContext(), exx);
 
-                        return lockAllAsync0(cacheCtx, tx, threadId, ver, topVer, keys, txRead, timeout, filter);
+                        return lockAllAsync0(cacheCtx,
+                            tx,
+                            threadId,
+                            ver,
+                            topVer,
+                            keys,
+                            txRead,
+                            timeout,
+                            accessTtl,
+                            filter);
                     }
                 },
                 ctx.kernalContext());
@@ -622,6 +650,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
     }
 
     /**
+     * @param cacheCtx Cache context.
      * @param tx Started colocated transaction (if any).
      * @param threadId Thread ID.
      * @param ver Lock version.
@@ -629,19 +658,35 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @param keys Mapped keys.
      * @param txRead Tx read.
      * @param timeout Lock timeout.
+     * @param accessTtl TTL for read operation.
      * @param filter filter Optional filter.
      * @return Lock future.
      */
     private IgniteFuture<Exception> lockAllAsync0(
         GridCacheContext<K, V> cacheCtx,
-        @Nullable final GridNearTxLocal<K, V> tx, long threadId,
-        final GridCacheVersion ver, final long topVer, final Collection<K> keys, final boolean txRead,
-        final long timeout, @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+        @Nullable final GridNearTxLocal<K, V> tx,
+        long threadId,
+        final GridCacheVersion ver,
+        final long topVer,
+        final Collection<K> keys,
+        final boolean txRead,
+        final long timeout,
+        final long accessTtl,
+        @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
         int cnt = keys.size();
 
         if (tx == null) {
-            GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(ctx, ctx.localNodeId(), ver, topVer, cnt, txRead,
-                timeout, tx, threadId, filter);
+            GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(ctx,
+                ctx.localNodeId(),
+                ver,
+                topVer,
+                cnt,
+                txRead,
+                timeout,
+                tx,
+                threadId,
+                accessTtl,
+                filter);
 
             // Add before mapping.
             if (!ctx.mvcc().addFuture(fut))
@@ -704,7 +749,11 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             if (log.isDebugEnabled())
                 log.debug("Performing colocated lock [tx=" + tx + ", keys=" + keys + ']');
 
-            IgniteFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync(cacheCtx, keys, tx.implicit(), txRead);
+            IgniteFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync(cacheCtx,
+                keys,
+                tx.implicit(),
+                txRead,
+                accessTtl);
 
             return new GridDhtEmbeddedFuture<>(
                 ctx.kernalContext(),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 1a8f8d1..8c8a8e5 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -101,6 +101,9 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
     /** Trackable flag (here may be non-volatile). */
     private boolean trackable;
 
+    /** TTL for read operation. */
+    private long accessTtl;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -115,6 +118,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
      * @param read Read flag.
      * @param retval Flag to return value or not.
      * @param timeout Lock acquisition timeout.
+     * @param accessTtl TTL for read operation.
      * @param filter Filter.
      */
     public GridDhtColocatedLockFuture(
@@ -124,9 +128,10 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
         boolean read,
         boolean retval,
         long timeout,
+        long accessTtl,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
         super(cctx.kernalContext(), CU.boolReducer());
-        assert cctx != null;
+
         assert keys != null;
 
         this.cctx = cctx;
@@ -135,6 +140,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
         this.read = read;
         this.retval = retval;
         this.timeout = timeout;
+        this.accessTtl = accessTtl;
         this.filter = filter;
 
         threadId = tx == null ? Thread.currentThread().getId() : tx.threadId();
@@ -710,7 +716,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                                         inTx() ? tx.groupLockKey() : null,
                                         inTx() && tx.partitionLock(),
                                         inTx() ? tx.subjectId() : null,
-                                        inTx() ? tx.taskNameHash() : 0);
+                                        inTx() ? tx.taskNameHash() : 0,
+                                        read ? accessTtl : -1L);
 
                                     mapping.request(req);
                                 }
@@ -878,8 +885,16 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
         if (log.isDebugEnabled())
             log.debug("Before locally locking keys : " + keys);
 
-        IgniteFuture<Exception> fut = cctx.colocated().lockAllAsync(cctx, tx, threadId, lockVer,
-            topVer, keys, read, timeout, filter);
+        IgniteFuture<Exception> fut = cctx.colocated().lockAllAsync(cctx,
+            tx,
+            threadId,
+            lockVer,
+            topVer,
+            keys,
+            read,
+            timeout,
+            accessTtl,
+            filter);
 
         // Add new future.
         add(new GridEmbeddedFuture<>(

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
index fa19607..d7e32b3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -631,6 +631,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
         boolean isRead,
         boolean retval,
         @Nullable IgniteTxIsolation isolation,
+        long accessTtl,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
         return dht.lockAllAsync(keys, timeout, filter);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 83ac913..f3994bf 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -279,8 +279,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
 
         IgniteTxLocalEx<K, V> txx = (tx != null && tx.local()) ? (IgniteTxLocalEx<K, V>)tx : null;
 
-        final GetExpiryPolicy expiry =
-            GetExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry());
+        final GetExpiryPolicy expiry = accessExpiryPolicy(expiryPlc);
 
         GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
             keys,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java
index 2653fd0..f1f9a7f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -111,6 +111,9 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
     @GridToStringExclude
     private List<GridDistributedCacheEntry<K, V>> entries;
 
+    /** TTL for read operation. */
+    private long accessTtl;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -125,6 +128,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
      * @param read Read flag.
      * @param retval Flag to return value or not.
      * @param timeout Lock acquisition timeout.
+     * @param accessTtl TTL for read operation.
      * @param filter Filter.
      */
     public GridNearLockFuture(
@@ -134,9 +138,10 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
         boolean read,
         boolean retval,
         long timeout,
+        long accessTtl,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
         super(cctx.kernalContext(), CU.boolReducer());
-        assert cctx != null;
+
         assert keys != null;
 
         this.cctx = cctx;
@@ -145,6 +150,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
         this.read = read;
         this.retval = retval;
         this.timeout = timeout;
+        this.accessTtl = accessTtl;
         this.filter = filter;
 
         threadId = tx == null ? Thread.currentThread().getId() : tx.threadId();
@@ -852,7 +858,8 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
                                             inTx() ? tx.groupLockKey() : null,
                                             inTx() && tx.partitionLock(),
                                             inTx() ? tx.subjectId() : null,
-                                            inTx() ? tx.taskNameHash() : 0);
+                                            inTx() ? tx.taskNameHash() : 0,
+                                            read ? accessTtl : -1L);
 
                                         mapping.request(req);
                                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java
index 0c7bd8c..6607a66 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java
@@ -74,6 +74,9 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
     /** Sync commit flag. */
     private boolean syncCommit;
 
+    /** TTL for read operation. */
+    private long accessTtl;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -82,6 +85,7 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
     }
 
     /**
+     * @param cacheId Cache ID.
      * @param topVer Topology version.
      * @param nodeId Node ID.
      * @param threadId Thread ID.
@@ -96,8 +100,12 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
      * @param timeout Lock timeout.
      * @param keyCnt Number of keys.
      * @param txSize Expected transaction size.
+     * @param syncCommit Synchronous commit flag.
      * @param grpLockKey Group lock key if this is a group-lock transaction.
      * @param partLock If partition is locked.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     * @param accessTtl TTL for read operation.
      */
     public GridNearLockRequest(
         int cacheId,
@@ -119,7 +127,8 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
         @Nullable IgniteTxKey grpLockKey,
         boolean partLock,
         @Nullable UUID subjId,
-        int taskNameHash
+        int taskNameHash,
+        long accessTtl
     ) {
         super(
             cacheId,
@@ -146,6 +155,7 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
         this.syncCommit = syncCommit;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
+        this.accessTtl = accessTtl;
 
         dhtVers = new GridCacheVersion[keyCnt];
     }
@@ -291,6 +301,13 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
         return true;
     }
 
+    /**
+     * @return TTL for read operation.
+     */
+    public long accessTtl() {
+        return accessTtl;
+    }
+
     /** {@inheritDoc} */
     @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
@@ -335,6 +352,7 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
         _clone.taskNameHash = taskNameHash;
         _clone.hasTransforms = hasTransforms;
         _clone.syncCommit = syncCommit;
+        _clone.accessTtl = accessTtl;
     }
 
     /** {@inheritDoc} */
@@ -460,6 +478,13 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
                     return false;
 
                 commState.idx++;
+
+            case 35:
+                if (!commState.putLong(accessTtl))
+                    return false;
+
+                commState.idx++;
+
         }
 
         return true;
@@ -607,6 +632,14 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
                 syncCommit = commState.getBoolean();
 
                 commState.idx++;
+
+            case 35:
+                if (buf.remaining() < 8)
+                    return false;
+
+                accessTtl = commState.getLong();
+
+                commState.idx++;
         }
 
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
index f09d5c2..839a5e9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -130,12 +130,14 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
      * @param keys Keys to load.
      * @param filter Filter.
      * @param deserializePortable Deserialize portable flag.
+     * @param expiryPlc Expiry policy.
      * @return Future.
      */
     IgniteFuture<Map<K, V>> txLoadAsync(GridNearTxLocal<K, V> tx,
         @Nullable Collection<? extends K> keys,
         @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
-        boolean deserializePortable) {
+        boolean deserializePortable,
+        @Nullable IgniteCacheExpiryPolicy expiryPlc) {
         assert tx != null;
 
         GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
@@ -147,7 +149,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
             CU.subjectId(tx, ctx.shared()),
             tx.resolveTaskName(),
             deserializePortable,
-            null);
+            expiryPlc);
 
         // init() will register future for responses if it has remote mappings.
         fut.init();
@@ -393,11 +395,23 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
     }
 
     /** {@inheritDoc} */
-    @Override protected IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout,
-        IgniteTxLocalEx<K, V> tx, boolean isInvalidate, boolean isRead, boolean retval,
-        IgniteTxIsolation isolation, IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
-        GridNearLockFuture<K, V> fut = new GridNearLockFuture<>(ctx, keys, (GridNearTxLocal<K, V>)tx, isRead,
-            retval, timeout, filter);
+    @Override protected IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys,
+        long timeout,
+        IgniteTxLocalEx<K, V> tx,
+        boolean isInvalidate,
+        boolean isRead,
+        boolean retval,
+        IgniteTxIsolation isolation,
+        long accessTtl,
+        IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+        GridNearLockFuture<K, V> fut = new GridNearLockFuture<>(ctx,
+            keys,
+            (GridNearTxLocal<K, V>)tx,
+            isRead,
+            retval,
+            timeout,
+            accessTtl,
+            filter);
 
         if (!ctx.mvcc().addFuture(fut))
             throw new IllegalStateException("Duplicate future ID: " + fut);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
index 675933e..0d90831 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -27,6 +27,7 @@ import org.gridgain.grid.util.typedef.internal.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.expiry.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -71,6 +72,9 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
     /** True if transaction contains colocated cache entries mapped to local node. */
     private boolean colocatedLocallyMapped;
 
+    /** Info for entries accessed locally in optimistic transaction. */
+    private Map<IgniteTxKey, IgniteCacheExpiryPolicy> accessMap;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -86,9 +90,13 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
      * @param concurrency Concurrency.
      * @param isolation Isolation.
      * @param timeout Timeout.
+     * @param invalidate
+     * @param storeEnabled
      * @param txSize Transaction size.
      * @param grpLockKey Group lock key if this is a group lock transaction.
      * @param partLock {@code True} if this is a group-lock transaction and the whole partition should be locked.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
      */
     public GridNearTxLocal(
         GridCacheSharedContext<K, V> ctx,
@@ -263,12 +271,17 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
     /** {@inheritDoc} */
     @Override public IgniteFuture<Boolean> loadMissing(
         GridCacheContext<K, V> cacheCtx,
-        boolean async, final Collection<? extends K> keys,
+        boolean async,
+        final Collection<? extends K> keys,
         boolean deserializePortable,
         final IgniteBiInClosure<K, V> c
     ) {
         if (cacheCtx.isNear()) {
-            return cacheCtx.nearTx().txLoadAsync(this, keys, CU.<K, V>empty(), deserializePortable).chain(new C1<IgniteFuture<Map<K, V>>, Boolean>() {
+            return cacheCtx.nearTx().txLoadAsync(this,
+                keys,
+                CU.<K, V>empty(),
+                deserializePortable,
+                accessPolicy(cacheCtx, keys)).chain(new C1<IgniteFuture<Map<K, V>>, Boolean>() {
                 @Override public Boolean apply(IgniteFuture<Map<K, V>> f) {
                     try {
                         Map<K, V> map = f.get();
@@ -289,9 +302,15 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
             });
         }
         else if (cacheCtx.isColocated()) {
-            return cacheCtx.colocated().loadAsync(keys, /*reload*/false, /*force primary*/false, topologyVersion(),
-                CU.subjectId(this, cctx), resolveTaskName(), deserializePortable, null, null)
-                .chain(new C1<IgniteFuture<Map<K, V>>, Boolean>() {
+            return cacheCtx.colocated().loadAsync(keys,
+                /*reload*/false,
+                /*force primary*/false,
+                topologyVersion(),
+                CU.subjectId(this, cctx),
+                resolveTaskName(),
+                deserializePortable,
+                null,
+                accessPolicy(cacheCtx, keys)).chain(new C1<IgniteFuture<Map<K, V>>, Boolean>() {
                     @Override public Boolean apply(IgniteFuture<Map<K, V>> f) {
                         try {
                             Map<K, V> map = f.get();
@@ -529,8 +548,11 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
      * @param committedVers Committed versions.
      * @param rolledbackVers Rolled back versions.
      */
-    void readyNearLocks(GridDistributedTxMapping<K, V> mapping, Collection<GridCacheVersion> pendingVers,
-        Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers) {
+    void readyNearLocks(GridDistributedTxMapping<K, V> mapping,
+        Collection<GridCacheVersion> pendingVers,
+        Collection<GridCacheVersion> committedVers,
+        Collection<GridCacheVersion> rolledbackVers)
+    {
         Collection<IgniteTxEntry<K, V>> entries = groupLock() ?
             Collections.singletonList(groupLockEntry()) :
             F.concat(false, mapping.reads(), mapping.writes());
@@ -1044,8 +1066,11 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    public IgniteFuture<GridCacheReturn<V>> lockAllAsync(GridCacheContext<K, V> cacheCtx, final Collection<? extends K> keys,
-        boolean implicit, boolean read) {
+    public IgniteFuture<GridCacheReturn<V>> lockAllAsync(GridCacheContext<K, V> cacheCtx,
+        final Collection<? extends K> keys,
+        boolean implicit,
+        boolean read,
+        long accessTtl) {
         assert pessimistic();
 
         try {
@@ -1066,7 +1091,14 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
             log.debug("Before acquiring transaction lock on keys: " + keys);
 
         IgniteFuture<Boolean> fut = cacheCtx.colocated().lockAllAsyncInternal(keys,
-            lockTimeout(), this, isInvalidate(), read, /*retval*/false, isolation, CU.<K, V>empty());
+            lockTimeout(),
+            this,
+            isInvalidate(),
+            read,
+            /*retval*/false,
+            isolation,
+            accessTtl,
+            CU.<K, V>empty());
 
         return new GridEmbeddedFuture<>(
             fut,
@@ -1136,6 +1168,70 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
+    @Override protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext ctx,
+        IgniteTxKey key,
+        @Nullable ExpiryPolicy expiryPlc)
+    {
+        assert optimistic();
+
+        if (expiryPlc == null)
+            expiryPlc = ctx.expiry();
+
+        if (expiryPlc != null) {
+            IgniteCacheExpiryPolicy plc = ctx.cache().accessExpiryPolicy(expiryPlc);
+
+            if (plc != null) {
+                if (accessMap == null)
+                    accessMap = new HashMap<>();
+
+                accessMap.put(key, plc);
+            }
+
+            return plc;
+        }
+
+        return null;
+    }
+
+    /**
+     * @param cacheCtx Cache context.
+     * @param keys Keys.
+     * @return Expiry policy.
+     */
+    private IgniteCacheExpiryPolicy accessPolicy(GridCacheContext<K, V> cacheCtx, Collection<? extends K> keys) {
+        if (accessMap != null) {
+            for (Map.Entry<IgniteTxKey, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) {
+                if (e.getKey().cacheId() == cacheCtx.cacheId() && keys.contains(e.getKey().key()))
+                    return e.getValue();
+            }
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IgniteCheckedException {
+        super.close();
+
+        if (accessMap != null) {
+            assert optimistic();
+
+            for (Map.Entry<IgniteTxKey, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) {
+                if (e.getValue().entries() != null) {
+                    GridCacheContext cctx0 = cctx.cacheContext(e.getKey().cacheId());
+
+                    if (cctx0.isNear())
+                        cctx0.near().dht().sendTtlUpdateRequest(e.getValue());
+                    else
+                        cctx0.dht().sendTtlUpdateRequest(e.getValue());
+                }
+            }
+
+            accessMap = null;
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridNearTxLocal.class, this, "mappings", mappings.keySet(), "super", super.toString());
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java
index 9ea91b9..c461894 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java
@@ -86,9 +86,14 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteFuture<Boolean> txLockAsync(Collection<? extends K> keys, long timeout,
-        IgniteTxLocalEx<K, V> tx, boolean isRead,
-        boolean retval, IgniteTxIsolation isolation, boolean invalidate,
+    @Override public IgniteFuture<Boolean> txLockAsync(Collection<? extends K> keys,
+        long timeout,
+        IgniteTxLocalEx<K, V> tx,
+        boolean isRead,
+        boolean retval,
+        IgniteTxIsolation isolation,
+        boolean invalidate,
+        long accessTtl,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
         return lockAllAsync(keys, timeout, tx, filter);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
index 0226ff2..86fed36 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
@@ -173,6 +173,7 @@ class GridLocalTx<K, V> extends IgniteTxLocalAdapter<K, V> {
         rollbackAsync().get();
     }
 
+    /** {@inheritDoc} */
     @Override public IgniteFuture<IgniteTx> rollbackAsync() {
         try {
             state(ROLLING_BACK);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 88b6cfc..eaf0173 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -594,8 +594,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         if (keyCheck)
             validateCacheKeys(keys);
 
-        final GetExpiryPolicy expiry =
-            GetExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry());
+        final GetExpiryPolicy expiry = accessExpiryPolicy(expiryPlc);
 
         boolean success = true;
 
@@ -1287,6 +1286,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         boolean retval,
         IgniteTxIsolation isolation,
         boolean invalidate,
+        long accessTtl,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
         return new GridFinishedFutureEx<>(new UnsupportedOperationException("Locks are not supported for " +
             "GridCacheAtomicityMode.ATOMIC mode (use GridCacheAtomicityMode.TRANSACTIONAL instead)"));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
index 8d3e6a0..1d4b5d7 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
@@ -1149,8 +1149,15 @@ public class IgniteTxHandler<K, V> {
                                     "(transaction has been completed): " + req.version());
                         }
 
-                        tx.addWrite(cacheCtx, txEntry.op(), txEntry.txKey(), txEntry.keyBytes(), txEntry.value(),
-                            txEntry.valueBytes(), txEntry.transformClosures(), txEntry.drVersion());
+                        tx.addWrite(cacheCtx,
+                            txEntry.op(),
+                            txEntry.txKey(),
+                            txEntry.keyBytes(),
+                            txEntry.value(),
+                            txEntry.valueBytes(),
+                            txEntry.transformClosures(),
+                            txEntry.drVersion(),
+                            txEntry.ttl());
 
                         if (!marked) {
                             if (tx.markFinalizing(USER_FINISH))

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 8e742fd..27c1f44 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -410,6 +410,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
     /**
      * Gets cache entry for given key.
      *
+     * @param cacheCtx Cache context.
      * @param key Key.
      * @return Cache entry.
      */
@@ -420,6 +421,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
     /**
      * Gets cache entry for given key and topology version.
      *
+     * @param cacheCtx Cache context.
      * @param key Key.
      * @param topVer Topology version.
      * @return Cache entry.
@@ -996,6 +998,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
         }
     }
 
+    protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext ctx,
+        IgniteTxKey key,
+        @Nullable ExpiryPolicy expiryPlc) {
+        return null;
+    }
+
     /**
      * Checks if there is a cached or swapped value for
      * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, IgnitePredicate[])} method.
@@ -1156,6 +1164,9 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                         V val = null;
 
                         if (!pessimistic() || readCommitted() || groupLock()) {
+                            IgniteCacheExpiryPolicy accessPlc =
+                                optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null;
+
                             // This call will check for filter.
                             val = entry.innerGet(this,
                                 /*swap*/true,
@@ -1169,7 +1180,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                 null,
                                 resolveTaskName(),
                                 filter,
-                                null);
+                                accessPlc);
 
                             if (val != null) {
                                 V val0 = val;
@@ -1269,6 +1280,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * Loads all missed keys for
      * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, IgnitePredicate[])} method.
      *
+     * @param cacheCtx Cache context.
      * @param map Return map.
      * @param missedMap Missed keys.
      * @param redos Keys to retry.
@@ -1466,7 +1478,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
     @Override public IgniteFuture<Map<K, V>> getAllAsync(
         final GridCacheContext<K, V> cacheCtx,
         Collection<? extends K> keys,
-        @Nullable GridCacheEntryEx<K, V> cached, final boolean deserializePortable,
+        @Nullable GridCacheEntryEx<K, V> cached,
+        final boolean deserializePortable,
         final IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
         if (F.isEmpty(keys))
             return new GridFinishedFuture<>(cctx.kernalContext(), Collections.<K, V>emptyMap());
@@ -1486,10 +1499,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
             GridCacheProjectionImpl<K, V> prj = cacheCtx.projectionPerCall();
 
+            ExpiryPolicy expiryPlc = prj != null ? prj.expiry() : null;
+
             final Collection<K> lockKeys = enlistRead(cacheCtx,
                 keys,
                 cached,
-                prj != null ? prj.expiry() : null,
+                expiryPlc,
                 retMap,
                 missed,
                 keysCnt,
@@ -1501,8 +1516,20 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
             // Handle locks.
             if (pessimistic() && !readCommitted() && !groupLock()) {
-                IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys, lockTimeout(), this, true, true,
-                    isolation, isInvalidate(), CU.<K, V>empty());
+                if (expiryPlc == null)
+                    expiryPlc = cacheCtx.expiry();
+
+                long accessTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForAccess()) : -1L;
+
+                IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys,
+                    lockTimeout(),
+                    this,
+                    true,
+                    true,
+                    isolation,
+                    isInvalidate(),
+                    accessTtl,
+                    CU.<K, V>empty());
 
                 PLC2<Map<K, V>> plc2 = new PLC2<Map<K, V>>() {
                     @Override public IgniteFuture<Map<K, V>> postLock() throws IgniteCheckedException {
@@ -2053,6 +2080,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @param rmv {@code True} if remove.
      * @param retval Flag to return value or not.
      * @param read {@code True} if read.
+     * @param accessTtl TTL for read operation.
      * @param filter Filter to check entries.
      * @return Failed keys.
      * @throws IgniteCheckedException If error.
@@ -2067,6 +2095,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
         boolean rmv,
         boolean retval,
         boolean read,
+        long accessTtl,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter
     ) throws IgniteCheckedException {
         for (K k : keys) {
@@ -2158,10 +2187,14 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                     }
 
                     if (updateTtl) {
-                        ExpiryPolicy expiryPlc = txEntry.expiry() != null ? txEntry.expiry() : cacheCtx.expiry();
+                        if (!read) {
+                            ExpiryPolicy expiryPlc = txEntry.expiry() != null ? txEntry.expiry() : cacheCtx.expiry();
 
-                        if (expiryPlc != null)
-                            txEntry.ttl(CU.toTtl(expiryPlc.getExpiryForAccess()));
+                            if (expiryPlc != null)
+                                txEntry.ttl(CU.toTtl(expiryPlc.getExpiryForAccess()));
+                        }
+                        else
+                            txEntry.ttl(accessTtl);
                     }
 
                     break; // While.
@@ -2336,8 +2369,15 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                 if (log.isDebugEnabled())
                     log.debug("Before acquiring transaction lock for put on keys: " + keys);
 
-                IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(keys, lockTimeout(), this, false,
-                    retval, isolation, isInvalidate(), CU.<K, V>empty());
+                IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(keys,
+                    lockTimeout(),
+                    this,
+                    false,
+                    retval,
+                    isolation,
+                    isInvalidate(),
+                    -1L,
+                    CU.<K, V>empty());
 
                 PLC1<GridCacheReturn<V>> plc1 = new PLC1<GridCacheReturn<V>>(ret) {
                     @Override public GridCacheReturn<V> postLock(GridCacheReturn<V> ret) throws IgniteCheckedException {
@@ -2355,6 +2395,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                             /*remove*/false,
                             retval,
                             /*read*/false,
+                            -1L,
                             filter);
 
                         return ret;
@@ -2530,8 +2571,15 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                 if (log.isDebugEnabled())
                     log.debug("Before acquiring transaction lock for remove on keys: " + passedKeys);
 
-                IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(passedKeys, lockTimeout(), this, false, retval,
-                    isolation, isInvalidate(), CU.<K, V>empty());
+                IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(passedKeys,
+                    lockTimeout(),
+                    this,
+                    false,
+                    retval,
+                    isolation,
+                    isInvalidate(),
+                    -1L,
+                    CU.<K, V>empty());
 
                 PLC1<GridCacheReturn<V>> plc1 = new PLC1<GridCacheReturn<V>>(ret) {
                     @Override protected GridCacheReturn<V> postLock(GridCacheReturn<V> ret) throws IgniteCheckedException {
@@ -2547,6 +2595,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                             /*remove*/true,
                             retval,
                             /*read*/false,
+                            -1L,
                             filter);
 
                         return ret;
@@ -2681,7 +2730,14 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
             // Lock group key in pessimistic mode only.
             return pessimistic() ?
-                cacheCtx.cache().txLockAsync(enlisted, lockTimeout(), this, false, false, isolation, isInvalidate(),
+                cacheCtx.cache().txLockAsync(enlisted,
+                    lockTimeout(),
+                    this,
+                    false,
+                    false,
+                    isolation,
+                    isInvalidate(),
+                    -1L,
                     CU.<K, V>empty()) :
                 new GridFinishedFuture<>(cctx.kernalContext());
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index 6100479..a57da71 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -163,40 +163,58 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
         }
 
         if (atomicityMode() == TRANSACTIONAL) {
-            for (final Integer key : keys()) {
-                log.info("Test txGet [key=" + key + ']');
+            IgniteTxConcurrency[] txModes = {PESSIMISTIC};
 
-                txGet(key);
+            for (IgniteTxConcurrency txMode : txModes) {
+                for (final Integer key : keys()) {
+                    log.info("Test txGet [key=" + key + ", txMode=" + txMode + ']');
+
+                    txGet(key, txMode);
+                }
             }
 
-            txGetAll();
+            for (IgniteTxConcurrency txMode : txModes) {
+                log.info("Test txGetAll [txMode=" + txMode + ']');
+
+                txGetAll(txMode);
+            }
         }
     }
 
     /**
      * @param key Key.
+     * @param txMode Transaction concurrency mode.
      * @throws Exception If failed.
      */
-    private void txGet(Integer key) throws Exception {
+    private void txGet(Integer key, IgniteTxConcurrency txMode) throws Exception {
         IgniteCache<Integer, Integer> cache = jcache();
 
         cache.put(key, 1);
 
         checkTtl(key, 60_000L);
 
-        try (IgniteTx tx = ignite(0).transactions().txStart()) {
+        try (IgniteTx tx = ignite(0).transactions().txStart(txMode, REPEATABLE_READ)) {
             assertEquals((Integer)1, cache.get(key));
 
             tx.commit();
         }
 
         checkTtl(key, 62_000L, true);
+
+        try (IgniteTx tx = ignite(0).transactions().txStart(txMode, REPEATABLE_READ)) {
+            assertEquals((Integer)1, cache.withExpiryPolicy(new TestPolicy(100L, 200L, 1000L)).get(key));
+
+            tx.commit();
+        }
+
+        checkTtl(key, 1000L, true);
     }
 
     /**
+     * @param txMode Transaction concurrency mode.
      * @throws Exception If failed.
      */
-    private void txGetAll() throws Exception {
+    private void txGetAll(IgniteTxConcurrency txMode) throws Exception {
         IgniteCache<Integer, Integer> cache = jcache(0);
 
         Map<Integer, Integer> vals = new HashMap<>();
@@ -206,7 +224,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
 
         cache.putAll(vals);
 
-        try (IgniteTx tx = ignite(0).transactions().txStart()) {
+        try (IgniteTx tx = ignite(0).transactions().txStart(txMode, REPEATABLE_READ)) {
             assertEquals(vals, cache.getAll(vals.keySet()));
 
             tx.commit();
@@ -214,6 +232,15 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
 
         for (Integer key : vals.keySet())
             checkTtl(key, 62_000L);
+
+        try (IgniteTx tx = ignite(0).transactions().txStart(txMode, REPEATABLE_READ)) {
+            assertEquals(vals, cache.withExpiryPolicy(new TestPolicy(100L, 200L, 1000L)).getAll(vals.keySet()));
+
+            tx.commit();
+        }
+
+        for (Integer key : vals.keySet())
+            checkTtl(key, 1000L);
     }
 
     /**