You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2020/12/22 22:06:46 UTC
[ignite] branch master updated: IGNITE-13867 Fixed an issue related
to erroneous sending TTL update requests. Fixes #8583
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a4217d3 IGNITE-13867 Fixed an issue related to erroneous sending TTL update requests. Fixes #8583
a4217d3 is described below
commit a4217d3963f03977697183261afd055eb58057f3
Author: Alexander Lapin <la...@gmail.com>
AuthorDate: Wed Dec 23 01:06:04 2020 +0300
IGNITE-13867 Fixed an issue related to erroneous sending TTL update requests. Fixes #8583
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../cache/distributed/dht/GridDhtCacheAdapter.java | 119 +++++++++++++++++++--
.../IgniteCacheExpiryPolicyAbstractTest.java | 111 ++++++++++++++++++-
2 files changed, 215 insertions(+), 15 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 4f3650c..b52ec45 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -387,11 +387,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
- ctx.io().addCacheHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class, new CI2<UUID, GridCacheTtlUpdateRequest>() {
- @Override public void apply(UUID nodeId, GridCacheTtlUpdateRequest req) {
- processTtlUpdateRequest(req);
- }
- });
+ ctx.io().addCacheHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class,
+ (CI2<UUID, GridCacheTtlUpdateRequest>)this::processTtlUpdateRequest);
ctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
}
@@ -1019,12 +1016,13 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
AffinityTopologyVersion topVer = ctx.shared().exchange().readyAffinityVersion();
for (Map.Entry<KeyCacheObject, GridCacheVersion> e : entries.entrySet()) {
- List<ClusterNode> nodes = ctx.affinity().nodesByKey(e.getKey(), topVer);
+ ClusterNode primaryNode = ctx.affinity().primaryByKey(e.getKey(), topVer);
- for (int i = 0; i < nodes.size(); i++) {
- ClusterNode node = nodes.get(i);
+ if (primaryNode.isLocal()) {
+ Collection<ClusterNode> nodes = ctx.affinity().backupsByKey(e.getKey(), topVer);
- if (!node.isLocal()) {
+ for (Iterator<ClusterNode> nodesIter = nodes.iterator(); nodesIter.hasNext(); ) {
+ ClusterNode node = nodesIter.next();
GridCacheTtlUpdateRequest req = reqMap.get(node);
if (req == null) {
@@ -1036,6 +1034,17 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
req.addEntry(e.getKey(), e.getValue());
}
}
+ else {
+ GridCacheTtlUpdateRequest req = reqMap.get(primaryNode);
+
+ if (req == null) {
+ reqMap.put(primaryNode, req = new GridCacheTtlUpdateRequest(ctx.cacheId(),
+ topVer,
+ expiryPlc.forAccess()));
+ }
+
+ req.addEntry(e.getKey(), e.getValue());
+ }
}
Map<UUID, Collection<IgniteBiTuple<KeyCacheObject, GridCacheVersion>>> rdrs = expiryPlc.readers();
@@ -1080,9 +1089,97 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
}
/**
+ * @param srcNodeId The Id of a node that sends original ttl request.
+ * @param incomingReq Original ttl request.
+ */
+ private void sendTtlUpdateRequest(UUID srcNodeId, GridCacheTtlUpdateRequest incomingReq) {
+ ctx.closures().runLocalSafe(new Runnable() {
+ @SuppressWarnings({"ForLoopReplaceableByForEach"})
+ @Override public void run() {
+ Map<ClusterNode, GridCacheTtlUpdateRequest> reqMap = new HashMap<>();
+
+ for (int i = 0; i < incomingReq.keys().size(); i++) {
+ KeyCacheObject key = incomingReq.keys().get(i);
+
+ // It's only required to broadcast ttl update requests if we are on primary node for given key.
+ if (!ctx.affinity().primaryByKey(key, incomingReq.topologyVersion()).isLocal())
+ continue;
+
+ Collection<ClusterNode> nodes = ctx.affinity().backupsByKey(key, incomingReq.topologyVersion());
+
+ for (Iterator<ClusterNode> nodesIter = nodes.iterator(); nodesIter.hasNext(); ) {
+ ClusterNode node = nodesIter.next();
+
+ // There's no need to send and update ttl request to the node that send us the initial
+ // ttl update request.
+ if (node.id().equals(srcNodeId))
+ continue;
+
+ GridCacheTtlUpdateRequest req = reqMap.get(node);
+
+ if (req == null) {
+ reqMap.put(node, req = new GridCacheTtlUpdateRequest(ctx.cacheId(),
+ incomingReq.topologyVersion(),
+ incomingReq.ttl()));
+ }
+
+ req.addEntry(key, incomingReq.version(i));
+ }
+
+ GridDhtCacheEntry entry = ctx.dht().entryExx(key, incomingReq.topologyVersion());
+
+ Collection<UUID> readers = null;
+
+ try {
+ readers = entry.readers();
+ }
+ catch (GridCacheEntryRemovedException e) {
+ U.error(log, "Failed to send TTL update request.", e);
+ }
+
+ for (UUID reader : readers) {
+ // There's no need to send and update ttl request to the node that send us the initial
+ // ttl update request.
+ if (reader.equals(srcNodeId))
+ continue;
+
+ ClusterNode node = ctx.node(reader);
+
+ if (node != null) {
+ GridCacheTtlUpdateRequest req = reqMap.get(node);
+
+ if (req == null) {
+ reqMap.put(node, req = new GridCacheTtlUpdateRequest(ctx.cacheId(),
+ incomingReq.topologyVersion(),
+ incomingReq.ttl()));
+ }
+
+ req.addNearEntry(key, incomingReq.version(i));
+ }
+ }
+ }
+
+ for (Map.Entry<ClusterNode, GridCacheTtlUpdateRequest> req : reqMap.entrySet()) {
+ try {
+ ctx.io().send(req.getKey(), req.getValue(), ctx.ioPolicy());
+ }
+ catch (IgniteCheckedException e) {
+ if (e instanceof ClusterTopologyCheckedException) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send TTC update request, node left: " + req.getKey());
+ }
+ else
+ U.error(log, "Failed to send TTL update request.", e);
+ }
+ }
+ }
+ });
+ }
+
+ /**
* @param req Request.
*/
- private void processTtlUpdateRequest(GridCacheTtlUpdateRequest req) {
+ private void processTtlUpdateRequest(UUID srcNodeId, GridCacheTtlUpdateRequest req) {
if (req.keys() != null)
updateTtl(this, req.keys(), req.versions(), req.ttl());
@@ -1093,6 +1190,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
updateTtl(near, req.nearKeys(), req.nearVersions(), req.ttl());
}
+
+ sendTtlUpdateRequest(srcNodeId, req);
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index c0407ac..dd4776a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -969,6 +969,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
/**
* @throws Exception If failed.
*/
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-305")
@Test
public void testNearAccess() throws Exception {
if (cacheMode() != PARTITIONED)
@@ -988,9 +989,62 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
assertEquals(1, jcache(1).get(key));
+ assertEquals(1, jcache(2).withExpiryPolicy(
+ new TestPolicy(1100L, 1200L, TTL_FOR_EXPIRE)).get(key));
+
+ checkTtl(key, TTL_FOR_EXPIRE, true);
+
+ waitExpired(key);
+
+ // Test reader update on get.
+
+ key = nearKeys(jcache(0), 1, 600_000).get(0);
+
+ cache0.put(key, 1);
+
+ checkTtl(key, 60_000L);
+
+ IgniteCache<Object, Object> cache =
+ grid(0).affinity(DEFAULT_CACHE_NAME).isPrimary(
+ grid(1).localNode(), key) ? jcache(1) : jcache(2);
+
+ assertEquals(1, cache.get(key));
+
checkTtl(key, 62_000L, true);
- assertEquals(1, jcache(2).withExpiryPolicy(new TestPolicy(1100L, 1200L, TTL_FOR_EXPIRE)).get(key));
+ }
+
+ /**
+ * Given test should be removed after IGNITE-305 will be fixed.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNearAccessSimplified() throws Exception {
+ if (cacheMode() != PARTITIONED)
+ return;
+
+ nearCache = true;
+
+ factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, 62_000L));
+
+ startGrids();
+
+ Integer key = primaryKeys(jcache(0), 1, 500_000).get(0);
+
+ IgniteCache<Integer, Integer> cache0 = jcache(0);
+
+ cache0.put(key, 1);
+
+ checkTtl(key, 60_000L);
+
+ assertEquals(1, jcache(1).get(key));
+
+ // Small delay is added in order to prevent race based on IGNITE-305.
+ Thread.sleep(500);
+
+ assertEquals(1, jcache(2).withExpiryPolicy(
+ new TestPolicy(1100L, 1200L, TTL_FOR_EXPIRE)).get(key));
checkTtl(key, TTL_FOR_EXPIRE, true);
@@ -1005,7 +1059,8 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
checkTtl(key, 60_000L);
IgniteCache<Object, Object> cache =
- grid(0).affinity(DEFAULT_CACHE_NAME).isPrimary(grid(1).localNode(), key) ? jcache(1) : jcache(2);
+ grid(0).affinity(DEFAULT_CACHE_NAME).isPrimary(
+ grid(1).localNode(), key) ? jcache(1) : jcache(2);
assertEquals(1, cache.get(key));
@@ -1013,6 +1068,50 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
}
/**
+ * Given test should be removed after IGNITE-305 will be fixed.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNearAccessGetAllTtlSimplified() throws Exception {
+ if (cacheMode() != PARTITIONED)
+ return;
+
+ nearCache = true;
+
+ factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, 62_000L));
+
+ startGrids();
+
+ Map<Integer, Integer> vals = new HashMap<>();
+
+ for (int i = 1; i < 5; i++)
+ vals.put(i, i);
+
+ IgniteCache<Integer, Integer> cache0 = jcache(0);
+
+ cache0.removeAll(vals.keySet());
+
+ cache0.putAll(vals);
+
+ jcache(1).getAll(vals.keySet());
+
+ // Small delay is added in order to prevent race based on IGNITE-305.
+ Thread.sleep(500);
+
+ jcache(2).withExpiryPolicy(
+ new TestPolicy(1100L, 1200L, TTL_FOR_EXPIRE)).getAll(vals.keySet());
+
+ for (Integer key : vals.keySet()) {
+ info("Checking iterator key: " + key);
+
+ checkTtl(key, TTL_FOR_EXPIRE, true);
+ }
+
+ waitExpired(vals.keySet());
+ }
+
+ /**
* Put entry to server node and check how its expires in client NearCache.
*
* @throws Exception If failed.
@@ -1172,7 +1271,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
", backup=" + affinity(cache).isBackup(node, key) + ']');
}
- assertNull("Unexpected non-null value for grid " + i, val);
+ assertNull("Unexpected non-null value for grid " + i + " key: " + key, val);
}
}
@@ -1230,8 +1329,10 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
found = true;
- if (ttl > 0)
- assertTrue(e.expireTime() > 0);
+ if (ttl > 0) {
+ assertTrue("Unexpected expiration time, key: " + key + " expirationtime: "
+ + e.expireTime(), e.expireTime() > 0);
+ }
else
assertEquals(0, e.expireTime());
}