You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/10/30 06:09:09 UTC
[18/28] ignite git commit: IGNITE-9790 Fixed delayed full message
handling in case of changed coordinator. - Fixes #5190.
IGNITE-9790 Fixed delayed full message handling in case of changed coordinator. - Fixes #5190.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2906a165
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2906a165
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2906a165
Branch: refs/heads/ignite-627
Commit: 2906a16522279802a6002f9fba009978225afad3
Parents: 7000e6e
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Mon Oct 29 12:35:30 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Oct 29 13:01:09 2018 +0300
----------------------------------------------------------------------
.../GridDhtPartitionsExchangeFuture.java | 14 ++---
.../preloader/GridDhtPartitionsFullMessage.java | 16 ++++-
...rtitionsExchangeCoordinatorFailoverTest.java | 64 ++++++++++++++++++++
3 files changed, 85 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2906a165/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 23e47e5..0fc9c24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -181,7 +181,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
private List<ClusterNode> srvNodes;
/** */
- private ClusterNode crd;
+ private volatile ClusterNode crd;
/** ExchangeFuture id. */
private final GridDhtPartitionExchangeId exchId;
@@ -4550,12 +4550,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @return {@code True} If partition changes triggered by receiving Single/Full messages are not finished yet.
*/
public boolean partitionChangesInProgress() {
- boolean isCoordinator = crd.equals(cctx.localNode());
+ ClusterNode crd0 = crd;
- if (isCoordinator)
- return !partitionsSent;
- else
- return !partitionsReceived;
+ if (crd0 == null)
+ return false;
+
+ return crd0.equals(cctx.localNode()) ? !partitionsSent : !partitionsReceived;
}
/**
@@ -4589,7 +4589,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
});
}
else
- delayedLatestMsg.merge(fullMsg);
+ delayedLatestMsg.merge(fullMsg, cctx.discovery());
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2906a165/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index b84dc79..a63ab70 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -25,8 +25,10 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
@@ -803,7 +805,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
*
* @param other Other full message.
*/
- public void merge(GridDhtPartitionsFullMessage other) {
+ public void merge(GridDhtPartitionsFullMessage other, GridDiscoveryManager discovery) {
assert other.exchangeId() == null && exchangeId() == null :
"Both current and merge full message must have exchangeId == null"
+ other.exchangeId() + "," + exchangeId();
@@ -814,8 +816,18 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
GridDhtPartitionFullMap currMap = partitions().get(grpId);
- if (currMap == null || updMap.compareTo(currMap) >= 0)
+ if (currMap == null)
partitions().put(grpId, updMap);
+ else {
+ ClusterNode currentMapSentBy = discovery.node(currMap.nodeId());
+ ClusterNode newMapSentBy = discovery.node(updMap.nodeId());
+
+ if (newMapSentBy == null)
+ return;
+
+ if (currentMapSentBy == null || newMapSentBy.order() > currentMapSentBy.order() || updMap.compareTo(currMap) >= 0)
+ partitions().put(grpId, updMap);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2906a165/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
index a2adcf7..1c847f9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
@@ -152,4 +152,68 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
cache.put(0, 0);
}
}
+
+ /**
+ * Test checks that delayed full messages are processed correctly in case of changed coordinator.
+ * Steps: block full messages from "crd" to node 2, start node 3, block them from node 1 too, stop "crd", unblock node 1.
+ * @throws Exception If failed.
+ */
+ public void testDelayedFullMessageReplacedIfCoordinatorChanged() throws Exception {
+ IgniteEx crd = startGrid("crd");
+
+ IgniteEx newCrd = startGrid(1);
+
+ IgniteEx problemNode = startGrid(2);
+
+ crd.cluster().active(true);
+
+ awaitPartitionMapExchange();
+
+ blockSendingFullMessage(crd, problemNode);
+
+ IgniteInternalFuture joinNextNodeFut = GridTestUtils.runAsync(() -> startGrid(3));
+
+ joinNextNodeFut.get();
+
+ U.sleep(5000);
+
+ blockSendingFullMessage(newCrd, problemNode);
+
+ IgniteInternalFuture stopCoordinatorFut = GridTestUtils.runAsync(() -> stopGrid("crd"));
+
+ stopCoordinatorFut.get();
+
+ U.sleep(5000);
+
+ TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(newCrd);
+
+ spi.stopBlock(true);
+
+ awaitPartitionMapExchange();
+ }
+
+ /**
+ * Blocks full messages that carry an exchange id from being sent by the coordinator to the non-coordinator node.
+ * @param from Coordinator node; the blocking predicate is installed on its communication SPI.
+ * @param to Non-coordinator node; messages addressed to it are matched by node order.
+ */
+ private void blockSendingFullMessage(IgniteEx from, IgniteEx to) {
+ // Communication SPI of the sending node; the blocking predicate below is installed on it.
+ TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(from);
+
+ // Block only full messages with a non-null exchange id addressed to the target node; let everything else pass.
+ spi.blockMessages((node, msg) -> {
+ if (msg instanceof GridDhtPartitionsFullMessage) {
+ GridDhtPartitionsFullMessage fullMsg = (GridDhtPartitionsFullMessage) msg;
+
+ if (fullMsg.exchangeId() != null && node.order() == to.localNode().order()) {
+ log.warning("Blocked sending " + msg + " to " + to.localNode());
+
+ return true;
+ }
+ }
+
+ return false;
+ });
+ }
}