You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/19 11:04:19 UTC

[11/11] incubator-ignite git commit: # ignite-41

# ignite-41


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/784fadaf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/784fadaf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/784fadaf

Branch: refs/heads/ignite-41
Commit: 784fadaf7f09a9b90c29dc40fa8e4fadda57b554
Parents: 3e62879
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 19 12:22:00 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 19 13:02:03 2014 +0300

----------------------------------------------------------------------
 .../cache/GridCacheAccessExpiryPolicy.java      |  46 ++--
 .../distributed/GridCacheTtlUpdateRequest.java  | 213 +++++++++++++++++--
 .../distributed/dht/GridDhtCacheAdapter.java    |  80 ++++++-
 .../dht/atomic/GridDhtAtomicCache.java          |   6 +-
 .../IgniteCacheExpiryPolicyAbstractTest.java    |  42 ++--
 5 files changed, 326 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/784fadaf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java
index 2205108..ebac13c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java
@@ -25,7 +25,10 @@ public class GridCacheAccessExpiryPolicy implements GridCacheExpiryPolicy {
     private final long accessTtl;
 
     /** */
-    private volatile Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries;
+    private Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries;
+
+    /** */
+    private Map<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> rdrsMap;
 
     /**
      * @param expiryPlc Expiry policy.
@@ -70,11 +73,12 @@ public class GridCacheAccessExpiryPolicy implements GridCacheExpiryPolicy {
     /**
      *
      */
-    public void reset() {
-        Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries0 = entries;
+    public synchronized void reset() {
+        if (entries != null)
+            entries.clear();
 
-        if (entries0 != null)
-            entries0.clear();
+        if (rdrsMap != null)
+            rdrsMap.clear();
     }
 
     /**
@@ -83,34 +87,42 @@ public class GridCacheAccessExpiryPolicy implements GridCacheExpiryPolicy {
      * @param ver Entry version.
      */
     @SuppressWarnings("unchecked")
-    @Override public void onAccessUpdated(Object key,
+    @Override public synchronized void onAccessUpdated(Object key,
         byte[] keyBytes,
         GridCacheVersion ver,
         @Nullable Collection<UUID> rdrs) {
-        Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries0 = entries;
+        if (entries == null)
+            entries = new HashMap<>();
+
+        IgniteBiTuple<byte[], GridCacheVersion> t = new IgniteBiTuple<>(keyBytes, ver);
+
+        entries.put(key, t);
 
-        if (entries0 == null) {
-            synchronized (this) {
-                entries0 = entries;
+        if (rdrs != null && !rdrs.isEmpty()) {
+            if (rdrsMap == null)
+                rdrsMap = new HashMap<>();
 
-                if (entries0 == null)
-                    entries0 = entries = new ConcurrentHashMap8<>();
+            for (UUID nodeId : rdrs) {
+                Collection<IgniteBiTuple<byte[], GridCacheVersion>> col = rdrsMap.get(nodeId);
+
+                if (col == null)
+                    rdrsMap.put(nodeId, col = new ArrayList<>());
+
+                col.add(t);
             }
         }
-
-        entries0.put(key, new IgniteBiTuple<>(keyBytes, ver));
     }
 
     /**
      * @return TTL update request.
      */
-    @Nullable @Override public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() {
+    @Nullable @Override public synchronized Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() {
         return entries;
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public Map<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> readers() {
-        return null;
+    @Nullable @Override public synchronized Map<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> readers() {
+        return rdrsMap;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/784fadaf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
index b2e9141..ce597bd 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
@@ -23,30 +23,38 @@ import java.util.*;
  *
  */
 public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> {
-    /** */
-    @GridDirectCollection(byte[].class)
-    private List<byte[]> keysBytes;
-
-    /** Entry keys. */
+    /** Entries keys. */
     @GridToStringInclude
     @GridDirectTransient
     private List<K> keys;
 
-    /** Entry versions. */
+    /** Keys bytes. */
+    @GridDirectCollection(byte[].class)
+    private List<byte[]> keysBytes;
+
+    /** Entries versions. */
     @GridDirectCollection(GridCacheVersion.class)
     private List<GridCacheVersion> vers;
 
-    /** */
+    /** Near entries keys. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private List<K> nearKeys;
+
+    /** Near entries bytes. */
     @GridDirectCollection(byte[].class)
     private List<byte[]> nearKeysBytes;
 
-    /** Versions for near entries. */
+    /** Near entries versions. */
     @GridDirectCollection(GridCacheVersion.class)
     private List<GridCacheVersion> nearVers;
 
     /** New TTL. */
     private long ttl;
 
+    /** Topology version. */
+    private long topVer;
+
     /**
      * Required empty constructor.
      */
@@ -55,15 +63,24 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> {
     }
 
     /**
+     * @param topVer Topology version.
      * @param ttl TTL.
      */
-    public GridCacheTtlUpdateRequest(long ttl) {
+    public GridCacheTtlUpdateRequest(long topVer, long ttl) {
         assert ttl >= 0 : ttl;
 
+        this.topVer = topVer;
         this.ttl = ttl;
     }
 
     /**
+     * @return Topology version.
+     */
+    public long topologyVersion() {
+        return topVer;
+    }
+
+    /**
      * @return TTL.
      */
     public long ttl() {
@@ -87,6 +104,22 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> {
     }
 
     /**
+     * @param keyBytes Key bytes.
+     * @param ver Version.
+     */
+    public void addNearEntry(byte[] keyBytes, GridCacheVersion ver) {
+        if (nearKeysBytes == null) {
+            nearKeysBytes = new ArrayList<>();
+
+            nearVers = new ArrayList<>();
+        }
+
+        nearKeysBytes.add(keyBytes);
+
+        nearVers.add(ver);
+    }
+
+    /**
      * @return Keys.
      */
     public List<K> keys() {
@@ -94,13 +127,10 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> {
     }
 
     /**
-     * @param idx Entry index.
-     * @return Key.
+     * @return Versions.
      */
-    public K key(int idx) {
-        assert idx >= 0 && idx < keys.size() : idx;
-
-        return keys.get(idx);
+    public List<GridCacheVersion > versions() {
+        return vers;
     }
 
     /**
@@ -113,6 +143,20 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> {
         return vers.get(idx);
     }
 
+    /**
+     * @return Keys for near cache.
+     */
+    public List<K> nearKeys() {
+        return nearKeys;
+    }
+
+    /**
+     * @return Versions for near cache entries.
+     */
+    public List<GridCacheVersion > nearVersions() {
+        return nearVers;
+    }
+
     /** {@inheritDoc} */
     @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr)
         throws IgniteCheckedException {
@@ -120,6 +164,9 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> {
 
         if (keys == null && keysBytes != null)
             keys = unmarshalCollection(keysBytes, ctx, ldr);
+
+        if (nearKeys == null && nearKeysBytes != null)
+            nearKeys = unmarshalCollection(nearKeysBytes, ctx, ldr);
     }
 
     /** {@inheritDoc} */
@@ -180,12 +227,72 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> {
                 commState.idx++;
 
             case 4:
+                if (nearKeysBytes != null) {
+                    if (commState.it == null) {
+                        if (!commState.putInt(nearKeysBytes.size()))
+                            return false;
+
+                        commState.it = nearKeysBytes.iterator();
+                    }
+
+                    while (commState.it.hasNext() || commState.cur != NULL) {
+                        if (commState.cur == NULL)
+                            commState.cur = commState.it.next();
+
+                        if (!commState.putByteArray((byte[])commState.cur))
+                            return false;
+
+                        commState.cur = NULL;
+                    }
+
+                    commState.it = null;
+                } else {
+                    if (!commState.putInt(-1))
+                        return false;
+                }
+
+                commState.idx++;
+
+            case 5:
+                if (nearVers != null) {
+                    if (commState.it == null) {
+                        if (!commState.putInt(nearVers.size()))
+                            return false;
+
+                        commState.it = nearVers.iterator();
+                    }
+
+                    while (commState.it.hasNext() || commState.cur != NULL) {
+                        if (commState.cur == NULL)
+                            commState.cur = commState.it.next();
+
+                        if (!commState.putCacheVersion((GridCacheVersion)commState.cur))
+                            return false;
+
+                        commState.cur = NULL;
+                    }
+
+                    commState.it = null;
+                } else {
+                    if (!commState.putInt(-1))
+                        return false;
+                }
+
+                commState.idx++;
+
+            case 6:
+                if (!commState.putLong(topVer))
+                    return false;
+
+                commState.idx++;
+
+            case 7:
                 if (!commState.putLong(ttl))
                     return false;
 
                 commState.idx++;
 
-            case 5:
+            case 8:
                 if (vers != null) {
                     if (commState.it == null) {
                         if (!commState.putInt(vers.size()))
@@ -255,6 +362,72 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> {
                 commState.idx++;
 
             case 4:
+                if (commState.readSize == -1) {
+                    if (buf.remaining() < 4)
+                        return false;
+
+                    commState.readSize = commState.getInt();
+                }
+
+                if (commState.readSize >= 0) {
+                    if (nearKeysBytes == null)
+                        nearKeysBytes = new ArrayList<>(commState.readSize);
+
+                    for (int i = commState.readItems; i < commState.readSize; i++) {
+                        byte[] _val = commState.getByteArray();
+
+                        if (_val == BYTE_ARR_NOT_READ)
+                            return false;
+
+                        nearKeysBytes.add((byte[])_val);
+
+                        commState.readItems++;
+                    }
+                }
+
+                commState.readSize = -1;
+                commState.readItems = 0;
+
+                commState.idx++;
+
+            case 5:
+                if (commState.readSize == -1) {
+                    if (buf.remaining() < 4)
+                        return false;
+
+                    commState.readSize = commState.getInt();
+                }
+
+                if (commState.readSize >= 0) {
+                    if (nearVers == null)
+                        nearVers = new ArrayList<>(commState.readSize);
+
+                    for (int i = commState.readItems; i < commState.readSize; i++) {
+                        GridCacheVersion _val = commState.getCacheVersion();
+
+                        if (_val == CACHE_VER_NOT_READ)
+                            return false;
+
+                        nearVers.add((GridCacheVersion)_val);
+
+                        commState.readItems++;
+                    }
+                }
+
+                commState.readSize = -1;
+                commState.readItems = 0;
+
+                commState.idx++;
+
+            case 6:
+                if (buf.remaining() < 8)
+                    return false;
+
+                topVer = commState.getLong();
+
+                commState.idx++;
+
+            case 7:
                 if (buf.remaining() < 8)
                     return false;
 
@@ -262,7 +435,7 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> {
 
                 commState.idx++;
 
-            case 5:
+            case 8:
                 if (commState.readSize == -1) {
                     if (buf.remaining() < 4)
                         return false;
@@ -302,10 +475,14 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> {
 
         GridCacheTtlUpdateRequest _clone = (GridCacheTtlUpdateRequest)_msg;
 
-        _clone.keysBytes = keysBytes;
         _clone.keys = keys;
+        _clone.keysBytes = keysBytes;
         _clone.vers = vers;
+        _clone.nearKeys = nearKeys;
+        _clone.nearKeysBytes = nearKeysBytes;
+        _clone.nearVers = nearVers;
         _clone.ttl = ttl;
+        _clone.topVer = topVer;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/784fadaf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index e67c6dd..41e56c5 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -622,7 +622,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                                 GridCacheTtlUpdateRequest<K, V> req = reqMap.get(node);
 
                                 if (req == null) {
-                                    reqMap.put(node, req = new GridCacheTtlUpdateRequest<>(expiryPlc.forAccess()));
+                                    reqMap.put(node,
+                                        req = new GridCacheTtlUpdateRequest<>(topVer, expiryPlc.forAccess()));
 
                                     req.cacheId(ctx.cacheId());
                                 }
@@ -632,6 +633,30 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                         }
                     }
 
+                    Map<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> rdrs = expiryPlc.readers();
+
+                    if (rdrs != null) {
+                        assert  !rdrs.isEmpty();
+
+                        for (Map.Entry<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> e : rdrs.entrySet()) {
+                            ClusterNode node = ctx.node(e.getKey());
+
+                            if (node != null) {
+                                GridCacheTtlUpdateRequest<K, V> req = reqMap.get(node);
+
+                                if (req == null) {
+                                    reqMap.put(node, req = new GridCacheTtlUpdateRequest<>(topVer,
+                                        expiryPlc.forAccess()));
+
+                                    req.cacheId(ctx.cacheId());
+                                }
+
+                                for (IgniteBiTuple<byte[], GridCacheVersion> t : e.getValue())
+                                    req.addNearEntry(t.get1(), t.get2());
+                            }
+                        }
+                    }
+
                     for (Map.Entry<ClusterNode, GridCacheTtlUpdateRequest<K, V>> req : reqMap.entrySet()) {
                         try {
                             ctx.io().send(req.getKey(), req.getValue());
@@ -649,22 +674,55 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @param req Request.
      */
     private void processTtlUpdateRequest(GridCacheTtlUpdateRequest<K, V> req) {
-        int size = req.keys().size();
+        if (req.keys() != null)
+            updateTtl(this, req.keys(), req.versions(), req.ttl());
+
+        if (req.nearKeys() != null) {
+            GridNearCacheAdapter<K, V> near = near();
+
+            assert near != null;
+
+            updateTtl(near, req.nearKeys(), req.nearVersions(), req.ttl());
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param keys Entries keys.
+     * @param vers Entries versions.
+     * @param ttl TTL.
+     */
+    private void updateTtl(GridCacheAdapter<K, V> cache,
+        List<K> keys,
+        List<GridCacheVersion> vers,
+        long ttl) {
+        assert !F.isEmpty(keys);
+        assert keys.size() == vers.size();
+
+        int size = keys.size();
+
+        boolean swap = cache.context().isSwapOrOffheapEnabled();
 
         for (int i = 0; i < size; i++) {
             try {
-                GridCacheEntryEx<K, V> entry;
+                GridCacheEntryEx<K, V> entry = null;
 
-                if (ctx.isSwapOrOffheapEnabled()) {
-                    entry = ctx.cache().entryEx(req.key(i), true);
+                try {
+                    if (swap) {
+                        entry = cache.entryEx(keys.get(i));
 
-                    entry.unswap(true, false);
-                }
-                else
-                    entry = ctx.cache().peekEx(req.key(i));
+                        entry.unswap(true, false);
+                    }
+                    else
+                        entry = cache.peekEx(keys.get(i));
 
-                if (entry != null)
-                    entry.updateTtl(req.version(i), req.ttl());
+                    if (entry != null)
+                        entry.updateTtl(vers.get(i), ttl);
+                }
+                finally {
+                    if (entry != null)
+                        cache.context().evicts().touch(entry, -1L);
+                }
             }
             catch (IgniteCheckedException e) {
                 log.error("Failed to unswap entry.", e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/784fadaf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 21afa83..d8694c6 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2744,12 +2744,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             if (entries == null)
                 entries = new HashMap<>();
 
-            IgniteBiTuple t = new IgniteBiTuple<>(keyBytes, ver);
+            IgniteBiTuple<byte[], GridCacheVersion> t = new IgniteBiTuple<>(keyBytes, ver);
 
             entries.put(key, t);
 
-            if (!F.isEmpty(rdrs)) {
-                if (rdrs == null)
+            if (rdrs != null && !rdrs.isEmpty()) {
+                if (rdrsMap == null)
                     rdrsMap = new HashMap<>();
 
                 for (UUID nodeId : rdrs) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/784fadaf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index bffe6af..2d115c6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -45,6 +45,9 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
     /** */
     private Integer lastKey = 0;
 
+    /** */
+    private static final long TTL_FOR_EXPIRE = 500L;
+
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
         // No-op.
@@ -120,9 +123,9 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
 
         checkTtl(key, 60_000L);
 
-        cache.withExpiryPolicy(new TestPolicy(null, 1000L, null)).put(key, 1); // Update with custom.
+        cache.withExpiryPolicy(new TestPolicy(null, TTL_FOR_EXPIRE, null)).put(key, 1); // Update with custom.
 
-        checkTtl(key, 1000L);
+        checkTtl(key, TTL_FOR_EXPIRE);
 
         waitExpired(key);
     }
@@ -171,9 +174,9 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
 
         checkTtl(key, 62_000L, true);
 
-        assertEquals((Integer)1, cache.withExpiryPolicy(new TestPolicy(1100L, 1200L, 1000L)).get(key));
+        assertEquals((Integer)1, cache.withExpiryPolicy(new TestPolicy(1100L, 1200L, TTL_FOR_EXPIRE)).get(key));
 
-        checkTtl(key, 1000L, true);
+        checkTtl(key, TTL_FOR_EXPIRE, true);
 
         waitExpired(key);
     }
@@ -415,12 +418,12 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
         tx = startTx(txConcurrency);
 
         // Update with provided TTL.
-        cache.withExpiryPolicy(new TestPolicy(null, 1000L, null)).put(key, 2);
+        cache.withExpiryPolicy(new TestPolicy(null, TTL_FOR_EXPIRE, null)).put(key, 2);
 
         if (tx != null)
             tx.commit();
 
-        checkTtl(key, 1000L);
+        checkTtl(key, TTL_FOR_EXPIRE);
 
         waitExpired(key);
 
@@ -538,9 +541,9 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
         checkTtl(key, 61_000L);
 
         // Update from another node with provided TTL.
-        cache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).put(key, 3);
+        cache1.withExpiryPolicy(new TestPolicy(null, TTL_FOR_EXPIRE, null)).put(key, 3);
 
-        checkTtl(key, 1000L);
+        checkTtl(key, TTL_FOR_EXPIRE);
 
         waitExpired(key);
 
@@ -550,9 +553,9 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
         checkTtl(key, 60_000L);
 
         // Update from near node with provided TTL.
-        cache0.withExpiryPolicy(new TestPolicy(null, 1100L, null)).put(key, 2);
+        cache0.withExpiryPolicy(new TestPolicy(null, TTL_FOR_EXPIRE + 1, null)).put(key, 2);
 
-        checkTtl(key, 1100L);
+        checkTtl(key, TTL_FOR_EXPIRE + 1);
 
         waitExpired(key);
     }
@@ -626,11 +629,26 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
 
         checkTtl(key, 62_000L, true);
 
-        assertEquals(1, jcache(2).withExpiryPolicy(new TestPolicy(1100L, 1200L, 1000L)).get(key));
+        assertEquals(1, jcache(2).withExpiryPolicy(new TestPolicy(1100L, 1200L, TTL_FOR_EXPIRE)).get(key));
 
-        checkTtl(key, 1000L, true);
+        checkTtl(key, TTL_FOR_EXPIRE, true);
 
         waitExpired(key);
+
+        // Test reader update on get.
+
+        key = nearKeys(cache(0), 1, 600_000).get(0);
+
+        cache0.put(key, 1);
+
+        checkTtl(key, 60_000L);
+
+        IgniteCache<Object, Object> cache =
+            cache(0).affinity().isPrimary(grid(1).localNode(), key) ? jcache(1) : jcache(2);
+
+        assertEquals(1, cache.get(key));
+
+        checkTtl(key, 62_000L, true);
     }
 
     /**