You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this copy.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/05/17 13:01:55 UTC

[ignite] branch master updated: IGNITE-11732 Fixed merged exchange future hang - Fixes #6441.

This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 07cba6a  IGNITE-11732 Fixed merged exchange future hang - Fixes #6441.
07cba6a is described below

commit 07cba6a557e6fbb7f3cb90cbd6ee6328b00befc7
Author: Alexey Goncharuk <al...@gmail.com>
AuthorDate: Fri May 17 15:57:16 2019 +0300

    IGNITE-11732 Fixed merged exchange future hang - Fixes #6441.
    
    Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
 .../preloader/GridDhtPartitionsExchangeFuture.java | 12 ++++-
 .../ExchangeMergeStaleServerNodesTest.java         | 59 ++++++++++++++++++++++
 2 files changed, 69 insertions(+), 2 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 78e46fe..b975295 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2547,7 +2547,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
-     * Merges this exchange with given one.
+     * Merges this exchange with given one. Invoked under synchronization on {@code mux} of the {@code fut}.
+     * All futures being merged are merged under a single synchronized section.
      *
      * @param fut Current exchange to merge with.
      * @return {@code True} if need wait for message from joined server node.
@@ -4487,6 +4488,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                                         wasMerged = true;
                                         rmvd = true;
+
+                                        awaitMergedMsgs--;
+
+                                        assert awaitMergedMsgs >= 0 : "exchFut=" + this + ", node=" + node;
                                     }
                                 }
                             }
@@ -4502,7 +4507,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                                     return;
 
                                 case CRD:
-                                    allReceived = rmvd && (remaining.isEmpty() && F.isEmpty(mergedJoinExchMsgs));
+                                    allReceived = rmvd && (remaining.isEmpty() && awaitMergedMsgs == 0);
 
                                     break;
 
@@ -4949,16 +4954,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     @Override public String toString() {
         Set<UUID> remaining;
         Set<UUID> mergedJoinExch;
+        int awaitMergedMsgs;
 
         synchronized (mux) {
             remaining = new HashSet<>(this.remaining);
             mergedJoinExch = mergedJoinExchMsgs == null ? null : new HashSet<>(mergedJoinExchMsgs.keySet());
+            awaitMergedMsgs = this.awaitMergedMsgs;
         }
 
         return S.toString(GridDhtPartitionsExchangeFuture.class, this,
             "evtLatch", evtLatch == null ? "null" : evtLatch.getCount(),
             "remaining", remaining,
             "mergedJoinExchMsgs", mergedJoinExch,
+            "awaitMergedMsgs", awaitMergedMsgs,
             "super", super.toString());
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/ExchangeMergeStaleServerNodesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/ExchangeMergeStaleServerNodesTest.java
index 0a59f2e..f6367b9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/ExchangeMergeStaleServerNodesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/ExchangeMergeStaleServerNodesTest.java
@@ -48,6 +48,7 @@ import org.junit.Test;
 /**
  *
  */
+@SuppressWarnings("deprecation")
 public class ExchangeMergeStaleServerNodesTest extends GridCommonAbstractTest {
     /** */
     private Map<String, DelayableCommunicationSpi> commSpis;
@@ -131,6 +132,64 @@ public class ExchangeMergeStaleServerNodesTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testServersFailAfterDoubleMerge() throws Exception {
+        commSpis = F.asMap(
+            getTestIgniteInstanceName(0), new DelayableCommunicationSpi((msg) -> false),
+            getTestIgniteInstanceName(1), new DelayableCommunicationSpi((msg) -> false),
+            getTestIgniteInstanceName(2), new DelayableCommunicationSpi((msg) -> false),
+            getTestIgniteInstanceName(3), new DelayableCommunicationSpi((msg) -> msg instanceof GridDhtPartitionsSingleMessage),
+            getTestIgniteInstanceName(4), new DelayableCommunicationSpi((msg) -> false)
+        );
+
+        try {
+            IgniteEx crd = startGrid(0);
+
+            GridCachePartitionExchangeManager<Object, Object> exchMgr = crd.context().cache().context().exchange();
+
+            exchMgr.mergeExchangesTestWaitVersion(new AffinityTopologyVersion(4, 0), null);
+
+            // This start will trigger an exchange.
+            IgniteInternalFuture<IgniteEx> fut1 = GridTestUtils.runAsync(() -> startGrid(1), "starter1");
+            // This exchange will be merged.
+            IgniteInternalFuture<IgniteEx> fut2 = GridTestUtils.runAsync(() -> startGrid(2), "starter2");
+
+            GridTestUtils.waitForCondition(() -> exchMgr.lastTopologyFuture().exchangeId().topologyVersion()
+                .equals(new AffinityTopologyVersion(2, 0)), getTestTimeout());
+
+            // This exchange will be merged as well, but the node will be failed.
+            IgniteInternalFuture<IgniteEx> futFail = GridTestUtils.runAsync(() -> startGrid(3), "starter3");
+
+            GridTestUtils.waitForCondition(exchMgr::hasPendingExchange, getTestTimeout());
+
+            // Wait for merged exchange.
+            GridTestUtils.waitForCondition(
+                () -> exchMgr.mergeExchangesTestWaitVersion() == null, getTestTimeout());
+
+            futFail.cancel();
+            stopGrid(getTestIgniteInstanceName(2), true);
+
+            fut1.get();
+            fut2.get();
+
+            try {
+                futFail.get();
+            }
+            catch (IgniteCheckedException ignore) {
+                // No-op.
+            }
+
+            // Check that next nodes can successfully join topology.
+            startGrid(4);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
      *
      */
     private static class DelayableCommunicationSpi extends TcpCommunicationSpi {