You are viewing a plain-text version of this content; the link to its canonical version ("here") was not preserved in this rendering.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/03/17 17:54:03 UTC

[ignite] branch master updated: IGNITE-13374 Initial PME hangs because of multiple blinking nodes fixed. Fixes #8850

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new ba2e717  IGNITE-13374 Initial PME hangs because of multiple blinking nodes fixed. Fixes #8850
ba2e717 is described below

commit ba2e717ba192f3512393c392d3470011ad6b8e66
Author: Alexander Lapin <la...@gmail.com>
AuthorDate: Wed Mar 17 20:53:15 2021 +0300

    IGNITE-13374 Initial PME hangs because of multiple blinking nodes fixed. Fixes #8850
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../preloader/GridDhtPartitionsExchangeFuture.java |  78 +++++++++-------
 .../ClientFastReplyCoordinatorFailureTest.java     | 100 +++++++++++++++++++++
 2 files changed, 146 insertions(+), 32 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index b49dd73..032df1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -4551,6 +4551,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                                 }
 
                                 crd = node; // Do not allow to process FullMessage from old coordinator.
+
+                                processNonLocalCoordinatorChange(crd, node);
                             }
                             else {
                                 if (log.isInfoEnabled()) {
@@ -5229,38 +5231,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                             }
                         }
                         else {
-                            if (crdChanged) {
-                                for (Map.Entry<ClusterNode, GridDhtPartitionsFullMessage> m : fullMsgs.entrySet()) {
-                                    if (crd0.equals(m.getKey())) {
-                                        if (log.isInfoEnabled()) {
-                                            log.info("Coordinator changed, process pending full message [" +
-                                                "ver=" + initialVersion() +
-                                                ", crd=" + node.id() +
-                                                ", pendingMsgNode=" + m.getKey() + ']');
-                                        }
-
-                                        processFullMessage(true, m.getKey(), m.getValue());
-
-                                        if (isDone())
-                                            return;
-                                    }
-                                }
-
-                                if (log.isInfoEnabled()) {
-                                    log.info("Coordinator changed, send partitions to new coordinator [" +
-                                        "ver=" + initialVersion() +
-                                        ", crd=" + node.id() +
-                                        ", newCrd=" + crd0.id() + ']');
-                                }
-
-                                final ClusterNode newCrd = crd0;
-
-                                cctx.kernalContext().getSystemExecutorService().submit(new Runnable() {
-                                    @Override public void run() {
-                                        sendPartitions(newCrd);
-                                    }
-                                });
-                            }
+                            if (crdChanged && processNonLocalCoordinatorChange(crd0, node))
+                                return;
                         }
                     }
                     catch (IgniteCheckedException e) {
@@ -5723,6 +5695,48 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Processes pending full messages from {@code crd}; if the exchange is still not done, sends partitions to it.
+     *
+     * @param crd New coordinator.
+     * @param node Left node, or the new coordinator itself (then equal to {@code crd}); used only for logging.
+     * @return {@code true} if the exchange became done while processing a pending full message.
+     */
+    private boolean processNonLocalCoordinatorChange(ClusterNode crd, ClusterNode node) {
+        for (Map.Entry<ClusterNode, GridDhtPartitionsFullMessage> m : fullMsgs.entrySet()) {
+            if (crd.equals(m.getKey())) {
+                if (log.isInfoEnabled()) {
+                    log.info("Coordinator changed, process pending full message [" +
+                        "ver=" + initialVersion() +
+                        ", crd=" + node.id() +
+                        ", pendingMsgNode=" + m.getKey() + ']');
+                }
+
+                processFullMessage(true, m.getKey(), m.getValue());
+                // Pending message finished the exchange: no need to send partitions to the new coordinator.
+                if (isDone())
+                    return true;
+            }
+        }
+
+        if (log.isInfoEnabled()) {
+            log.info("Coordinator changed, send partitions to new coordinator [" +
+                "ver=" + initialVersion() +
+                ", crd=" + node.id() +
+                ", newCrd=" + crd.id() + ']');
+        }
+
+        final ClusterNode newCrd = crd;
+
+        cctx.kernalContext().getSystemExecutorService().submit(new Runnable() {
+            @Override public void run() {
+                sendPartitions(newCrd);
+            }
+        });
+
+        return false;
+    }
+
+    /**
      *
      */
     private static class FinishState {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientFastReplyCoordinatorFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientFastReplyCoordinatorFailureTest.java
index 6616b96..17b93fe 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientFastReplyCoordinatorFailureTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientFastReplyCoordinatorFailureTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.failure.StopNodeOrHaltFailureHandler;
@@ -26,10 +27,14 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.After;
@@ -51,6 +56,12 @@ public class ClientFastReplyCoordinatorFailureTest extends GridCommonAbstractTes
     /** Latch that will be triggered after blocking message from new server to old coordinator. */
     private final CountDownLatch newSrvSingleMesssageLatch = new CountDownLatch(1);
 
+    /** Released ~1s after a GridDhtPartitionsSingleRequest is sent to a client; awaited by TestDiscoverySpi. */
+    private static final CountDownLatch PART_SINGLE_REQ_MSG_LATCH = new CountDownLatch(1);
+
+    /** Enables the discovery/communication hooks used by {@link #testClientRepeatedReply()}. */
+    private boolean delayNodeFailedMsg;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -77,6 +88,13 @@ public class ClientFastReplyCoordinatorFailureTest extends GridCommonAbstractTes
                     return false;
                 }
             });
+
+            if (delayNodeFailedMsg) {
+                TcpDiscoverySpi spi = new TestDiscoverySpi();
+                spi.setIpFinder(IP_FINDER);
+
+                cfg.setDiscoverySpi(spi);
+            }
         }
         else if (getTestIgniteInstanceName(3).equals(igniteInstanceName)) {
             commSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
@@ -94,6 +112,26 @@ public class ClientFastReplyCoordinatorFailureTest extends GridCommonAbstractTes
                 }
             });
         }
+        else if (delayNodeFailedMsg) {
+            commSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+                @Override public boolean apply(ClusterNode node, Message msg) {
+                    if (msg instanceof GridDhtPartitionsSingleRequest && node.isClient()) {
+                        GridTestUtils.runAsync(() -> {
+                            try {
+                                Thread.sleep(1_000);
+                            }
+                            catch (InterruptedException ignore) {
+                                // No-op.
+                            }
+
+                            PART_SINGLE_REQ_MSG_LATCH.countDown();
+                        });
+                    }
+
+                    return false;
+                }
+            });
+        }
 
         cfg.setCommunicationSpi(commSpi);
 
@@ -135,4 +173,66 @@ public class ClientFastReplyCoordinatorFailureTest extends GridCommonAbstractTes
 
         startFut.get();
     }
+
+    /**
+     * Reproduces a race on the client between a GridDhtPartitionsSingleRequest that updates the coordinator
+     * and the onNodeLeft event for the same coordinator change; either path should make the client re-send
+     * its single message to the new coordinator.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testClientRepeatedReply() throws Exception {
+        delayNodeFailedMsg = true;
+
+        startGrids(3);
+
+        awaitPartitionMapExchange();
+
+        // Server start will be blocked.
+        GridTestUtils.runAsync(() -> startGrid(3));
+
+        newSrvSingleMesssageLatch.await();
+
+        // Client join will hang on the local join exchange.
+        IgniteInternalFuture<Ignite> startFut =
+            GridTestUtils.runAsync(() -> startClientGrid("client-1"));
+
+        clientSingleMesssageLatch.await();
+
+        stopGrid(0);
+        // NOTE(review): startFut.get() below has no timeout, so a hung join blocks here instead of timing out.
+        assertTrue(
+            GridTestUtils.waitForCondition(() -> {
+                try {
+                    startFut.get();
+                }
+                catch (IgniteCheckedException e) {
+                    error("Failed when waiting for client start future to complete", e);
+                }
+                // NOTE(review): returns true even if get() threw; consider startFut.get(10_000) and rethrowing.
+                return true;
+            }, 10_000)
+        );
+    }
+
+    /**
+     * Used on the client node: delays processing of node-left and node-failed discovery messages until
+     * {@code PART_SINGLE_REQ_MSG_LATCH} is released, letting GridDhtPartitionsSingleRequest arrive first.
+     */
+    public static class TestDiscoverySpi extends TcpDiscoverySpi {
+        /** {@inheritDoc} */
+        @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+            if (msg instanceof TcpDiscoveryNodeLeftMessage || msg instanceof TcpDiscoveryNodeFailedMessage) {
+                try {
+                    PART_SINGLE_REQ_MSG_LATCH.await();
+                }
+                catch (InterruptedException e) {
+                    // No-op. NOTE(review): interrupt status is lost; consider Thread.currentThread().interrupt().
+                }
+            }
+            // NOTE(review): await() above has no timeout, so a missing request blocks this message forever.
+            super.startMessageProcess(msg);
+        }
+    }
 }