You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/08/09 10:52:51 UTC

[GitHub] [ignite] Mmuzaf commented on a diff in pull request #10140: IGNITE-17316 Add pluggable affinity fucntion to the thin client partition awareness feature

Mmuzaf commented on code in PR #10140:
URL: https://github.com/apache/ignite/pull/10140#discussion_r941192742


##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java:
##########
@@ -81,42 +97,41 @@ else if (topVer.equals(lastTop.topVer)) {
      * @param cacheId Cache id.
      */
     public boolean affinityUpdateRequired(int cacheId) {
-        TopologyNodes top = lastTop.get();
-
-        if (top == null) { // Don't know current topology.
-            pendingCacheIds.add(cacheId);
-
-            return false;
-        }
-
-        ClientCacheAffinityMapping mapping = affinityMapping;
-
-        if (mapping == null) {
-            pendingCacheIds.add(cacheId);
-
-            return true;
-        }
+        ClientCacheAffinityMapping mapping = currentMapping();
 
-        if (top.topVer.compareTo(mapping.topologyVersion()) > 0) {
+        if (mapping == null || !mapping.cacheIds().contains(cacheId)) {
             pendingCacheIds.add(cacheId);
 
             return true;
         }
 
-        if (mapping.cacheIds().contains(cacheId))
-            return false;
-        else {
-            pendingCacheIds.add(cacheId);
-
-            return true;
-        }
+        return false;
     }
 
     /**
      * @param ch Payload output channel.
      */
     public void writePartitionsUpdateRequest(PayloadOutputChannel ch) {
-        ClientCacheAffinityMapping.writeRequest(ch, pendingCacheIds);
+        assert rq == null : "Previous mapping request was not properly handled: " + rq;
+
+        final Set<Integer> cacheIds;
+        long lastAccessed;
+
+        synchronized (cacheKeyMapperFactoryMap) {
+            cacheIds = new HashSet<>(pendingCacheIds);
+
+            pendingCacheIds.removeAll(cacheIds);

Review Comment:
   Fixed.



##########
modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java:
##########
@@ -55,6 +63,81 @@ public void testResourcesReleasedAfterClientClosed() throws Exception {
         assertTrue(GridTestUtils.waitForCondition(() -> threadsCount(THREAD_PREFIX) == 0, 1_000L));
     }
 
+    /**
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testResourcesReleasedAfterCacheDestroyed() throws Exception {
+        int cacheId = CU.cacheId(PART_CUSTOM_AFFINITY_CACHE_NAME);
+        startGrids(2);
+
+        initClient(getClientConfiguration(0, 1)
+            .setPartitionAwarenessMapperFactory(new ClientPartitionAwarenessMapperFactory() {
+                /** {@inheritDoc} */
+                @Override public ClientPartitionAwarenessMapper create(String cacheName, int partitions) {
+                    assertEquals(cacheName, PART_CUSTOM_AFFINITY_CACHE_NAME);
+
+                    AffinityFunction aff = new RendezvousAffinityFunction(false, partitions);
+
+                    return aff::partition;
+                }
+            }), 0, 1);
+
+        ClientCache<Object, Object> clientCache = client.cache(PART_CUSTOM_AFFINITY_CACHE_NAME);
+        IgniteInternalCache<Object, Object> gridCache = grid(0).context().cache().cache(PART_CUSTOM_AFFINITY_CACHE_NAME);
+
+        clientCache.put(0, 0);
+        TestTcpClientChannel opCh = affinityChannel(0, gridCache);
+
+        assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+        assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);
+
+        for (int i = 1; i < KEY_CNT; i++)
+            clientCache.put(i, i);
+
+        ClientCacheAffinityContext affCtx = clientAffinityContext(client);
+        AffinityTopologyVersion ver = affCtx.currentMapping().topologyVersion();
+
+        grid(0).destroyCache(PART_CUSTOM_AFFINITY_CACHE_NAME);
+        awaitPartitionMapExchange();
+
+        // Cache destroyed, but mappings still exist on the client side.
+        assertEquals(opCh.serverNodeId(), affCtx.affinityNode(cacheId, Integer.valueOf(0)));
+
+        client.cache(PART_CACHE_NAME).put(1, 1);
+
+        // await mappings updated.
+        assertTrue(GridTestUtils.waitForCondition(() -> {
+            ClientCacheAffinityMapping m = affCtx.currentMapping();
+
+            if (m == null)
+                return false;
+
+            return m.topologyVersion().equals(ver.nextMinorVersion());
+        }, 5_000L));
+
+        assertNull(affCtx.currentMapping().affinityNode(cacheId, 0));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+        client.close();
+    }
+
+    /**
+     * @param clnt Ignite client.
+     * @return Client context.
+     */
+    private static ClientCacheAffinityContext clientAffinityContext(IgniteClient clnt) {
+        assertTrue(clnt instanceof TcpIgniteClient);
+
+        return GridTestUtils.getFieldValue(GridTestUtils.getFieldValue(clnt, "ch"),

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org