You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2019/05/07 15:21:44 UTC

[ignite] 36/41: GG-17478 Fixed wrong assert on affinity initialization on node join

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch gg-18540
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 8baee8b280626e04157caa937e81a8b8b2ebbd8e
Author: Pavel Kovalenko <jo...@gmail.com>
AuthorDate: Mon May 6 12:21:52 2019 +0300

    GG-17478 Fixed wrong assert on affinity initialization on node join
    
    Signed-off-by: Pavel Kovalenko <jo...@gmail.com>
---
 .../cache/CacheAffinitySharedManager.java          |   2 +-
 .../PartitionsExchangeCoordinatorFailoverTest.java | 147 +++++++++++++++++----
 2 files changed, 120 insertions(+), 29 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 45f55da..29b4d81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1543,7 +1543,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 CacheGroupContext grp = cctx.cache().cacheGroup(holder.groupId());
 
                 if (affReq != null && affReq.contains(aff.groupId())) {
-                    assert AffinityTopologyVersion.NONE.equals(aff.lastVersion()) : aff.lastVersion();
+                    assert resTopVer.compareTo(aff.lastVersion()) >= 0 : aff.lastVersion();
 
                     CacheGroupAffinityMessage affMsg = receivedAff.get(aff.groupId());
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
index fdef983..5cff138 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
@@ -16,12 +16,15 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -41,9 +44,9 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteSpiException;
-import org.apache.ignite.spi.communication.CommunicationSpi;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Assert;
 import org.junit.Test;
@@ -55,8 +58,14 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
     /** */
     private static final String CACHE_NAME = "cache";
 
+    /** Coordinator node name. */
+    private static final String CRD_NONE = "crd";
+
+    /** */
+    private volatile Supplier<TcpCommunicationSpi> spiFactory = TcpCommunicationSpi::new;
+
     /** */
-    private Supplier<CommunicationSpi> spiFactory = TcpCommunicationSpi::new;
+    private boolean newCaches = true;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -64,7 +73,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
 
         cfg.setConsistentId(igniteInstanceName);
 
-        cfg.setCommunicationSpi(spiFactory.get());
+        cfg.setCommunicationSpi(spiFactory.get().setName("tcp"));
 
         cfg.setCacheConfiguration(
                 new CacheConfiguration(CACHE_NAME)
@@ -73,7 +82,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
         );
 
         // Add cache that exists only on coordinator node.
-        if (igniteInstanceName.equals("crd")) {
+        if (newCaches && igniteInstanceName.equals(CRD_NONE)) {
             IgnitePredicate<ClusterNode> nodeFilter = node -> node.consistentId().equals(igniteInstanceName);
 
             cfg.setCacheConfiguration(
@@ -106,7 +115,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
     public void testNewCoordinatorCompletedExchange() throws Exception {
         spiFactory = TestRecordingCommunicationSpi::new;
 
-        IgniteEx crd = (IgniteEx) startGrid("crd");
+        IgniteEx crd = startGrid(CRD_NONE);
 
         IgniteEx newCrd = startGrid(1);
 
@@ -121,13 +130,13 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
         // Block FullMessage for newly joined nodes.
         TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(crd);
 
-        final CountDownLatch sendFullMsgLatch = new CountDownLatch(1);
+        final CountDownLatch sndFullMsgLatch = new CountDownLatch(1);
 
         // Delay sending full message to newly joined nodes.
         spi.blockMessages((node, msg) -> {
             if (msg instanceof GridDhtPartitionsFullMessage && node.order() > 2) {
                 try {
-                    sendFullMsgLatch.await();
+                    sndFullMsgLatch.await();
                 }
                 catch (Throwable ignored) { }
 
@@ -155,13 +164,13 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
             getTestTimeout()
         );
 
-        IgniteInternalFuture stopCrdFut = GridTestUtils.runAsync(() -> stopGrid("crd", true, false));
+        IgniteInternalFuture stopCrdFut = GridTestUtils.runAsync(() -> stopGrid(CRD_NONE, true, false));
 
         // Magic sleep to make sure that coordinator stop process has started.
         U.sleep(1000);
 
         // Resume full messages sending to unblock coordinator stopping process.
-        sendFullMsgLatch.countDown();
+        sndFullMsgLatch.countDown();
 
         // Coordinator stop should succeed.
         stopCrdFut.get();
@@ -192,7 +201,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
     public void testDelayedFullMessageReplacedIfCoordinatorChanged() throws Exception {
         spiFactory = TestRecordingCommunicationSpi::new;
 
-        IgniteEx crd = startGrid("crd");
+        IgniteEx crd = startGrid(CRD_NONE);
 
         IgniteEx newCrd = startGrid(1);
 
@@ -202,7 +211,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
 
         awaitPartitionMapExchange();
 
-        blockSendingFullMessage(crd, problemNode);
+        blockSendingFullMessage(crd, node -> node.equals(problemNode.localNode()));
 
         IgniteInternalFuture joinNextNodeFut = GridTestUtils.runAsync(() -> startGrid(3));
 
@@ -210,11 +219,11 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
 
         U.sleep(5000);
 
-        blockSendingFullMessage(newCrd, problemNode);
+        blockSendingFullMessage(newCrd, node -> node.equals(problemNode.localNode()));
 
-        IgniteInternalFuture stopCoordinatorFut = GridTestUtils.runAsync(() -> stopGrid("crd"));
+        IgniteInternalFuture stopCrdFut = GridTestUtils.runAsync(() -> stopGrid(CRD_NONE));
 
-        stopCoordinatorFut.get();
+        stopCrdFut.get();
 
         U.sleep(5000);
 
@@ -237,9 +246,9 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
             final int delay = 5_000;
 
             if (msg instanceof GridDhtPartitionDemandMessage) {
-                GridDhtPartitionDemandMessage demandMessage = (GridDhtPartitionDemandMessage) msg;
+                GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage) msg;
 
-                if (demandMessage.groupId() == GridCacheUtils.cacheId(GridCacheUtils.UTILITY_CACHE_NAME))
+                if (demandMsg.groupId() == GridCacheUtils.cacheId(GridCacheUtils.UTILITY_CACHE_NAME))
                     return 0;
 
                 return delay;
@@ -248,7 +257,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
             return 0;
         });
 
-        final IgniteEx crd = startGrid("crd");
+        final IgniteEx crd = startGrid(CRD_NONE);
 
         startGrid(1);
 
@@ -293,7 +302,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
         U.sleep(2_500);
 
         // And then stop coordinator node.
-        stopGrid("crd", true);
+        stopGrid(CRD_NONE, true);
 
         startNodeFut.get();
 
@@ -314,11 +323,93 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
     }
 
     /**
+     * Checks that changing the coordinator to a node that is joining the cluster at that moment works correctly
+     * in case of exchange merge and a completed exchange on other joining nodes.
+     */
+    @Test
+    @WithSystemProperty(key = IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK, value = "true")
+    public void testChangeCoordinatorToLocallyJoiningNode() throws Exception {
+        newCaches = false;
+
+        spiFactory = TestRecordingCommunicationSpi::new;
+
+        IgniteEx crd = startGrid(CRD_NONE);
+
+        final int newCrdNodeIdx = 1;
+
+        // A full message shouldn't be sent to the new coordinator.
+        blockSendingFullMessage(crd, node -> node.consistentId().equals(getTestIgniteInstanceName(newCrdNodeIdx)));
+
+        CountDownLatch joiningNodeSentSingleMsg = new CountDownLatch(1);
+
+        // For next joining node delay sending single message to emulate exchanges merge.
+        spiFactory = () -> new DynamicDelayingCommunicationSpi(msg -> {
+            final int delay = 5_000;
+
+            if (msg instanceof GridDhtPartitionsSingleMessage) {
+                GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage) msg;
+
+                if (singleMsg.exchangeId() != null) {
+                    joiningNodeSentSingleMsg.countDown();
+
+                    return delay;
+                }
+            }
+
+            return 0;
+        });
+
+        IgniteInternalFuture<?> newCrdJoinFut = GridTestUtils.runAsync(() -> startGrid(newCrdNodeIdx));
+
+        // Wait till new coordinator node sent single message.
+        joiningNodeSentSingleMsg.await();
+
+        spiFactory = TcpCommunicationSpi::new;
+
+        // Additionally start 2 new nodes. Their exchanges should be merged with the new coordinator's join exchange.
+        startGridsMultiThreaded(2, 2);
+
+        Assert.assertFalse("New coordinator join shouldn't be happened before stopping old coordinator.",
+            newCrdJoinFut.isDone());
+
+        // Stop coordinator.
+        stopGrid(CRD_NONE);
+
+        // New coordinator join process should succeed after that.
+        newCrdJoinFut.get();
+
+        awaitPartitionMapExchange();
+
+        // Check that affinity assignments are equal on all nodes.
+        AffinityTopologyVersion affVer = ((IgniteEx) ignite(1)).cachex(CACHE_NAME)
+            .context().shared().exchange().readyAffinityVersion();
+
+        List<List<ClusterNode>> expAssignment = null;
+        IgniteEx expAssignmentNode = null;
+
+        for (Ignite node : G.allGrids()) {
+            IgniteEx nodeEx = (IgniteEx) node;
+
+            List<List<ClusterNode>> assignment = nodeEx.cachex(CACHE_NAME).context().affinity().assignments(affVer);
+
+            if (expAssignment == null) {
+                expAssignment = assignment;
+                expAssignmentNode = nodeEx;
+            }
+            else
+                Assert.assertEquals("Affinity assignments are different " +
+                    "[expectedNode=" + expAssignmentNode + ", actualNode=" + nodeEx + "]", expAssignment, assignment);
+        }
+    }
+
+    /**
      * Blocks sending full message from coordinator to non-coordinator node.
+     *
      * @param from Coordinator node.
-     * @param to Non-coordinator node.
+     * @param pred Non-coordinator node predicate.
+     *                  If the predicate returns {@code true}, the full message will not be sent to that node.
      */
-    private void blockSendingFullMessage(IgniteEx from, IgniteEx to) {
+    private void blockSendingFullMessage(IgniteEx from, Predicate<ClusterNode> pred) {
         // Block FullMessage for newly joined nodes.
         TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(from);
 
@@ -327,8 +418,8 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
             if (msg instanceof GridDhtPartitionsFullMessage) {
                 GridDhtPartitionsFullMessage fullMsg = (GridDhtPartitionsFullMessage) msg;
 
-                if (fullMsg.exchangeId() != null && node.order() == to.localNode().order()) {
-                    log.warning("Blocked sending " + msg + " to " + to.localNode());
+                if (fullMsg.exchangeId() != null && pred.test(node)) {
+                    log.warning("Blocked sending " + msg + " to " + node);
 
                     return true;
                 }
@@ -341,9 +432,9 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
     /**
      * Communication SPI that allows to delay sending message by predicate.
      */
-    class DynamicDelayingCommunicationSpi extends TcpCommunicationSpi {
+    static class DynamicDelayingCommunicationSpi extends TcpCommunicationSpi {
         /** Function that returns delay in milliseconds for given message. */
-        private final Function<Message, Integer> delayMessageFunc;
+        private final Function<Message, Integer> delayMsgFunc;
 
         /** */
         DynamicDelayingCommunicationSpi() {
@@ -351,10 +442,10 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
         }
 
         /**
-         * @param delayMessageFunc Function to calculate delay for message.
+         * @param delayMsgFunc Function to calculate delay for message.
          */
-        DynamicDelayingCommunicationSpi(final Function<Message, Integer> delayMessageFunc) {
-            this.delayMessageFunc = delayMessageFunc;
+        DynamicDelayingCommunicationSpi(final Function<Message, Integer> delayMsgFunc) {
+            this.delayMsgFunc = delayMsgFunc;
         }
 
         /** {@inheritDoc} */
@@ -363,7 +454,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
             try {
                 GridIoMessage ioMsg = (GridIoMessage)msg;
 
-                int delay = delayMessageFunc.apply(ioMsg.message());
+                int delay = delayMsgFunc.apply(ioMsg.message());
 
                 if (delay > 0) {
                     log.warning(String.format("Delay sending %s to %s", msg, node));