You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/05/17 13:01:55 UTC
[ignite] branch master updated: IGNITE-11732 Fixed merged exchange
future hang - Fixes #6441.
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 07cba6a IGNITE-11732 Fixed merged exchange future hang - Fixes #6441.
07cba6a is described below
commit 07cba6a557e6fbb7f3cb90cbd6ee6328b00befc7
Author: Alexey Goncharuk <al...@gmail.com>
AuthorDate: Fri May 17 15:57:16 2019 +0300
IGNITE-11732 Fixed merged exchange future hang - Fixes #6441.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
.../preloader/GridDhtPartitionsExchangeFuture.java | 12 ++++-
.../ExchangeMergeStaleServerNodesTest.java | 59 ++++++++++++++++++++++
2 files changed, 69 insertions(+), 2 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 78e46fe..b975295 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2547,7 +2547,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- * Merges this exchange with given one.
+ * Merges this exchange with given one. Invoked under synchronization on {@code mux} of the {@code fut}.
+ * All futures being merged are merged under a single synchronized section.
*
* @param fut Current exchange to merge with.
* @return {@code True} if need wait for message from joined server node.
@@ -4487,6 +4488,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
wasMerged = true;
rmvd = true;
+
+ awaitMergedMsgs--;
+
+ assert awaitMergedMsgs >= 0 : "exchFut=" + this + ", node=" + node;
}
}
}
@@ -4502,7 +4507,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
return;
case CRD:
- allReceived = rmvd && (remaining.isEmpty() && F.isEmpty(mergedJoinExchMsgs));
+ allReceived = rmvd && (remaining.isEmpty() && awaitMergedMsgs == 0);
break;
@@ -4949,16 +4954,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
@Override public String toString() {
Set<UUID> remaining;
Set<UUID> mergedJoinExch;
+ int awaitMergedMsgs;
synchronized (mux) {
remaining = new HashSet<>(this.remaining);
mergedJoinExch = mergedJoinExchMsgs == null ? null : new HashSet<>(mergedJoinExchMsgs.keySet());
+ awaitMergedMsgs = this.awaitMergedMsgs;
}
return S.toString(GridDhtPartitionsExchangeFuture.class, this,
"evtLatch", evtLatch == null ? "null" : evtLatch.getCount(),
"remaining", remaining,
"mergedJoinExchMsgs", mergedJoinExch,
+ "awaitMergedMsgs", awaitMergedMsgs,
"super", super.toString());
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/ExchangeMergeStaleServerNodesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/ExchangeMergeStaleServerNodesTest.java
index 0a59f2e..f6367b9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/ExchangeMergeStaleServerNodesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/ExchangeMergeStaleServerNodesTest.java
@@ -48,6 +48,7 @@ import org.junit.Test;
/**
*
*/
+@SuppressWarnings("deprecation")
public class ExchangeMergeStaleServerNodesTest extends GridCommonAbstractTest {
/** */
private Map<String, DelayableCommunicationSpi> commSpis;
@@ -131,6 +132,64 @@ public class ExchangeMergeStaleServerNodesTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testServersFailAfterDoubleMerge() throws Exception {
+ commSpis = F.asMap(
+ getTestIgniteInstanceName(0), new DelayableCommunicationSpi((msg) -> false),
+ getTestIgniteInstanceName(1), new DelayableCommunicationSpi((msg) -> false),
+ getTestIgniteInstanceName(2), new DelayableCommunicationSpi((msg) -> false),
+ getTestIgniteInstanceName(3), new DelayableCommunicationSpi((msg) -> msg instanceof GridDhtPartitionsSingleMessage),
+ getTestIgniteInstanceName(4), new DelayableCommunicationSpi((msg) -> false)
+ );
+
+ try {
+ IgniteEx crd = startGrid(0);
+
+ GridCachePartitionExchangeManager<Object, Object> exchMgr = crd.context().cache().context().exchange();
+
+ exchMgr.mergeExchangesTestWaitVersion(new AffinityTopologyVersion(4, 0), null);
+
+ // This start will trigger an exchange.
+ IgniteInternalFuture<IgniteEx> fut1 = GridTestUtils.runAsync(() -> startGrid(1), "starter1");
+ // This exchange will be merged.
+ IgniteInternalFuture<IgniteEx> fut2 = GridTestUtils.runAsync(() -> startGrid(2), "starter2");
+
+ GridTestUtils.waitForCondition(() -> exchMgr.lastTopologyFuture().exchangeId().topologyVersion()
+ .equals(new AffinityTopologyVersion(2, 0)), getTestTimeout());
+
+ // This exchange will be merged as well, but the node will be failed.
+ IgniteInternalFuture<IgniteEx> futFail = GridTestUtils.runAsync(() -> startGrid(3), "starter3");
+
+ GridTestUtils.waitForCondition(exchMgr::hasPendingExchange, getTestTimeout());
+
+ // Wait for merged exchange.
+ GridTestUtils.waitForCondition(
+ () -> exchMgr.mergeExchangesTestWaitVersion() == null, getTestTimeout());
+
+ futFail.cancel();
+ stopGrid(getTestIgniteInstanceName(2), true);
+
+ fut1.get();
+ fut2.get();
+
+ try {
+ futFail.get();
+ }
+ catch (IgniteCheckedException ignore) {
+ // No-op.
+ }
+
+ // Check that next nodes can successfully join topology.
+ startGrid(4);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
*
*/
private static class DelayableCommunicationSpi extends TcpCommunicationSpi {