You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2020/12/22 22:06:46 UTC

[ignite] branch master updated: IGNITE-13867 Fixed an issue related to erroneous sending TTL update requests. Fixes #8583

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new a4217d3  IGNITE-13867 Fixed an issue related to erroneous sending TTL update requests. Fixes #8583
a4217d3 is described below

commit a4217d3963f03977697183261afd055eb58057f3
Author: Alexander Lapin <la...@gmail.com>
AuthorDate: Wed Dec 23 01:06:04 2020 +0300

    IGNITE-13867 Fixed an issue related to erroneous sending TTL update requests. Fixes #8583
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../cache/distributed/dht/GridDhtCacheAdapter.java | 119 +++++++++++++++++++--
 .../IgniteCacheExpiryPolicyAbstractTest.java       | 111 ++++++++++++++++++-
 2 files changed, 215 insertions(+), 15 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 4f3650c..b52ec45 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -387,11 +387,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
-        ctx.io().addCacheHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class, new CI2<UUID, GridCacheTtlUpdateRequest>() {
-            @Override public void apply(UUID nodeId, GridCacheTtlUpdateRequest req) {
-                processTtlUpdateRequest(req);
-            }
-        });
+        ctx.io().addCacheHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class,
+            (CI2<UUID, GridCacheTtlUpdateRequest>)this::processTtlUpdateRequest);
 
         ctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
@@ -1019,12 +1016,13 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                     AffinityTopologyVersion topVer = ctx.shared().exchange().readyAffinityVersion();
 
                     for (Map.Entry<KeyCacheObject, GridCacheVersion> e : entries.entrySet()) {
-                        List<ClusterNode> nodes = ctx.affinity().nodesByKey(e.getKey(), topVer);
+                        ClusterNode primaryNode = ctx.affinity().primaryByKey(e.getKey(), topVer);
 
-                        for (int i = 0; i < nodes.size(); i++) {
-                            ClusterNode node = nodes.get(i);
+                        if (primaryNode.isLocal()) {
+                            Collection<ClusterNode> nodes = ctx.affinity().backupsByKey(e.getKey(), topVer);
 
-                            if (!node.isLocal()) {
+                            for (Iterator<ClusterNode> nodesIter = nodes.iterator(); nodesIter.hasNext(); ) {
+                                ClusterNode node = nodesIter.next();
                                 GridCacheTtlUpdateRequest req = reqMap.get(node);
 
                                 if (req == null) {
@@ -1036,6 +1034,17 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                                 req.addEntry(e.getKey(), e.getValue());
                             }
                         }
+                        else {
+                            GridCacheTtlUpdateRequest req = reqMap.get(primaryNode);
+
+                            if (req == null) {
+                                reqMap.put(primaryNode, req = new GridCacheTtlUpdateRequest(ctx.cacheId(),
+                                    topVer,
+                                    expiryPlc.forAccess()));
+                            }
+
+                            req.addEntry(e.getKey(), e.getValue());
+                        }
                     }
 
                     Map<UUID, Collection<IgniteBiTuple<KeyCacheObject, GridCacheVersion>>> rdrs = expiryPlc.readers();
@@ -1080,9 +1089,97 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     }
 
     /**
+     * Forwards an incoming TTL update to the backups and readers of every key this node is primary for.
+     *
+     * @param srcNodeId Id of the node that sent the original TTL update request.
+     * @param incomingReq Original TTL update request.
+     */
+    private void sendTtlUpdateRequest(UUID srcNodeId, GridCacheTtlUpdateRequest incomingReq) {
+        ctx.closures().runLocalSafe(new Runnable() {
+            @SuppressWarnings({"ForLoopReplaceableByForEach"})
+            @Override public void run() {
+                Map<ClusterNode, GridCacheTtlUpdateRequest> reqMap = new HashMap<>();
+
+                for (int i = 0; i < incomingReq.keys().size(); i++) {
+                    KeyCacheObject key = incomingReq.keys().get(i);
+
+                    // It's only required to broadcast ttl update requests if we are on primary node for given key.
+                    if (!ctx.affinity().primaryByKey(key, incomingReq.topologyVersion()).isLocal())
+                        continue;
+
+                    Collection<ClusterNode> nodes = ctx.affinity().backupsByKey(key, incomingReq.topologyVersion());
+
+                    for (Iterator<ClusterNode> nodesIter = nodes.iterator(); nodesIter.hasNext(); ) {
+                        ClusterNode node = nodesIter.next();
+
+                        // No need to send a TTL update request back to the node that sent us the initial one.
+                        if (node.id().equals(srcNodeId))
+                            continue;
+
+                        GridCacheTtlUpdateRequest req = reqMap.get(node);
+
+                        if (req == null) {
+                            reqMap.put(node, req = new GridCacheTtlUpdateRequest(
+                                ctx.cacheId(), incomingReq.topologyVersion(), incomingReq.ttl()));
+                        }
+
+                        req.addEntry(key, incomingReq.version(i));
+                    }
+
+                    GridDhtCacheEntry entry = ctx.dht().entryExx(key, incomingReq.topologyVersion());
+
+                    Collection<UUID> readers;
+
+                    try {
+                        readers = entry.readers();
+                    }
+                    catch (GridCacheEntryRemovedException e) {
+                        U.error(log, "Failed to send TTL update request.", e);
+                        // Readers are unavailable for this key: skip them instead of iterating over null.
+                        continue;
+                    }
+
+                    for (UUID reader : readers) {
+                        // No need to send a TTL update request back to the node that sent us the initial one.
+                        if (reader.equals(srcNodeId))
+                            continue;
+
+                        ClusterNode node = ctx.node(reader);
+
+                        if (node != null) {
+                            GridCacheTtlUpdateRequest req = reqMap.get(node);
+
+                            if (req == null) {
+                                reqMap.put(node, req = new GridCacheTtlUpdateRequest(
+                                    ctx.cacheId(), incomingReq.topologyVersion(), incomingReq.ttl()));
+                            }
+
+                            req.addNearEntry(key, incomingReq.version(i));
+                        }
+                    }
+                }
+
+                for (Map.Entry<ClusterNode, GridCacheTtlUpdateRequest> req : reqMap.entrySet()) {
+                    try {
+                        ctx.io().send(req.getKey(), req.getValue(), ctx.ioPolicy());
+                    }
+                    catch (IgniteCheckedException e) {
+                        if (e instanceof ClusterTopologyCheckedException) {
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to send TTL update request, node left: " + req.getKey());
+                        }
+                        else
+                            U.error(log, "Failed to send TTL update request.", e);
+                    }
+                }
+            }
+        });
+    }
+
+    /**
      * @param req Request.
      */
-    private void processTtlUpdateRequest(GridCacheTtlUpdateRequest req) {
+    private void processTtlUpdateRequest(UUID srcNodeId, GridCacheTtlUpdateRequest req) {
         if (req.keys() != null)
             updateTtl(this, req.keys(), req.versions(), req.ttl());
 
@@ -1093,6 +1190,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
             updateTtl(near, req.nearKeys(), req.nearVersions(), req.ttl());
         }
+
+        sendTtlUpdateRequest(srcNodeId, req);
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index c0407ac..dd4776a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -969,6 +969,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
     /**
      * @throws Exception If failed.
      */
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-305")
     @Test
     public void testNearAccess() throws Exception {
         if (cacheMode() != PARTITIONED)
@@ -988,9 +989,62 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
 
         assertEquals(1, jcache(1).get(key));
 
+        assertEquals(1, jcache(2).withExpiryPolicy(
+            new TestPolicy(1100L, 1200L, TTL_FOR_EXPIRE)).get(key));
+
+        checkTtl(key, TTL_FOR_EXPIRE, true);
+
+        waitExpired(key);
+
+        // Test reader update on get.
+
+        key = nearKeys(jcache(0), 1, 600_000).get(0);
+
+        cache0.put(key, 1);
+
+        checkTtl(key, 60_000L);
+
+        IgniteCache<Object, Object> cache =
+            grid(0).affinity(DEFAULT_CACHE_NAME).isPrimary(
+                grid(1).localNode(), key) ? jcache(1) : jcache(2);
+
+        assertEquals(1, cache.get(key));
+
         checkTtl(key, 62_000L, true);
 
-        assertEquals(1, jcache(2).withExpiryPolicy(new TestPolicy(1100L, 1200L, TTL_FOR_EXPIRE)).get(key));
+    }
+
+    /**
+     * This test should be removed once IGNITE-305 is fixed.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNearAccessSimplified() throws Exception {
+        if (cacheMode() != PARTITIONED)
+            return;
+
+        nearCache = true;
+
+        factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, 62_000L));
+
+        startGrids();
+
+        Integer key = primaryKeys(jcache(0), 1, 500_000).get(0);
+
+        IgniteCache<Integer, Integer> cache0 = jcache(0);
+
+        cache0.put(key, 1);
+
+        checkTtl(key, 60_000L);
+
+        assertEquals(1, jcache(1).get(key));
+
+        // Small delay is added in order to prevent race based on IGNITE-305.
+        Thread.sleep(500);
+
+        assertEquals(1, jcache(2).withExpiryPolicy(
+            new TestPolicy(1100L, 1200L, TTL_FOR_EXPIRE)).get(key));
 
         checkTtl(key, TTL_FOR_EXPIRE, true);
 
@@ -1005,7 +1059,8 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
         checkTtl(key, 60_000L);
 
         IgniteCache<Object, Object> cache =
-            grid(0).affinity(DEFAULT_CACHE_NAME).isPrimary(grid(1).localNode(), key) ? jcache(1) : jcache(2);
+            grid(0).affinity(DEFAULT_CACHE_NAME).isPrimary(
+                grid(1).localNode(), key) ? jcache(1) : jcache(2);
 
         assertEquals(1, cache.get(key));
 
@@ -1013,6 +1068,50 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
     }
 
     /**
+     * Simplified variant of the near-access getAll TTL check; remove it once IGNITE-305 is fixed.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNearAccessGetAllTtlSimplified() throws Exception {
+        if (cacheMode() != PARTITIONED)
+            return;
+
+        nearCache = true;
+
+        factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, 62_000L));
+
+        startGrids();
+
+        Map<Integer, Integer> data = new HashMap<>();
+
+        for (int k = 1; k <= 4; k++)
+            data.put(k, k);
+
+        IgniteCache<Integer, Integer> firstCache = jcache(0);
+
+        firstCache.removeAll(data.keySet());
+
+        firstCache.putAll(data);
+
+        jcache(1).getAll(data.keySet());
+
+        // Short pause that works around the race tracked as IGNITE-305.
+        Thread.sleep(500);
+
+        TestPolicy shortTtlPlc = new TestPolicy(1100L, 1200L, TTL_FOR_EXPIRE);
+        jcache(2).withExpiryPolicy(shortTtlPlc).getAll(data.keySet());
+
+        for (Integer k : data.keySet()) {
+            info("Checking iterator key: " + k);
+
+            checkTtl(k, TTL_FOR_EXPIRE, true);
+        }
+
+        waitExpired(data.keySet());
+    }
+
+    /**
      * Put entry to server node and check how its expires in client NearCache.
      *
      * @throws Exception If failed.
@@ -1172,7 +1271,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
                         ", backup=" + affinity(cache).isBackup(node, key) + ']');
                 }
 
-                assertNull("Unexpected non-null value for grid " + i, val);
+                assertNull("Unexpected non-null value for grid " + i + " key: " + key, val);
             }
         }
 
@@ -1230,8 +1329,10 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
 
                         found = true;
 
-                        if (ttl > 0)
-                            assertTrue(e.expireTime() > 0);
+                        if (ttl > 0) {
+                            assertTrue("Unexpected expiration time, key: " + key + " expirationtime: "
+                                + e.expireTime(), e.expireTime() > 0);
+                        }
                         else
                             assertEquals(0, e.expireTime());
                     }