You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/03 14:05:05 UTC

[05/14] ignite git commit: ignite-5593 Fixed exchange futures leak

ignite-5593 Fixed exchange futures leak


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ce5f8e90
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ce5f8e90
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ce5f8e90

Branch: refs/heads/ignite-gg-12389
Commit: ce5f8e9016f088e55d13051075ca13b8d3317c06
Parents: 771288f
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Fri Jun 30 10:34:11 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jun 30 10:34:11 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |  2 +-
 .../CacheLateAffinityAssignmentTest.java        | 73 ++++++++++++++++++++
 2 files changed, 74 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ce5f8e90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index d997b79..cbb07f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -261,7 +261,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                 exchFut = exchangeFuture(exchId, evt, cache, null, msg);
                             }
                         }
-                        else {
+                        else if (msg.exchangeId().topologyVersion().topologyVersion() >= cctx.discovery().localJoinEvent().topologyVersion()) {
                             exchangeFuture(msg.exchangeId(), null, null, null, null)
                                 .onAffinityChangeMessage(evt.eventNode(), msg);
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce5f8e90/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index db3f5cd..c0c3d70 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.PA;
@@ -1167,6 +1168,78 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Wait for rebalance, send affinity change message, but affinity already changed (new node joined).
+     *
+     * @throws Exception If failed.
+     */
+    public void testDelayAssignmentAffinityChanged2() throws Exception {
+        Ignite ignite0 = startServer(0, 1);
+
+        TestTcpDiscoverySpi discoSpi0 =
+            (TestTcpDiscoverySpi)ignite0.configuration().getDiscoverySpi();
+        TestRecordingCommunicationSpi commSpi0 =
+            (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi();
+
+        startClient(1, 2);
+
+        checkAffinity(2, topVer(2, 0), true);
+
+        startServer(2, 3);
+
+        checkAffinity(3, topVer(3, 1), false);
+
+        discoSpi0.blockCustomEvent();
+
+        stopNode(2, 4);
+
+        discoSpi0.waitCustomEvent();
+
+        blockSupplySend(commSpi0, CACHE_NAME1);
+
+        final IgniteInternalFuture<?> startedFuture = multithreadedAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                startServer(3, 5);
+
+                return null;
+            }
+        }, 1, "server-starter");
+
+        Thread.sleep(2_000);
+
+        discoSpi0.stopBlock();
+
+        boolean started = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return startedFuture.isDone();
+            }
+        }, 10_000);
+
+        if (!started)
+            startedFuture.cancel();
+
+        assertTrue(started);
+
+        checkAffinity(3, topVer(5, 0), false);
+
+        checkNoExchange(3, topVer(5, 1));
+
+        commSpi0.stopBlock();
+
+        checkAffinity(3, topVer(5, 1), true);
+
+        long nodeJoinTopVer = grid(3).context().discovery().localJoinEvent().topologyVersion();
+
+        assertEquals(5, nodeJoinTopVer);
+
+        List<GridDhtPartitionsExchangeFuture> exFutures = grid(3).context().cache().context().exchange().exchangeFutures();
+
+        for (GridDhtPartitionsExchangeFuture f : exFutures) {
+            // Shouldn't contain stale futures.
+            assertTrue(f.topologyVersion().topologyVersion() >= nodeJoinTopVer);
+        }
+    }
+
+    /**
      * Wait for rebalance, cache is destroyed and created again.
      *
      * @throws Exception If failed.