You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/03 14:05:05 UTC
[05/14] ignite git commit: ignite-5593 Fixed exchange futures leak
ignite-5593 Fixed exchange futures leak
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ce5f8e90
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ce5f8e90
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ce5f8e90
Branch: refs/heads/ignite-gg-12389
Commit: ce5f8e9016f088e55d13051075ca13b8d3317c06
Parents: 771288f
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Fri Jun 30 10:34:11 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jun 30 10:34:11 2017 +0300
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 2 +-
.../CacheLateAffinityAssignmentTest.java | 73 ++++++++++++++++++++
2 files changed, 74 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce5f8e90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index d997b79..cbb07f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -261,7 +261,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
exchFut = exchangeFuture(exchId, evt, cache, null, msg);
}
}
- else {
+ else if (msg.exchangeId().topologyVersion().topologyVersion() >= cctx.discovery().localJoinEvent().topologyVersion()) {
exchangeFuture(msg.exchangeId(), null, null, null, null)
.onAffinityChangeMessage(evt.eventNode(), msg);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce5f8e90/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index db3f5cd..c0c3d70 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.PA;
@@ -1167,6 +1168,78 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
}
/**
+ * Wait for rebalance, send affinity change message, but affinity already changed (new node joined).
+ *
+ * @throws Exception If failed.
+ */
+ public void testDelayAssignmentAffinityChanged2() throws Exception {
+ Ignite ignite0 = startServer(0, 1);
+
+ TestTcpDiscoverySpi discoSpi0 =
+ (TestTcpDiscoverySpi)ignite0.configuration().getDiscoverySpi();
+ TestRecordingCommunicationSpi commSpi0 =
+ (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi();
+
+ startClient(1, 2);
+
+ checkAffinity(2, topVer(2, 0), true);
+
+ startServer(2, 3);
+
+ checkAffinity(3, topVer(3, 1), false);
+
+ discoSpi0.blockCustomEvent();
+
+ stopNode(2, 4);
+
+ discoSpi0.waitCustomEvent();
+
+ blockSupplySend(commSpi0, CACHE_NAME1);
+
+ final IgniteInternalFuture<?> startedFuture = multithreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ startServer(3, 5);
+
+ return null;
+ }
+ }, 1, "server-starter");
+
+ Thread.sleep(2_000);
+
+ discoSpi0.stopBlock();
+
+ boolean started = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return startedFuture.isDone();
+ }
+ }, 10_000);
+
+ if (!started)
+ startedFuture.cancel();
+
+ assertTrue(started);
+
+ checkAffinity(3, topVer(5, 0), false);
+
+ checkNoExchange(3, topVer(5, 1));
+
+ commSpi0.stopBlock();
+
+ checkAffinity(3, topVer(5, 1), true);
+
+ long nodeJoinTopVer = grid(3).context().discovery().localJoinEvent().topologyVersion();
+
+ assertEquals(5, nodeJoinTopVer);
+
+ List<GridDhtPartitionsExchangeFuture> exFutures = grid(3).context().cache().context().exchange().exchangeFutures();
+
+ for (GridDhtPartitionsExchangeFuture f : exFutures) {
+ // Shouldn't contain stale futures.
+ assertTrue(f.topologyVersion().topologyVersion() >= nodeJoinTopVer);
+ }
+ }
+
+ /**
* Wait for rebalance, cache is destroyed and created again.
*
* @throws Exception If failed.