You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/23 09:22:49 UTC

[13/38] incubator-ignite git commit: # ignite-41

# ignite-41


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/85de2471
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/85de2471
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/85de2471

Branch: refs/heads/ignite-1
Commit: 85de247138059e611110703165ddc07bded66cc4
Parents: 5f95bd2
Author: sboikov <sb...@gridgain.com>
Authored: Thu Dec 18 15:05:39 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Dec 18 17:50:41 2014 +0300

----------------------------------------------------------------------
 .../cache/GridCacheAccessExpiryPolicy.java      |  34 +++--
 .../processors/cache/GridCacheAdapter.java      |   2 +-
 .../processors/cache/GridCacheEntryEx.java      |   4 +-
 .../processors/cache/GridCacheExpiryPolicy.java |  47 ++++++
 .../processors/cache/GridCacheMapEntry.java     |  74 ++++-----
 .../processors/cache/GridCacheTxEntry.java      |   2 +-
 .../kernal/processors/cache/GridCacheUtils.java |   2 +-
 .../distributed/GridCacheExpiryPolicy.java      | 152 ------------------
 .../GridCacheExternalizableExpiryPolicy.java    | 153 +++++++++++++++++++
 .../distributed/dht/GridDhtCacheAdapter.java    |  10 +-
 .../cache/distributed/dht/GridDhtGetFuture.java |   4 +-
 .../dht/GridPartitionedGetFuture.java           |   6 +-
 .../dht/atomic/GridDhtAtomicCache.java          | 128 +++++++++++++---
 .../dht/atomic/GridNearAtomicUpdateRequest.java |   2 +-
 .../distributed/near/GridNearAtomicCache.java   |  15 +-
 .../distributed/near/GridNearCacheAdapter.java  |  23 ++-
 .../distributed/near/GridNearCacheEntry.java    |  11 +-
 .../distributed/near/GridNearGetFuture.java     |   8 +-
 .../near/GridNearTransactionalCache.java        |  14 +-
 .../local/atomic/GridLocalAtomicCache.java      |  18 ++-
 .../GridTcpCommunicationMessageFactory.java     |   2 +-
 .../IgniteCacheExpiryPolicyAbstractTest.java    | 119 +++++++++++----
 .../cache/GridCacheAbstractTtlSelfTest.java     |  95 ------------
 .../processors/cache/GridCacheTestEntryEx.java  |   6 +-
 .../near/GridCachePartitionedTtlSelfTest.java   |  25 ---
 .../GridCacheReplicatedTtlSelfTest.java         |  24 ---
 .../cache/local/GridCacheLocalTtlSelfTest.java  |  25 ---
 .../bamboo/GridDataGridTestSuite.java           |   2 -
 .../cache/GridCacheAbstractQuerySelfTest.java   |  14 +-
 29 files changed, 538 insertions(+), 483 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java
index 07f4ae8..f0904dd 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java
@@ -20,9 +20,9 @@ import java.util.*;
 /**
  *
  */
-public class GridCacheAccessExpiryPolicy {
+public class GridCacheAccessExpiryPolicy implements GridCacheExpiryPolicy {
     /** */
-    private final long ttl;
+    private final long accessTtl;
 
     /** */
     private volatile Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries;
@@ -44,19 +44,27 @@ public class GridCacheAccessExpiryPolicy {
     }
 
     /**
-     * @param ttl TTL for access.
+     * @param accessTtl TTL for access.
      */
-    public GridCacheAccessExpiryPolicy(long ttl) {
-        assert ttl >= 0 : ttl;
+    public GridCacheAccessExpiryPolicy(long accessTtl) {
+        assert accessTtl >= 0 : accessTtl;
 
-        this.ttl = ttl;
+        this.accessTtl = accessTtl;
     }
 
-    /**
-     * @return TTL.
-     */
-    public long ttl() {
-        return ttl;
+    /** {@inheritDoc} */
+    @Override public long forAccess() {
+        return accessTtl;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long forCreate() {
+        return -1L;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long forUpdate() {
+        return -1L;
     }
 
     /**
@@ -75,7 +83,7 @@ public class GridCacheAccessExpiryPolicy {
      * @param ver Entry version.
      */
     @SuppressWarnings("unchecked")
-    public void ttlUpdated(Object key, byte[] keyBytes, GridCacheVersion ver) {
+    @Override public void onAccessUpdated(Object key, byte[] keyBytes, GridCacheVersion ver) {
         Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries0 = entries;
 
         if (entries0 == null) {
@@ -93,7 +101,7 @@ public class GridCacheAccessExpiryPolicy {
     /**
      * @return TTL update request.
      */
-    @Nullable public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() {
+    @Nullable @Override public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() {
         return entries;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index a4ec90f..46029e7 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -1765,7 +1765,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
         final String taskName,
         final boolean deserializePortable,
         final boolean forcePrimary,
-        @Nullable GridCacheAccessExpiryPolicy expiry,
+        @Nullable GridCacheExpiryPolicy expiry,
         @Nullable final IgnitePredicate<GridCacheEntry<K, V>>... filter
         ) {
         ctx.checkSecurity(GridSecurityPermission.CACHE_READ);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
index c6e3ea6..b684183 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
@@ -292,7 +292,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
         Object transformClo,
         String taskName,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter,
-        @Nullable GridCacheAccessExpiryPolicy expiryPlc)
+        @Nullable GridCacheExpiryPolicy expiryPlc)
         throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException;
 
     /**
@@ -425,7 +425,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
         @Nullable byte[] valBytes,
         boolean writeThrough,
         boolean retval,
-        @Nullable ExpiryPolicy expiryPlc,
+        @Nullable GridCacheExpiryPolicy expiryPlc,
         boolean evt,
         boolean metrics,
         boolean primary,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheExpiryPolicy.java
new file mode 100644
index 0000000..3ce9572
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheExpiryPolicy.java
@@ -0,0 +1,47 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Internal cache expiry policy: TTLs for create, update and access, plus tracking of entries whose TTL changed on access.
+ */
+public interface GridCacheExpiryPolicy {
+    /**
+     * @return TTL to use when an entry is created.
+     */
+    public abstract long forCreate();
+
+    /**
+     * @return TTL to use when an existing entry is updated.
+     */
+    public abstract long forUpdate();
+
+    /**
+     * @return TTL to use when an entry is accessed ({@code -1} appears to mean "leave TTL unchanged" - TODO confirm).
+     */
+    public abstract long forAccess();
+
+    /**
+     * @param key Key of the entry whose TTL was updated on access.
+     * @param keyBytes Entry key bytes.
+     * @param ver Entry version.
+     */
+    public void onAccessUpdated(Object key, byte[] keyBytes, GridCacheVersion ver);
+
+    /**
+     * @return Map of key to (key bytes, version) used to build the TTL update request; may be {@code null}.
+     */
+    @Nullable public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index c191dda..19d28b6 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -700,7 +700,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         Object transformClo,
         String taskName,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter,
-        @Nullable GridCacheAccessExpiryPolicy expirePlc)
+        @Nullable GridCacheExpiryPolicy expirePlc)
         throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException {
         cctx.denyOnFlag(LOCAL);
 
@@ -733,7 +733,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         Object transformClo,
         String taskName,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter,
-        @Nullable GridCacheAccessExpiryPolicy expiryPlc)
+        @Nullable GridCacheExpiryPolicy expiryPlc)
         throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException {
         // Disable read-through if there is no store.
         if (readThrough && !cctx.isStoreEnabled())
@@ -882,13 +882,13 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
             }
 
             if (ret != null && expiryPlc != null) {
-                long ttl = expiryPlc.ttl();
+                long ttl = expiryPlc.forAccess();
 
                 assert ttl >= 0 : ttl;
 
                 updateTtl(ttl);
 
-                expiryPlc.ttlUpdated(key(), getOrMarshalKeyBytes(), version());
+                expiryPlc.onAccessUpdated(key(), getOrMarshalKeyBytes(), version());
             }
         }
 
@@ -1464,6 +1464,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         assert cctx.isLocal() && cctx.atomic();
 
         V old;
+
         boolean res = true;
 
         IgniteBiTuple<Boolean, ?> interceptorRes = null;
@@ -1501,8 +1502,16 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
             if (!F.isEmpty(filter)) {
                 boolean pass = cctx.isAll(wrapFilterLocked(), filter);
 
-                if (!pass)
+                if (!pass) {
+                    if (expiryPlc != null && hasValueUnlocked()) {
+                        long ttl = toTtl(expiryPlc.getExpiryForAccess());
+
+                        if (ttl != -1L)
+                            updateTtl(ttl);
+                    }
+
                     return new IgniteBiTuple<>(false, retval ? old : null);
+                }
             }
 
             // Apply metrics.
@@ -1658,20 +1667,6 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
     }
 
     /**
-     * @param expiryPlc Expiry policy.
-     * @param isNew {@code True} if entry is new.
-     * @return TTL.
-     */
-    private static long ttlFromPolicy(@Nullable ExpiryPolicy expiryPlc, boolean isNew) {
-        if (expiryPlc == null)
-            return -1L;
-
-        Duration duration = isNew ? expiryPlc.getExpiryForCreation() : expiryPlc.getExpiryForUpdate();
-
-        return toTtl(duration);
-    }
-
-    /**
      * @param duration Duration.
      * @return TTL.
      */
@@ -1703,7 +1698,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         @Nullable byte[] valBytes,
         boolean writeThrough,
         boolean retval,
-        @Nullable ExpiryPolicy expiryPlc,
+        @Nullable GridCacheExpiryPolicy expiryPlc,
         boolean evt,
         boolean metrics,
         boolean primary,
@@ -1750,7 +1745,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                     op,
                     writeObj,
                     valBytes,
-                    ttlFromPolicy(expiryPlc, isNew()),
+                    expiryPlc != null ? (isNew() ? expiryPlc.forCreate() : expiryPlc.forUpdate()) : -1L,
                     drTtl,
                     drExpireTime,
                     drVer);
@@ -1844,18 +1839,19 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
                 if (!pass) {
                     if (hasValueUnlocked() && expiryPlc != null) {
-                        Duration duration = expiryPlc.getExpiryForAccess();
+                        newTtl = expiryPlc.forAccess();
 
-                        newTtl = toTtl(duration);
-
-                        if (newTtl != -1L)
+                        if (newTtl != -1L) {
                             updateTtl(newTtl);
+
+                            expiryPlc.onAccessUpdated(key, getOrMarshalKeyBytes(), version());
+                        }
                     }
 
                     return new GridCacheUpdateAtomicResult<>(false,
                         retval ? old : null,
                         null,
-                        newTtl,
+                        0L,
                         -1L,
                         null,
                         null,
@@ -1929,25 +1925,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                     else {
                         assert drExpireTime == -1L;
 
-                        if (expiryPlc != null) {
-                            if (!hadVal) {
-                                Duration duration = expiryPlc.getExpiryForCreation();
-
-                                if (duration != null && duration.isZero())
-                                    return new GridCacheUpdateAtomicResult<>(false,
-                                        retval ? old : null,
-                                        null,
-                                        0L,
-                                        -1L,
-                                        null,
-                                        null,
-                                        false);
-
-                                newTtl = toTtl(duration);
-                            }
-                            else
-                                newTtl = toTtl(expiryPlc.getExpiryForUpdate());
-                        }
+                        if (expiryPlc != null)
+                            newTtl = hadVal ? expiryPlc.forUpdate() : expiryPlc.forCreate();
                         else
                             newTtl = -1L;
 
@@ -3428,6 +3407,10 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
     /** {@inheritDoc} */
     @Override public void updateTtl(GridCacheVersion ver, long ttl) {
         synchronized (this) {
+            updateTtl(ttl);
+
+            /*
+            TODO IGNITE-41.
             try {
                 if (ver.equals(version()))
                     updateTtl(ttl);
@@ -3435,6 +3418,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
             catch (GridCacheEntryRemovedException ignored) {
                 // No-op.
             }
+            */
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java
index df888e7..0bff22b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java
@@ -836,7 +836,7 @@ public class GridCacheTxEntry<K, V> implements GridPeerDeployAware, Externalizab
         out.writeBoolean(grpLock);
         CU.writeVersion(out, drVer);
 
-        out.writeObject(transferExpiryPlc ? new GridCacheExpiryPolicy(expiryPlc) : null);
+        out.writeObject(transferExpiryPlc ? new GridCacheExternalizableExpiryPolicy(expiryPlc) : null);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
index d28f728..77f6a7a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
@@ -1586,7 +1586,7 @@ public class GridCacheUtils {
      * @return Expire time.
      */
     public static long toExpireTime(long ttl) {
-        assert ttl >= 0L;
+        assert ttl >= 0L : ttl;
 
         if (ttl == 0L)
             return 0L;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java
deleted file mode 100644
index 3a77884..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed;
-
-import org.gridgain.grid.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import javax.cache.expiry.*;
-import java.io.*;
-import java.util.concurrent.*;
-
-/**
- *
- */
-public class GridCacheExpiryPolicy implements ExpiryPolicy, Externalizable {
-    /** */
-    private ExpiryPolicy plc;
-
-    /** */
-    private static final byte CREATE_TTL_MASK = 0x01;
-
-    /** */
-    private static final byte UPDATE_TTL_MASK = 0x02;
-
-    /** */
-    private static final byte ACCESS_TTL_MASK = 0x04;
-
-    /** */
-    private Duration forCreate;
-
-    /** */
-    private Duration forUpdate;
-
-    /** */
-    private Duration forAccess;
-
-    /**
-     * Required by {@link Externalizable}.
-     */
-    public GridCacheExpiryPolicy() {
-        // No-op.
-    }
-
-    /**
-     * @param plc Expiry policy.
-     */
-    public GridCacheExpiryPolicy(ExpiryPolicy plc) {
-        assert plc != null;
-
-        this.plc = plc;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Duration getExpiryForCreation() {
-        return forCreate;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Duration getExpiryForAccess() {
-        return forAccess;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Duration getExpiryForUpdate() {
-        return forUpdate;
-    }
-
-    /**
-     * @param out Output stream.
-     * @param duration Duration.
-     * @throws IOException
-     */
-    private void writeDuration(ObjectOutput out, @Nullable Duration duration) throws IOException {
-        if (duration != null) {
-            if (duration.isEternal())
-                out.writeLong(0);
-            else if (duration.getDurationAmount() == 0)
-                out.writeLong(1);
-            else
-                out.writeLong(duration.getTimeUnit().toMillis(duration.getDurationAmount()));
-        }
-    }
-
-    /**
-     * @param in Input stream.
-     * @return Duration.
-     * @throws IOException
-     */
-    private Duration readDuration(ObjectInput in) throws IOException {
-        long ttl = in.readLong();
-
-        assert ttl >= 0;
-
-        if (ttl == 0)
-            return Duration.ETERNAL;
-
-        return new Duration(TimeUnit.MILLISECONDS, ttl);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        byte flags = 0;
-
-        Duration create = plc.getExpiryForCreation();
-
-        if (create != null)
-            flags |= CREATE_TTL_MASK;
-
-        Duration update = plc.getExpiryForUpdate();
-
-        if (update != null)
-            flags |= UPDATE_TTL_MASK;
-
-        Duration access = plc.getExpiryForAccess();
-
-        if (access != null)
-            flags |= ACCESS_TTL_MASK;
-
-        out.writeByte(flags);
-
-        writeDuration(out, create);
-
-        writeDuration(out, update);
-
-        writeDuration(out, access);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        byte flags = in.readByte();
-
-        if ((flags & CREATE_TTL_MASK) != 0)
-            forCreate = readDuration(in);
-
-        if ((flags & UPDATE_TTL_MASK) != 0)
-            forUpdate = readDuration(in);
-
-        if ((flags & ACCESS_TTL_MASK) != 0)
-            forAccess = readDuration(in);
-    }
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCacheExpiryPolicy.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExternalizableExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExternalizableExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExternalizableExpiryPolicy.java
new file mode 100644
index 0000000..75da5de
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExternalizableExpiryPolicy.java
@@ -0,0 +1,153 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.distributed;
+
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.expiry.*;
+import java.io.*;
+import java.util.concurrent.*;
+
+/**
+ * Externalizable {@link ExpiryPolicy}: writes the durations of a wrapped policy and, after reading, returns the read ones.
+ */
+public class GridCacheExternalizableExpiryPolicy implements ExpiryPolicy, Externalizable {
+    /** Wrapped policy; assigned only by the one-argument constructor and queried in {@code writeExternal}. */
+    private ExpiryPolicy plc;
+
+    /** Flag bit: a creation duration follows in the stream. */
+    private static final byte CREATE_TTL_MASK = 0x01;
+
+    /** Flag bit: an update duration follows in the stream. */
+    private static final byte UPDATE_TTL_MASK = 0x02;
+
+    /** Flag bit: an access duration follows in the stream. */
+    private static final byte ACCESS_TTL_MASK = 0x04;
+
+    /** Creation duration; assigned only in {@code readExternal}, otherwise {@code null}. */
+    private Duration forCreate;
+
+    /** Update duration; assigned only in {@code readExternal}, otherwise {@code null}. */
+    private Duration forUpdate;
+
+    /** Access duration; assigned only in {@code readExternal}, otherwise {@code null}. */
+    private Duration forAccess;
+
+    /**
+     * Required by {@link Externalizable}.
+     */
+    public GridCacheExternalizableExpiryPolicy() {
+        // No-op.
+    }
+
+    /**
+     * @param plc Expiry policy whose durations are written by {@code writeExternal}; must not be {@code null}.
+     */
+    public GridCacheExternalizableExpiryPolicy(ExpiryPolicy plc) {
+        assert plc != null;
+
+        this.plc = plc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Duration getExpiryForCreation() {
+        return forCreate;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Duration getExpiryForAccess() {
+        return forAccess;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Duration getExpiryForUpdate() {
+        return forUpdate;
+    }
+
+    /**
+     * @param out Output stream.
+     * @param duration Duration, skipped if {@code null}; written as 0 (eternal), 1 (zero amount) or milliseconds.
+     * @throws IOException If failed.
+     */
+    private void writeDuration(ObjectOutput out, @Nullable Duration duration) throws IOException {
+        if (duration != null) {
+            if (duration.isEternal())
+                out.writeLong(0);
+            else if (duration.getDurationAmount() == 0)
+                out.writeLong(1);
+            else
+                out.writeLong(duration.getTimeUnit().toMillis(duration.getDurationAmount()));
+        }
+    }
+
+    /**
+     * @param in Input stream.
+     * @return {@link Duration#ETERNAL} if the value read is 0, otherwise a duration of that many milliseconds.
+     * @throws IOException If failed.
+     */
+    private Duration readDuration(ObjectInput in) throws IOException {
+        long ttl = in.readLong();
+
+        assert ttl >= 0;
+
+        if (ttl == 0)
+            return Duration.ETERNAL;
+
+        return new Duration(TimeUnit.MILLISECONDS, ttl);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        byte flags = 0;
+
+        Duration create = plc.getExpiryForCreation();
+
+        if (create != null)
+            flags |= CREATE_TTL_MASK;
+
+        Duration update = plc.getExpiryForUpdate();
+
+        if (update != null)
+            flags |= UPDATE_TTL_MASK;
+
+        Duration access = plc.getExpiryForAccess();
+
+        if (access != null)
+            flags |= ACCESS_TTL_MASK;
+
+        out.writeByte(flags);
+
+        writeDuration(out, create);
+
+        writeDuration(out, update);
+
+        writeDuration(out, access);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        byte flags = in.readByte();
+
+        if ((flags & CREATE_TTL_MASK) != 0)
+            forCreate = readDuration(in);
+
+        if ((flags & UPDATE_TTL_MASK) != 0)
+            forUpdate = readDuration(in);
+
+        if ((flags & ACCESS_TTL_MASK) != 0)
+            forAccess = readDuration(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheExternalizableExpiryPolicy.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index fef3b82..a7b1fea 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -428,7 +428,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
     /**
      * This method is used internally. Use
-     * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, long, UUID, int, boolean, IgnitePredicate[], GridCacheAccessExpiryPolicy)}
+     * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, long, UUID, int, boolean, IgnitePredicate[], org.gridgain.grid.kernal.processors.cache.GridCacheExpiryPolicy)}
      * method instead to retrieve DHT value.
      *
      * @param keys {@inheritDoc}
@@ -483,7 +483,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         String taskName,
         boolean deserializePortable,
         @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
-        @Nullable GridCacheAccessExpiryPolicy expiry
+        @Nullable GridCacheExpiryPolicy expiry
         ) {
         return getAllAsync(keys,
             null,
@@ -518,7 +518,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         int taskNameHash,
         boolean deserializePortable,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter,
-        @Nullable GridCacheAccessExpiryPolicy expiry) {
+        @Nullable GridCacheExpiryPolicy expiry) {
         GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx,
             msgId,
             reader,
@@ -599,7 +599,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     /**
      * @param expiryPlc Expiry policy.
      */
-    protected void sendTtlUpdateRequest(@Nullable final GridCacheAccessExpiryPolicy expiryPlc) {
+    public void sendTtlUpdateRequest(@Nullable final GridCacheExpiryPolicy expiryPlc) {
         if (expiryPlc != null && expiryPlc.entries() != null) {
             ctx.closures().runLocalSafe(new Runnable() {
                 @SuppressWarnings({"unchecked", "ForLoopReplaceableByForEach"})
@@ -622,7 +622,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                                 GridCacheTtlUpdateRequest<K, V> req = reqMap.get(node);
 
                                 if (req == null) {
-                                    reqMap.put(node, req = new GridCacheTtlUpdateRequest<>(expiryPlc.ttl()));
+                                    reqMap.put(node, req = new GridCacheTtlUpdateRequest<>(expiryPlc.forAccess()));
 
                                     req.cacheId(ctx.cacheId());
                                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 2ce4b6c..7a0db43 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -84,7 +84,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
     private boolean deserializePortable;
 
     /** Expiry policy. */
-    private GridCacheAccessExpiryPolicy expiryPlc;
+    private GridCacheExpiryPolicy expiryPlc;
 
     /**
      * Empty constructor required for {@link Externalizable}.
@@ -119,7 +119,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
         @Nullable UUID subjId,
         int taskNameHash,
         boolean deserializePortable,
-        @Nullable GridCacheAccessExpiryPolicy expiryPlc) {
+        @Nullable GridCacheExpiryPolicy expiryPlc) {
         super(cctx.kernalContext(), CU.<GridCacheEntryInfo<K, V>>collectionsReducer());
 
         assert reader != null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 4a3491d..942d0c5 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -89,7 +89,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
     private boolean deserializePortable;
 
     /** Expiry policy. */
-    private GridCacheAccessExpiryPolicy expiryPlc;
+    private GridCacheExpiryPolicy expiryPlc;
 
     /**
      * Empty constructor required for {@link Externalizable}.
@@ -121,7 +121,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         @Nullable UUID subjId,
         String taskName,
         boolean deserializePortable,
-        @Nullable GridCacheAccessExpiryPolicy expiryPlc
+        @Nullable GridCacheExpiryPolicy expiryPlc
     ) {
         super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
 
@@ -363,7 +363,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                     filters,
                     subjId,
                     taskName == null ? 0 : taskName.hashCode(),
-                    expiryPlc != null ? expiryPlc.ttl() : -1L);
+                    expiryPlc != null ? expiryPlc.forAccess() : -1L);
 
                 add(fut); // Append new future.
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 3b07fc0..c3ef323 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -16,7 +16,6 @@ import org.apache.ignite.plugin.security.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.kernal.managers.communication.*;
 import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.*;
 import org.gridgain.grid.kernal.processors.cache.distributed.dht.*;
 import org.gridgain.grid.kernal.processors.cache.distributed.dht.preloader.*;
 import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
@@ -893,6 +892,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
 
+        GridCacheExpiryPolicy expiry = null;
+
         try {
             // If batch store update is enabled, we need to lock all entries.
             // First, need to acquire locks on cache entries, then check filter.
@@ -941,17 +942,37 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         boolean replicate = ctx.isDrEnabled();
 
+                        expiry = expiryPolicy(req.expiry() != null ? req.expiry() : ctx.expiry());
+
                         if (storeEnabled() && keys.size() > 1 && !ctx.dr().receiveEnabled()) {
                             // This method can only be used when there are no replicated entries in the batch.
-                            UpdateBatchResult<K, V> updRes = updateWithBatch(node, hasNear, req, res, locked, ver,
-                                dhtFut, completionCb, replicate, taskName);
+                            UpdateBatchResult<K, V> updRes = updateWithBatch(node,
+                                hasNear,
+                                req,
+                                res,
+                                locked,
+                                ver,
+                                dhtFut,
+                                completionCb,
+                                replicate,
+                                taskName,
+                                expiry);
 
                             deleted = updRes.deleted();
                             dhtFut = updRes.dhtFuture();
                         }
                         else {
-                            UpdateSingleResult<K, V> updRes = updateSingle(node, hasNear, req, res, locked, ver,
-                                dhtFut, completionCb, replicate, taskName);
+                            UpdateSingleResult<K, V> updRes = updateSingle(node,
+                                hasNear,
+                                req,
+                                res,
+                                locked,
+                                ver,
+                                dhtFut,
+                                completionCb,
+                                replicate,
+                                taskName,
+                                expiry);
 
                             retVal = updRes.returnValue();
                             deleted = updRes.deleted();
@@ -1013,6 +1034,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             else
                 completionCb.apply(req, res);
         }
+
+        sendTtlUpdateRequest(expiry);
     }
 
     /**
@@ -1027,6 +1050,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param dhtFut Optional DHT future.
      * @param completionCb Completion callback to invoke when DHT future is completed.
      * @param replicate Whether replication is enabled.
+     * @param taskName Task name.
+     * @param expiry Expiry policy.
      * @return Deleted entries.
      * @throws GridCacheEntryRemovedException Should not be thrown.
      */
@@ -1041,7 +1066,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         @Nullable GridDhtAtomicUpdateFuture<K, V> dhtFut,
         CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb,
         boolean replicate,
-        String taskName
+        String taskName,
+        @Nullable GridCacheExpiryPolicy expiry
     ) throws GridCacheEntryRemovedException {
         // Cannot update in batches during DR due to possible conflicts.
         assert !req.returnValue(); // Should not request return values for putAll.
@@ -1078,7 +1104,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             try {
                 if (!checkFilter(entry, req, res)) {
-                    // TODO IGNITE-41 update TTL.
+                    if (expiry != null && entry.hasValue()) {
+                        long ttl = expiry.forAccess();
+
+                        if (ttl != -1L) {
+                            entry.updateTtl(null, ttl);
+
+                            expiry.onAccessUpdated(entry.key(), entry.getOrMarshalKeyBytes(), entry.version());
+                        }
+                    }
 
                     if (log.isDebugEnabled())
                         log.debug("Entry did not pass the filter (will skip write) [entry=" + entry +
@@ -1143,7 +1177,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 res,
                                 replicate,
                                 updRes,
-                                taskName);
+                                taskName,
+                                expiry);
 
                             firstEntryIdx = i + 1;
 
@@ -1184,7 +1219,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 res,
                                 replicate,
                                 updRes,
-                                taskName);
+                                taskName,
+                                expiry);
 
                             firstEntryIdx = i + 1;
 
@@ -1293,7 +1329,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 res,
                 replicate,
                 updRes,
-                taskName);
+                taskName,
+                expiry);
         }
         else
             assert filtered.isEmpty();
@@ -1366,6 +1403,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param completionCb Completion callback to invoke when DHT future is completed.
      * @param replicate Whether DR is enabled for that cache.
      * @param taskName Task name.
+     * @param expiry Expiry policy.
      * @return Return value.
      * @throws GridCacheEntryRemovedException Should be never thrown.
      */
@@ -1379,7 +1417,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         @Nullable GridDhtAtomicUpdateFuture<K, V> dhtFut,
         CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb,
         boolean replicate,
-        String taskName
+        String taskName,
+        @Nullable GridCacheExpiryPolicy expiry
     ) throws GridCacheEntryRemovedException {
         GridCacheReturn<Object> retVal = null;
         Collection<IgniteBiTuple<GridDhtCacheEntry<K, V>, GridCacheVersion>> deleted = null;
@@ -1394,8 +1433,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         boolean intercept = ctx.config().getInterceptor() != null;
 
-        ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry();
-
         // Avoid iterator creation.
         for (int i = 0; i < keys.size(); i++) {
             K k = keys.get(i);
@@ -1505,8 +1542,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 expireTime);
                     }
                     else {
-                        // TODO IGNITE-41 ttl could be changed.
-
                         if (log.isDebugEnabled())
                             log.debug("Entry did not pass the filter or conflict resolution (will skip write) " +
                                 "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
@@ -1610,7 +1645,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final GridNearAtomicUpdateResponse<K, V> res,
         boolean replicate,
         UpdateBatchResult<K, V> batchRes,
-        String taskName
+        String taskName,
+        @Nullable GridCacheExpiryPolicy expiry
     ) {
         assert putMap == null ^ rmvKeys == null;
 
@@ -1658,8 +1694,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             boolean intercept = ctx.config().getInterceptor() != null;
 
-            ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry();
-
             // Avoid iterator creation.
             for (int i = 0; i < entries.size(); i++) {
                 GridDhtCacheEntry<K, V> entry = entries.get(i);
@@ -1703,7 +1737,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         true,
                         primary,
                         ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
-                        req.filter(), // TODO IGNITE-41 filter already evaluated?
+                        null,
                         replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
                         -1L,
                         -1L,
@@ -2375,6 +2409,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         }
     }
 
+    /**
+     * @param plc Expiry policy, may be {@code null}.
+     * @return Expiry policy wrapper, or {@code null} if {@code plc} is {@code null}.
+     */
+    static GridCacheExpiryPolicy expiryPolicy(@Nullable ExpiryPolicy plc) {
+        return plc == null ? null : new UpdateExpiryPolicy(plc);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtAtomicCache.class, this, super.toString());
@@ -2652,4 +2694,52 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             pendingResponses.remove(nodeId, this);
         }
     }
+
+    /**
+     * {@link ExpiryPolicy} adapter that also collects entries reported through {@code onAccessUpdated}.
+     */
+    private static class UpdateExpiryPolicy implements GridCacheExpiryPolicy {
+        /** Wrapped expiry policy that create/update/access expiry values are read from. */
+        private final ExpiryPolicy plc;
+
+        /** Reported entries: key -> (key bytes, version); stays {@code null} until the first report. */
+        private Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries;
+
+        /**
+         * @param plc Expiry policy to delegate to, must not be {@code null}.
+         */
+        private UpdateExpiryPolicy(ExpiryPolicy plc) {
+            assert plc != null;
+
+            this.plc = plc;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long forCreate() {
+            return GridCacheMapEntry.toTtl(plc.getExpiryForCreation());
+        }
+
+        /** {@inheritDoc} */
+        @Override public long forUpdate() {
+            return GridCacheMapEntry.toTtl(plc.getExpiryForUpdate());
+        }
+
+        /** {@inheritDoc} */
+        @Override public long forAccess() {
+            return GridCacheMapEntry.toTtl(plc.getExpiryForAccess());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onAccessUpdated(Object key, byte[] keyBytes, GridCacheVersion ver) {
+            if (entries == null)
+                entries = new HashMap<>();
+
+            entries.put(key, new IgniteBiTuple<>(keyBytes, ver));
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() {
+            return entries;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 3eca7e2..1265edb 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -492,7 +492,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
         filterBytes = marshalFilter(filter, ctx);
 
         if (expiryPlc != null)
-            expiryPlcBytes = CU.marshal(ctx, new GridCacheExpiryPolicy(expiryPlc));
+            expiryPlcBytes = CU.marshal(ctx, new GridCacheExternalizableExpiryPolicy(expiryPlc));
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
index 1da6626..5c89c3f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -23,7 +23,6 @@ import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
-import javax.cache.expiry.*;
 import java.io.*;
 import java.util.*;
 
@@ -363,9 +362,19 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
         if (F.isEmpty(keys))
             return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
 
-        subjId = ctx.subjectIdPerCall(subjId);
+        GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
 
-        return loadAsync(null, keys, false, forcePrimary, filter, subjId, taskName, deserializePortable);
+        subjId = ctx.subjectIdPerCall(subjId, prj);
+
+        return loadAsync(null,
+            keys,
+            false,
+            forcePrimary,
+            filter,
+            subjId,
+            taskName,
+            deserializePortable,
+            prj != null ? prj.expiry() : null);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 6c2fa8c..6f71d3c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -21,6 +21,7 @@ import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.expiry.*;
 import java.io.*;
 import java.util.*;
 
@@ -164,7 +165,15 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
     @Override public IgniteFuture<Object> readThroughAllAsync(Collection<? extends K> keys, boolean reload,
         GridCacheTxEx<K, V> tx, IgnitePredicate<GridCacheEntry<K, V>>[] filter, @Nullable UUID subjId, String taskName,
         IgniteBiInClosure<K, V> vis) {
-        return (IgniteFuture)loadAsync(tx, keys, reload, false, filter, subjId, taskName, true);
+        return (IgniteFuture)loadAsync(tx,
+            keys,
+            reload,
+            false,
+            filter,
+            subjId,
+            taskName,
+            true,
+            null);
     }
 
     /** {@inheritDoc} */
@@ -246,6 +255,10 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
      * @param reload Reload flag.
      * @param forcePrimary Force primary flag.
      * @param filter Filter.
+     * @param subjId Subject ID.
+     * @param taskName Task name.
+     * @param deserializePortable Deserialize portable flag.
+     * @param expiryPlc Expiry policy.
      * @return Loaded values.
      */
     public IgniteFuture<Map<K, V>> loadAsync(@Nullable GridCacheTxEx tx,
@@ -255,7 +268,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
         @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
         @Nullable UUID subjId,
         String taskName,
-        boolean deserializePortable) {
+        boolean deserializePortable,
+        @Nullable ExpiryPolicy expiryPlc) {
         if (F.isEmpty(keys))
             return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
 
@@ -264,7 +278,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
 
         GridCacheTxLocalEx<K, V> txx = (tx != null && tx.local()) ? (GridCacheTxLocalEx<K, V>)tx : null;
 
-        // TODO IGNITE-41.
+        final GridCacheAccessExpiryPolicy expiry =
+            GridCacheAccessExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry());
 
         GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
             keys,
@@ -275,7 +290,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
             subjId,
             taskName,
             deserializePortable,
-            null);
+            expiry);
 
         // init() will register future for responses if future has remote mappings.
         fut.init();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheEntry.java
index 7759f49..967496b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -312,8 +312,15 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> {
     /** {@inheritDoc} */
     @Override protected V readThrough(GridCacheTxEx<K, V> tx, K key, boolean reload,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter, UUID subjId, String taskName) throws IgniteCheckedException {
-        return cctx.near().loadAsync(tx, F.asList(key), reload, /*force primary*/false, filter, subjId, taskName, true).
-            get().get(key);
+        return cctx.near().loadAsync(tx,
+            F.asList(key),
+            reload,
+            /*force primary*/false,
+            filter,
+            subjId,
+            taskName,
+            true,
+            null).get().get(key);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
index b5d5e29..ea8450a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -91,7 +91,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
     private boolean deserializePortable;
 
     /** Expiry policy. */
-    private GridCacheAccessExpiryPolicy expiryPlc;
+    private GridCacheExpiryPolicy expiryPlc;
 
     /**
      * Empty constructor required for {@link Externalizable}.
@@ -123,7 +123,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
         @Nullable UUID subjId,
         String taskName,
         boolean deserializePortable,
-        @Nullable GridCacheAccessExpiryPolicy expiryPlc
+        @Nullable GridCacheExpiryPolicy expiryPlc
     ) {
         super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
 
@@ -238,6 +238,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
             if (trackable)
                 cctx.mvcc().removeFuture(this);
 
+            cache().dht().sendTtlUpdateRequest(expiryPlc);
+
             return true;
         }
 
@@ -366,7 +368,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                     filters,
                     subjId,
                     taskName == null ? 0 : taskName.hashCode(),
-                    expiryPlc != null ? expiryPlc.ttl() : -1L);
+                    expiryPlc != null ? expiryPlc.forAccess() : -1L);
 
                 add(fut); // Append new future.
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
index c8d90e2..416d193 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -108,9 +108,19 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
             });
         }
 
-        subjId = ctx.subjectIdPerCall(subjId);
+        GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
 
-        return loadAsync(null, keys, false, forcePrimary, filter, subjId, taskName, deserializePortable);
+        subjId = ctx.subjectIdPerCall(subjId, prj);
+
+        return loadAsync(null,
+            keys,
+            false,
+            forcePrimary,
+            filter,
+            subjId,
+            taskName,
+            deserializePortable,
+            prj != null ? prj.expiry() : null);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
index d901311..6f2b4e4 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -257,7 +257,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         return (GridCacheReturn<V>)updateAllInternal(DELETE,
             Collections.singleton(key),
             null,
-            null,
+            expiryPerCall(),
             true,
             true,
             ctx.equalsPeekArray(val),
@@ -384,7 +384,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         return (V)updateAllInternal(DELETE,
             Collections.singleton(key),
             null,
-            null,
+            expiryPerCall(),
             true,
             false,
             filter,
@@ -410,7 +410,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         updateAllInternal(DELETE,
             keys,
             null,
-            null,
+            expiryPerCall(),
             false,
             false,
             filter,
@@ -437,7 +437,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         return (Boolean)updateAllInternal(DELETE,
             Collections.singleton(key),
             null,
-            null,
+            expiryPerCall(),
             false,
             false,
             filter,
@@ -465,7 +465,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         return (Boolean)updateAllInternal(DELETE,
             Collections.singleton(key),
             null,
-            null,
+            expiryPerCall(),
             false,
             false,
             ctx.equalsPeekArray(val),
@@ -686,10 +686,14 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter
     ) {
         final GridCacheOperation op = transformMap != null ? TRANSFORM : UPDATE;
+
         final Collection<? extends K> keys =
             map != null ? map.keySet() : transformMap != null ? transformMap.keySet() : null;
+
         final Collection<?> vals = map != null ? map.values() : transformMap != null ? transformMap.values() : null;
+
         final boolean storeEnabled = ctx.isStoreEnabled();
+
         final ExpiryPolicy expiry = expiryPerCall();
 
         return asyncOp(new Callable<Object>() {
@@ -723,12 +727,14 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
     ) {
         final boolean storeEnabled = ctx.isStoreEnabled();
 
+        final ExpiryPolicy expiryPlc = expiryPerCall();
+
         return asyncOp(new Callable<Object>() {
             @Override public Object call() throws Exception {
                 return updateAllInternal(DELETE,
                     keys,
                     null,
-                    null,
+                    expiryPlc,
                     retval,
                     rawRetval,
                     filter,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
index b2ae55b..b238600 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
@@ -45,7 +45,7 @@ public class GridTcpCommunicationMessageFactory {
     private static final Map<Byte, GridTcpCommunicationMessageProducer> CUSTOM = new ConcurrentHashMap8<>();
 
     /** */
-    public static final int MAX_COMMON_TYPE = 81;
+    public static final int MAX_COMMON_TYPE = 100;
 
     static {
         registerCommon(new GridTcpCommunicationMessageProducer() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index 6c17203..326b232 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -142,6 +142,12 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
         }
 
         accessGetAll();
+
+        for (final Integer key : keys()) {
+            log.info("Test filter access [key=" + key + ']');
+
+            filterAccessRemove(key);
+        }
     }
 
     /**
@@ -155,7 +161,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
 
         checkTtl(key, 60_000L);
 
-        assertEquals((Integer)1, cache.get(key));
+        assertEquals((Integer) 1, cache.get(key));
 
         checkTtl(key, 62_000L, true);
 
@@ -167,6 +173,22 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
     }
 
     /**
+     * @param key Key.
+     * @throws Exception If failed.
+     */
+    private void filterAccessRemove(Integer key) throws Exception {
+        IgniteCache<Integer, Integer> cache = jcache();
+
+        cache.put(key, 1);
+
+        checkTtl(key, 60_000L);
+
+        assertFalse(cache.remove(key, 2));
+
+        checkTtl(key, 62_000L, true);
+    }
+
+    /**
      * @throws Exception If failed.
      */
     private void accessGetAll() throws Exception {
@@ -179,10 +201,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
 
         cache.removeAll(vals.keySet());
 
-        for (Map.Entry<Integer, Integer> e : vals.entrySet())
-            cache.put(e.getKey(), e.getValue());
-
-        //cache.putAll(vals);
+        cache.putAll(vals);
 
         for (Integer key : vals.keySet())
             checkTtl(key, 60_000L);
@@ -456,41 +475,52 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
         if (cacheMode() != PARTITIONED)
             return;
 
-        factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
-
         nearCache = true;
 
-        startGrids();
+        testCreateUpdate();
+
+        nearReaderUpdate();
 
-        Integer key = nearKey(cache(0));
+        nearPutAll();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void nearReaderUpdate() throws Exception {
+        log.info("Test near reader update.");
+
+        Integer key = nearKeys(cache(0), 1, 500_000).get(0);
 
-        IgniteCache<Integer, Integer> jcache0 = jcache(0);
+        IgniteCache<Integer, Integer> cache0 = jcache(0);
 
-        jcache0.put(key, 1);
+        assertEquals(NEAR_PARTITIONED, cache(0).configuration().getDistributionMode());
+
+        cache0.put(key, 1);
 
         checkTtl(key, 60_000L);
 
-        IgniteCache<Integer, Integer> jcache1 = jcache(1);
+        IgniteCache<Integer, Integer> cache1 = jcache(1);
 
         // Update from another node.
-        jcache1.put(key, 2);
+        cache1.put(key, 2);
 
         checkTtl(key, 61_000L);
 
         // Update from another node with provided TTL.
-        jcache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).put(key, 3);
+        cache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).put(key, 3);
 
         checkTtl(key, 1000L);
 
         waitExpired(key);
 
         // Try create again.
-        jcache0.put(key, 1);
+        cache0.put(key, 1);
 
         checkTtl(key, 60_000L);
 
         // Update from near node with provided TTL.
-        jcache0.withExpiryPolicy(new TestPolicy(null, 1100L, null)).put(key, 2);
+        cache0.withExpiryPolicy(new TestPolicy(null, 1100L, null)).put(key, 2);
 
         checkTtl(key, 1100L);
 
@@ -500,38 +530,31 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
     /**
      * @throws Exception If failed.
      */
-    public void testNearPutAll() throws Exception {
-        if (cacheMode() != PARTITIONED)
-            return;
-
-        factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
-
-        nearCache = true;
-
-        startGrids();
-
+    private void nearPutAll() throws Exception {
         Map<Integer, Integer> vals = new HashMap<>();
 
         for (int i = 0; i < 1000; i++)
             vals.put(i, i);
 
-        IgniteCache<Integer, Integer> jcache0 = jcache(0);
+        IgniteCache<Integer, Integer> cache0 = jcache(0);
 
-        jcache0.putAll(vals);
+        cache0.removeAll(vals.keySet());
+
+        cache0.putAll(vals);
 
         for (Integer key : vals.keySet())
             checkTtl(key, 60_000L);
 
-        IgniteCache<Integer, Integer> jcache1 = jcache(1);
+        IgniteCache<Integer, Integer> cache1 = jcache(1);
 
         // Update from another node.
-        jcache1.putAll(vals);
+        cache1.putAll(vals);
 
         for (Integer key : vals.keySet())
             checkTtl(key, 61_000L);
 
         // Update from another node with provided TTL.
-        jcache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).putAll(vals);
+        cache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).putAll(vals);
 
         for (Integer key : vals.keySet())
             checkTtl(key, 1000L);
@@ -539,10 +562,10 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
         waitExpired(vals.keySet());
 
         // Try create again.
-        jcache0.putAll(vals);
+        cache0.putAll(vals);
 
         // Update from near node with provided TTL.
-        jcache1.withExpiryPolicy(new TestPolicy(null, 1101L, null)).putAll(vals);
+        cache1.withExpiryPolicy(new TestPolicy(null, 1101L, null)).putAll(vals);
 
         for (Integer key : vals.keySet())
             checkTtl(key, 1101L);
@@ -551,6 +574,36 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testNearAccess() throws Exception {
+        if (cacheMode() != PARTITIONED)
+            return;
+
+        nearCache = true;
+
+        testAccess();
+
+        Integer key = primaryKeys(cache(0), 1, 500_000).get(0);
+
+        IgniteCache<Integer, Integer> cache0 = jcache(0);
+
+        cache0.put(key, 1);
+
+        checkTtl(key, 60_000L);
+
+        assertEquals(1, jcache(1).get(key));
+
+        checkTtl(key, 62_000L, true);
+
+        assertEquals(1, jcache(2).withExpiryPolicy(new TestPolicy(1100L, 1200L, 1000L)).get(key));
+
+        checkTtl(key, 1000L, true);
+
+        waitExpired(key);
+    }
+
+    /**
      * @return Test keys.
      * @throws Exception If failed.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractTtlSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractTtlSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractTtlSelfTest.java
deleted file mode 100644
index a6b3f8f..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractTtlSelfTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
-*  __  ____/___________(_)______  /__  ____/______ ____(_)_______
-*  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
-*  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
-*  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
-*/
-
-package org.gridgain.grid.kernal.processors.cache;
-
-import org.apache.ignite.*;
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.cache.store.*;
-import org.gridgain.grid.util.lang.*;
-import org.gridgain.testframework.*;
-
-/**
- * Entry time-to-live abstract test.
- */
-public abstract class GridCacheAbstractTtlSelfTest extends GridCacheAbstractSelfTest {
-    /** {@inheritDoc} */
-    @Override protected int gridCount() {
-        return 2;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected GridCacheStore<?, ?> cacheStore() {
-        return null;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testGetExpired() throws Exception {
-        final GridCache<String, Integer> c = cache();
-
-        final String key = "1";
-
-        int ttl = 500;
-
-        GridCacheEntry<String, Integer> entry = c.entry(key);
-
-        entry.timeToLive(ttl);
-
-        entry.setValue(1);
-
-        checkKeyIsRetired(key, ttl);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testGetExpiredTx() throws Exception {
-        GridCache<String, Integer> c = cache();
-
-        String key = "1";
-        int ttl = 500;
-
-        try (GridCacheTx tx = c.txStart()) {
-            GridCacheEntry<String, Integer> entry = c.entry(key);
-
-            entry.timeToLive(ttl);
-
-            entry.setValue(1);
-
-            tx.commit();
-        }
-
-        checkKeyIsRetired(key, ttl);
-    }
-
-    /**
-     * Checks if the given cache has entry with the given key with some timeout based on the given TTL.
-     *
-     * @param key Key to be checked.
-     * @param ttl Base value for timeout before checking starts.
-     * @throws Exception If failed
-     */
-    private void checkKeyIsRetired(final String key, int ttl) throws Exception {
-        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() {
-            @Override public boolean applyx() throws IgniteCheckedException {
-                for (int i = 0; i < gridCount(); i++) {
-                    if (cache(i).get(key) != null) {
-                        info("Key is still in cache of grid " + i);
-
-                        return false;
-                    }
-                }
-
-                return true;
-            }
-        }, ttl * 4));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
index 6a21fe7..a7b970d 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
@@ -403,7 +403,7 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme
         Object transformClo,
         String taskName,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter,
-        @Nullable GridCacheAccessExpiryPolicy expiryPlc) {
+        @Nullable GridCacheExpiryPolicy expiryPlc) {
         return val;
     }
 
@@ -427,7 +427,7 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme
         @Nullable Object writeObj,
         boolean writeThrough,
         boolean retval,
-        ExpiryPolicy expiryPlc,
+        @Nullable ExpiryPolicy expiryPlc,
         boolean evt,
         boolean metrics,
         @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
@@ -448,7 +448,7 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme
         @Nullable byte[] valBytes,
         boolean writeThrough,
         boolean retval,
-        ExpiryPolicy expiryPlc,
+        @Nullable GridCacheExpiryPolicy expiryPlc,
         boolean evt,
         boolean metrics,
         boolean primary,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedTtlSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedTtlSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedTtlSelfTest.java
deleted file mode 100644
index d74c04e..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedTtlSelfTest.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
-*  __  ____/___________(_)______  /__  ____/______ ____(_)_______
-*  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
-*  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
-*  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
-*/
-
-package org.gridgain.grid.kernal.processors.cache.distributed.near;
-
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-
-import static org.gridgain.grid.cache.GridCacheMode.*;
-
-/**
- * Entry time-to-live test for partitioned cache.
- */
-public class GridCachePartitionedTtlSelfTest extends GridCacheAbstractTtlSelfTest {
-    /** {@inheritDoc} */
-    @Override protected GridCacheMode cacheMode() {
-        return PARTITIONED;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheReplicatedTtlSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheReplicatedTtlSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheReplicatedTtlSelfTest.java
deleted file mode 100644
index 1c71c4a..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheReplicatedTtlSelfTest.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
-*  __  ____/___________(_)______  /__  ____/______ ____(_)_______
-*  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
-*  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
-*  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
-*/
-
-package org.gridgain.grid.kernal.processors.cache.distributed.replicated;
-
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import static org.gridgain.grid.cache.GridCacheMode.*;
-
-/**
- * Entry time-to-live test for replicated cache.
- */
-public class GridCacheReplicatedTtlSelfTest extends GridCacheAbstractTtlSelfTest {
-    /** {@inheritDoc} */
-    @Override protected GridCacheMode cacheMode() {
-        return REPLICATED;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/local/GridCacheLocalTtlSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/local/GridCacheLocalTtlSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/local/GridCacheLocalTtlSelfTest.java
deleted file mode 100644
index 9c599c1..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/local/GridCacheLocalTtlSelfTest.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
-*  __  ____/___________(_)______  /__  ____/______ ____(_)_______
-*  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
-*  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
-*  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
-*/
-
-package org.gridgain.grid.kernal.processors.cache.local;
-
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-
-import static org.gridgain.grid.cache.GridCacheMode.*;
-
-/**
- * Entry time-to-live test for local cache.
- */
-public class GridCacheLocalTtlSelfTest extends GridCacheAbstractTtlSelfTest {
-    /** {@inheritDoc} */
-    @Override protected GridCacheMode cacheMode() {
-        return LOCAL;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
index e958f77..94dd70b 100644
--- a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
+++ b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
@@ -149,7 +149,6 @@ public class GridDataGridTestSuite extends TestSuite {
         suite.addTest(new TestSuite(GridCachePartitionedTxSingleThreadedSelfTest.class));
         suite.addTest(new TestSuite(GridCacheColocatedTxSingleThreadedSelfTest.class));
         suite.addTest(new TestSuite(GridCachePartitionedTxTimeoutSelfTest.class));
-        suite.addTest(new TestSuite(GridCachePartitionedTtlSelfTest.class));
         suite.addTest(new TestSuite(GridCacheFinishPartitionsSelfTest.class));
         suite.addTest(new TestSuite(GridCacheDhtEntrySelfTest.class));
         suite.addTest(new TestSuite(GridCacheDhtInternalEntrySelfTest.class));
@@ -242,7 +241,6 @@ public class GridDataGridTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheReplicatedEvictionEventSelfTest.class);
         // TODO: GG-7569.
         // suite.addTestSuite(GridCacheReplicatedTxMultiThreadedSelfTest.class);
-        suite.addTestSuite(GridCacheReplicatedTtlSelfTest.class);
         suite.addTestSuite(GridCacheReplicatedPreloadEventsSelfTest.class);
         suite.addTestSuite(GridCacheReplicatedPreloadStartStopEventsSelfTest.class);
         // TODO: GG-7434

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java
index 2883215..94bd4fb 100644
--- a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java
@@ -32,6 +32,7 @@ import org.gridgain.testframework.junits.common.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.expiry.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -286,17 +287,10 @@ public abstract class GridCacheAbstractQuerySelfTest extends GridCommonAbstractT
      * @throws Exception If failed.
      */
     public void testExpiration() throws Exception {
-        GridCache<String, Integer> cache = ignite.cache(null);
-
-        GridCacheEntry<String, Integer> entry = cache.entry("key1");
-
-        assert entry != null;
+        ignite.jcache(null).
+            withExpiryPolicy(new TouchedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, 1000))).put("key1", 1);
 
-        entry.timeToLive(1000);
-
-        entry.set(1);
-
-        assert entry.isCached();
+        GridCache<String, Integer> cache = ignite.cache(null);
 
         GridCacheQuery<Map.Entry<String, Integer>> qry = cache.queries().createSqlQuery(Integer.class, "1=1");