You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/10/29 10:01:50 UTC

ignite git commit: IGNITE-9790 Fixed delayed full message handling in case of changed coordinator. - Fixes #5190.

Repository: ignite
Updated Branches:
  refs/heads/master 7000e6e35 -> 2906a1652


IGNITE-9790 Fixed delayed full message handling in case of changed coordinator. - Fixes #5190.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2906a165
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2906a165
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2906a165

Branch: refs/heads/master
Commit: 2906a16522279802a6002f9fba009978225afad3
Parents: 7000e6e
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Mon Oct 29 12:35:30 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Oct 29 13:01:09 2018 +0300

----------------------------------------------------------------------
 .../GridDhtPartitionsExchangeFuture.java        | 14 ++---
 .../preloader/GridDhtPartitionsFullMessage.java | 16 ++++-
 ...rtitionsExchangeCoordinatorFailoverTest.java | 64 ++++++++++++++++++++
 3 files changed, 85 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2906a165/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 23e47e5..0fc9c24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -181,7 +181,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     private List<ClusterNode> srvNodes;
 
     /** */
-    private ClusterNode crd;
+    private volatile ClusterNode crd;
 
     /** ExchangeFuture id. */
     private final GridDhtPartitionExchangeId exchId;
@@ -4550,12 +4550,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @return {@code True} If partition changes triggered by receiving Single/Full messages are not finished yet.
      */
     public boolean partitionChangesInProgress() {
-        boolean isCoordinator = crd.equals(cctx.localNode());
+        ClusterNode crd0 = crd;
 
-        if (isCoordinator)
-            return !partitionsSent;
-        else
-            return !partitionsReceived;
+        if (crd0 == null)
+            return false;
+
+        return crd0.equals(cctx.localNode()) ? !partitionsSent : !partitionsReceived;
     }
 
     /**
@@ -4589,7 +4589,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             });
         }
         else
-            delayedLatestMsg.merge(fullMsg);
+            delayedLatestMsg.merge(fullMsg, cctx.discovery());
 
         return true;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2906a165/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index b84dc79..a63ab70 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -25,8 +25,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
@@ -803,7 +805,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
      *
      * @param other Other full message.
      */
-    public void merge(GridDhtPartitionsFullMessage other) {
+    public void merge(GridDhtPartitionsFullMessage other, GridDiscoveryManager discovery) {
         assert other.exchangeId() == null && exchangeId() == null :
             "Both current and merge full message must have exchangeId == null"
              + other.exchangeId() + "," + exchangeId();
@@ -814,8 +816,18 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
             GridDhtPartitionFullMap currMap = partitions().get(grpId);
 
-            if (currMap == null || updMap.compareTo(currMap) >= 0)
+            if (currMap == null)
                 partitions().put(grpId, updMap);
+            else {
+                ClusterNode currentMapSentBy = discovery.node(currMap.nodeId());
+                ClusterNode newMapSentBy = discovery.node(updMap.nodeId());
+
+                if (newMapSentBy == null)
+                    return;
+
+                if (currentMapSentBy == null || newMapSentBy.order() > currentMapSentBy.order() || updMap.compareTo(currMap) >= 0)
+                    partitions().put(grpId, updMap);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2906a165/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
index a2adcf7..1c847f9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
@@ -152,4 +152,68 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
             cache.put(0, 0);
         }
     }
+
+    /**
+     * Test checks that delayed full messages are processed correctly in case of changed coordinator.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDelayedFullMessageReplacedIfCoordinatorChanged() throws Exception {
+        IgniteEx crd = startGrid("crd");
+
+        IgniteEx newCrd = startGrid(1);
+
+        IgniteEx problemNode = startGrid(2);
+
+        crd.cluster().active(true);
+
+        awaitPartitionMapExchange();
+
+        blockSendingFullMessage(crd, problemNode);
+
+        IgniteInternalFuture joinNextNodeFut = GridTestUtils.runAsync(() -> startGrid(3));
+
+        joinNextNodeFut.get();
+
+        U.sleep(5000);
+
+        blockSendingFullMessage(newCrd, problemNode);
+
+        IgniteInternalFuture stopCoordinatorFut = GridTestUtils.runAsync(() -> stopGrid("crd"));
+
+        stopCoordinatorFut.get();
+
+        U.sleep(5000);
+
+        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(newCrd);
+
+        spi.stopBlock(true);
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
+     * Blocks sending full message from coordinator to non-coordinator node.
+     * @param from Coordinator node.
+     * @param to Non-coordinator node.
+     */
+    private void blockSendingFullMessage(IgniteEx from, IgniteEx to) {
+        // Block FullMessage for newly joined nodes.
+        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(from);
+
+        // Delay full messages that carry an exchange id and are addressed to the target node.
+        spi.blockMessages((node, msg) -> {
+            if (msg instanceof GridDhtPartitionsFullMessage) {
+                GridDhtPartitionsFullMessage fullMsg = (GridDhtPartitionsFullMessage) msg;
+
+                if (fullMsg.exchangeId() != null && node.order() == to.localNode().order()) {
+                    log.warning("Blocked sending " + msg + " to " + to.localNode());
+
+                    return true;
+                }
+            }
+
+            return false;
+        });
+    }
 }