You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/01/23 15:30:35 UTC

[ignite] branch master updated: IGNITE-10933 Fixed node join hang when joining node receives client reconnect message - Fixes #5852.

This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 20cf75d  IGNITE-10933 Fixed node join hang when joining node receives client reconnect message - Fixes #5852.
20cf75d is described below

commit 20cf75db8d674b0fcc82407e6264a924e4250444
Author: Aleksei Scherbakov <al...@gmail.com>
AuthorDate: Wed Jan 23 18:29:29 2019 +0300

    IGNITE-10933 Fixed node join hang when joining node receives client reconnect message - Fixes #5852.
    
    Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
 .../IgniteDiscoverySpiInternalListener.java        |   8 +
 .../ignite/spi/discovery/tcp/ServerImpl.java       |  17 +-
 .../ignite/spi/discovery/tcp/TcpDiscoverySpi.java  |  10 +-
 .../ignite/internal/DiscoverySpiTestListener.java  |  33 +++
 .../TcpDiscoveryReconnectUnstableTopologyTest.java | 224 +++++++++++++++++++++
 .../IgniteSpiDiscoverySelfTestSuite.java           |   5 +-
 6 files changed, 292 insertions(+), 5 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
index 24405f8..8016423 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
@@ -33,6 +33,14 @@ public interface IgniteDiscoverySpiInternalListener {
     public void beforeJoin(ClusterNode locNode, IgniteLogger log);
 
     /**
+     * @param locNode Local node.
+     * @param log Logger.
+     */
+    default public void beforeReconnect(ClusterNode locNode, IgniteLogger log) {
+        // No-op by default; the SPI calls this right before writing a client reconnect message to a socket.
+    }
+
+    /**
      * @param spi SPI instance.
      * @param log Logger.
      * @param msg Custom message.
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 879c8b8..49a4f55 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -3245,12 +3245,14 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 if (log.isDebugEnabled())
                                     log.debug("Pending messages will be sent [failure=" + failure +
                                         ", newNextNode=" + newNextNode +
-                                        ", forceSndPending=" + forceSndPending + ']');
+                                        ", forceSndPending=" + forceSndPending +
+                                        ", failedNodes=" + failedNodes + ']');
 
                                 if (debugMode)
                                     debugLog(msg, "Pending messages will be sent [failure=" + failure +
                                         ", newNextNode=" + newNextNode +
-                                        ", forceSndPending=" + forceSndPending + ']');
+                                        ", forceSndPending=" + forceSndPending +
+                                        ", failedNodes=" + failedNodes + ']');
 
                                 for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
                                     long tstamp = U.currentTimeMillis();
@@ -6358,6 +6360,17 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 continue;
                             }
                             else {
+                                TcpDiscoveryClientReconnectMessage msg0 = (TcpDiscoveryClientReconnectMessage)msg;
+
+                                // If message is received from previous node and node is connecting forward to next node.
+                                if (!getLocalNodeId().equals(msg0.routerNodeId()) && state == CONNECTING) {
+                                    spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
+
+                                    msgWorker.addMessage(msg);
+
+                                    continue;
+                                }
+
                                 spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, sockTimeout);
 
                                 break;
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index bee7a84..c87e00d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -104,6 +104,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryEnsureDelivery;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
@@ -1648,8 +1649,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery
         OutputStream out,
         TcpDiscoveryAbstractMessage msg,
         long timeout) throws IOException, IgniteCheckedException {
-        if (internalLsnr != null && msg instanceof TcpDiscoveryJoinRequestMessage)
-            internalLsnr.beforeJoin(locNode, log);
+        if (internalLsnr != null) {
+            if (msg instanceof TcpDiscoveryJoinRequestMessage)
+                internalLsnr.beforeJoin(locNode, log);
+
+            if (msg instanceof TcpDiscoveryClientReconnectMessage)
+                internalLsnr.beforeReconnect(locNode, log);
+        }
 
         assert sock != null;
         assert msg != null;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java
index 46d9edc..051327a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java
@@ -43,6 +43,9 @@ public class DiscoverySpiTestListener implements IgniteDiscoverySpiInternalListe
     private volatile CountDownLatch joinLatch;
 
     /** */
+    private volatile CountDownLatch reconLatch;
+
+    /** */
     private Set<Class<?>> blockCustomEvtCls;
 
     /** */
@@ -67,10 +70,24 @@ public class DiscoverySpiTestListener implements IgniteDiscoverySpiInternalListe
     /**
      *
      */
+    public void startBlockReconnect() {
+        reconLatch = new CountDownLatch(1); // beforeReconnect() waits on this until stopBlockRestart() is called.
+    }
+
+    /**
+     * Counts down {@code joinLatch} (presumably releasing a join held in {@link #beforeJoin}); NPE if it was never set.
+     */
     public void stopBlockJoin() {
         joinLatch.countDown();
     }
 
+    /**
+     * Releases the reconnect held in {@link #beforeReconnect}; despite the name, it ends a reconnect block, not a restart.
+     */
+    public void stopBlockRestart() {
+        reconLatch.countDown(); // NPE unless startBlockReconnect() was called first.
+    }
+
     /** {@inheritDoc} */
     @Override public void beforeJoin(ClusterNode locNode, IgniteLogger log) {
         try {
@@ -88,6 +105,22 @@ public class DiscoverySpiTestListener implements IgniteDiscoverySpiInternalListe
     }
 
     /** {@inheritDoc} */
+    @Override public void beforeReconnect(ClusterNode locNode, IgniteLogger log) {
+        CountDownLatch latch = reconLatch;
+
+        if (latch == null)
+            return;
+
+        try {
+            log.info("Block reconnect");
+            U.await(latch); // Delays the reconnect message write until stopBlockRestart() counts the latch down.
+        }
+        catch (Exception e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean beforeSendCustomEvent(DiscoverySpi spi, IgniteLogger log, DiscoverySpiCustomMessage msg) {
         this.spi = spi;
         this.log = log;
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryReconnectUnstableTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryReconnectUnstableTopologyTest.java
new file mode 100644
index 0000000..55f745a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryReconnectUnstableTopologyTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.DiscoverySpiTestListener;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiClosure;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test scenario:
+ *
+ * 1. Create topology in specific order: srv1 srv2 client srv3 srv4
+ * 2. Delay client reconnect.
+ * 3. Trigger topology change by restarting srv2 (will trigger reconnect to next node), srv3, srv4
+ * 4. Resume reconnect to node with empty EnsuredMessageHistory and wait for completion.
+ * 5. Add new node to topology.
+ *
+ * Pass condition: new node successfully joins topology.
+ */
+@RunWith(JUnit4.class)
+public class TcpDiscoveryReconnectUnstableTopologyTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+        cfg.setAutoActivationEnabled(false);
+
+        BlockTcpDiscoverySpi spi = new BlockTcpDiscoverySpi();
+
+        // Guarantees client join to srv2.
+        Field rndAddrsField = U.findField(BlockTcpDiscoverySpi.class, "skipAddrsRandomization");
+
+        assertNotNull(rndAddrsField);
+
+        rndAddrsField.set(spi, true);
+
+        cfg.setDiscoverySpi(spi.setIpFinder(ipFinder));
+
+        cfg.setClientMode(igniteInstanceName.startsWith("client"));
+
+        cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME));
+
+        return cfg;
+    }
+
+    /**
+     * Restarts servers while the client's reconnect is blocked, then checks that a new node can still join.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testReconnectUnstableTopology() throws Exception {
+        try {
+            List<IgniteEx> nodes = new ArrayList<>();
+
+            nodes.add(startGrid(0));
+            nodes.add(startGrid(1));
+            nodes.add(startGrid("client"));
+            nodes.add(startGrid(2));
+            nodes.add(startGrid(3));
+
+            // Node order must match the scenario: srv1 srv2 client srv3 srv4.
+            for (int i = 0; i < nodes.size(); i++)
+                assertEquals(i + 1, nodes.get(i).localNode().order());
+
+            DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();
+
+            spi(grid("client")).setInternalListener(lsnr);
+
+            lsnr.startBlockReconnect();
+
+            CountDownLatch restartLatch = new CountDownLatch(1);
+
+            // A Callable (not a Runnable) so a failed restart is reported with its cause by fut.get().
+            IgniteInternalFuture<?> fut = multithreadedAsync(() -> {
+                try {
+                    stopGrid(1);
+                    stopGrid(2);
+                    stopGrid(3);
+
+                    startGrid(1);
+                    startGrid(2);
+                    startGrid(3);
+                }
+                finally {
+                    // Must always fire: otherwise a failed restart leaves the main thread waiting forever.
+                    restartLatch.countDown();
+                }
+
+                return null;
+            }, 1, "restarter");
+
+            U.awaitQuiet(restartLatch);
+
+            lsnr.stopBlockRestart();
+
+            fut.get();
+
+            doSleep(1500); // Wait for reconnect.
+
+            startGrid(4);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @param ig Ignite.
+     * @return Discovery SPI of the given node.
+     */
+    private TcpDiscoverySpi spi(Ignite ig) {
+        return (TcpDiscoverySpi)ig.configuration().getDiscoverySpi();
+    }
+
+    /**
+     * Discovery SPI with blocking support.
+     */
+    protected class BlockTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** Closure. */
+        private volatile IgniteBiClosure<ClusterNode, DiscoveryCustomMessage, Void> clo;
+
+        /**
+         * @param clo Closure.
+         */
+        public void setClosure(IgniteBiClosure<ClusterNode, DiscoveryCustomMessage, Void> clo) {
+            this.clo = clo;
+        }
+
+        /**
+         * Passes a custom message to the closure; a no-op (no unmarshalling) when no closure is set.
+         *
+         * @param node Local node.
+         * @param msg Message about to be written.
+         */
+        private synchronized void apply(ClusterNode node, TcpDiscoveryAbstractMessage msg) {
+            IgniteBiClosure<ClusterNode, DiscoveryCustomMessage, Void> clo0 = clo; // Single volatile read.
+
+            if (clo0 == null || !(msg instanceof TcpDiscoveryCustomEventMessage))
+                return;
+
+            DiscoveryCustomMessage delegate;
+
+            try {
+                DiscoverySpiCustomMessage custMsg = ((TcpDiscoveryCustomEventMessage)msg)
+                    .message(marshaller(), U.resolveClassLoader(ignite().configuration()));
+
+                assertNotNull(custMsg);
+                delegate = ((CustomMessageWrapper)custMsg).delegate();
+            }
+            catch (Throwable throwable) {
+                throw new RuntimeException(throwable);
+            }
+
+            clo0.apply(node, delegate);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(
+            Socket sock,
+            TcpDiscoveryAbstractMessage msg,
+            byte[] data,
+            long timeout
+        ) throws IOException {
+            if (spiCtx != null)
+                apply(spiCtx.localNode(), msg);
+
+            super.writeToSocket(sock, msg, data, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock,
+            OutputStream out,
+            TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
+            if (spiCtx != null)
+                apply(spiCtx.localNode(), msg);
+
+            super.writeToSocket(sock, out, msg, timeout);
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 7da76e7..18776b4 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -40,6 +40,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeAttributesUpdateOnRec
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeConfigConsistentIdSelfTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeConsistentIdSelfTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryPendingMessageDeliveryTest;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryReconnectUnstableTopologyTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryRestartTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySegmentationPolicyTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySelfTest;
@@ -137,7 +138,9 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP
 
     FilterDataForClientNodeDiscoveryTest.class,
 
-    TcpDiscoveryPendingMessageDeliveryTest.class
+    TcpDiscoveryPendingMessageDeliveryTest.class,
+
+    TcpDiscoveryReconnectUnstableTopologyTest.class
 })
 public class IgniteSpiDiscoverySelfTestSuite {
     /** */