You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/02/05 07:04:32 UTC

[ignite] branch master updated: IGNITE-12982 Initialization of TcpCommunicationSpi fix: no messages are sent until SPI is fully initialized. - Fixes #8717.

This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 52a1749  IGNITE-12982 Initialization of TcpCommunicationSpi fix: no messages are sent until SPI is fully initialized. - Fixes #8717.
52a1749 is described below

commit 52a1749e936efa612a2d0740de5b7f36ffb8a13c
Author: Sergey Chugunov <se...@gmail.com>
AuthorDate: Fri Feb 5 10:00:04 2021 +0300

    IGNITE-12982 Initialization of TcpCommunicationSpi fix: no messages are sent until SPI is fully initialized. - Fixes #8717.
    
    Signed-off-by: Ivan Bessonov <be...@gmail.com>
---
 .../spi/communication/tcp/TcpCommunicationSpi.java |   4 +-
 .../tcp/GridTcpCommunicationSpiConfigSelfTest.java | 158 +++++++++++++++++++++
 .../ignite/testframework/GridSpiTestContext.java   |  14 ++
 .../testframework/junits/IgniteTestResources.java  |   6 +-
 4 files changed, 179 insertions(+), 3 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 1012612..1858abc 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -940,8 +940,6 @@ public class TcpCommunicationSpi extends TcpCommunicationConfigInitializer {
 
         spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
-        ctxInitLatch.countDown();
-
         metricsLsnr = new TcpCommunicationMetricsListener(ignite, spiCtx);
 
         registerMBean(igniteInstanceName, new TcpCommunicationSpiMBeanImpl(this, metricsLsnr, cfg, stateProvider), TcpCommunicationSpiMBean.class);
@@ -952,6 +950,8 @@ public class TcpCommunicationSpi extends TcpCommunicationConfigInitializer {
 
         if (shmemAcceptWorker != null)
             shmemAcceptWorker.metricsListener(metricsLsnr);
+
+        ctxInitLatch.countDown();
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
index ffc0ddc..29e0e4d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
@@ -19,14 +19,34 @@ package org.apache.ignite.spi.communication.tcp;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Collection;
+
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.logger.NullLogger;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.GridTestMessage;
+import org.apache.ignite.testframework.GridSpiTestContext;
+import org.apache.ignite.testframework.GridTestNode;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
 import org.apache.ignite.testframework.junits.GridAbstractTest;
+import org.apache.ignite.testframework.junits.IgniteTestResources;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.spi.GridSpiAbstractConfigTest;
 import org.apache.ignite.testframework.junits.spi.GridSpiTest;
@@ -36,6 +56,7 @@ import static java.util.Objects.isNull;
 import static java.util.Objects.requireNonNull;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_TCP_COMM_SET_ATTR_HOST_NAMES;
 import static org.apache.ignite.IgniteSystemProperties.getBoolean;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
 import static org.apache.ignite.internal.util.IgniteUtils.spiAttribute;
 import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.ATTR_HOST_NAMES;
 import static org.apache.ignite.testframework.GridTestUtils.getFreeCommPort;
@@ -51,10 +72,31 @@ public class GridTcpCommunicationSpiConfigSelfTest extends GridSpiAbstractConfig
      */
     private String locHost = "0.0.0.0";
 
+    /** */
+    private final Collection<IgniteTestResources> resourcesToClean = new ArrayList<>();
+
+    /** */
+    private final Collection<CommunicationSpi> spisToStop = new ArrayList<>();
+
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         stopAllGrids();
 
+        for (IgniteTestResources itr : resourcesToClean) {
+            itr.stopThreads();
+        }
+
+        for (CommunicationSpi commSpi : spisToStop) {
+            commSpi.onContextDestroyed();
+
+            commSpi.setListener(null);
+
+            commSpi.spiStop();
+        }
+
+        resourcesToClean.clear();
+        spisToStop.clear();
+
         super.afterTest();
     }
 
@@ -106,6 +148,122 @@ public class GridTcpCommunicationSpiConfigSelfTest extends GridSpiAbstractConfig
     }
 
     /**
+     * Verifies that TcpCommunicationSpi starts the messaging protocol only once it is fully initialized.
+     *
+     * @see <a href="https://issues.apache.org/jira/browse/IGNITE-12982">IGNITE-12982</a>
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    @WithSystemProperty(key = "IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK", value = "true")
+    public void testSendToNonInitializedTcpCommSpi() throws Exception {
+        ListeningTestLogger listeningLogger = new ListeningTestLogger(log);
+        LogListener npeLsnr = LogListener.matches("NullPointerException")
+            .andMatches("InboundConnectionHandler.onMessageSent").build();
+
+        listeningLogger.registerListener(npeLsnr);
+
+        GridTestNode sendingNode = new GridTestNode();
+        sendingNode.order(0);
+        GridSpiTestContext sendingCtx = initSpiContext();
+
+        TcpCommunicationSpi sendingSpi = initializeSpi(sendingCtx, sendingNode, listeningLogger, false);
+        spisToStop.add(sendingSpi);
+
+        sendingSpi.onContextInitialized(sendingCtx);
+
+        GridTestNode receiverNode = new GridTestNode();
+        receiverNode.order(1);
+        GridSpiTestContext receiverCtx = initSpiContext();
+
+        /*
+         * This is a dirty hack to intervene in the TcpCommunicationSpi#onContextInitialized0 method
+         * and add a delay before the metrics listener is injected into its clients (like InboundConnectionHandler).
+         * The purpose of the delay is to make the race between sending a message and initializing TcpCommSpi visible.
+         *
+         * This solution depends heavily on the current code structure of the onContextInitialized0 method.
+         * If any modifications are made to it, this logic could break and the test could start failing.
+         *
+         * In that case, try to rewrite the test or delete it, as this race is really hard to test.
+         */
+        receiverCtx.metricsRegistryProducer((name) -> {
+            try {
+                Thread.sleep(100);
+            } catch (Exception ignored) {
+                // No-op.
+            }
+
+            return new MetricRegistry(name, null, null, new NullLogger());
+        });
+
+        TcpCommunicationSpi receiverSpi = initializeSpi(receiverCtx, receiverNode, listeningLogger, true);
+        spisToStop.add(receiverSpi);
+
+        receiverCtx.remoteNodes().add(sendingNode);
+        sendingCtx.remoteNodes().add(receiverNode);
+
+        IgniteInternalFuture sendFut = GridTestUtils.runAsync(() -> {
+            Message msg = new GridTestMessage(sendingNode.id(), 0, 0);
+
+            sendingSpi.sendMessage(receiverNode, msg);
+        });
+
+        IgniteInternalFuture initFut = GridTestUtils.runAsync(() -> {
+            try {
+                receiverSpi.onContextInitialized(receiverCtx);
+            } catch (Exception ignored) {
+                // No-op.
+            }
+        });
+
+        assertFalse("Check test logs, NPE was found",
+            GridTestUtils.waitForCondition(npeLsnr::check, 3_000));
+
+        initFut.get();
+        sendFut.get();
+    }
+
+    /**
+     * Initializes and starts a TcpCommunicationSpi with the given context, node, logger and clientMode flag.
+     */
+    private TcpCommunicationSpi initializeSpi(GridSpiTestContext ctx,
+                                              GridTestNode node,
+                                              IgniteLogger log,
+                                              boolean clientMode) throws Exception {
+        TcpCommunicationSpi spi = new TcpCommunicationSpi();
+
+        spi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
+        spi.setIdleConnectionTimeout(2000);
+
+        IgniteConfiguration cfg = new IgniteConfiguration()
+            .setGridLogger(log)
+            .setClientMode(clientMode);
+
+        IgniteTestResources rsrcs = new IgniteTestResources(cfg);
+
+        resourcesToClean.add(rsrcs);
+
+        cfg.setMBeanServer(rsrcs.getMBeanServer());
+
+        node.setId(rsrcs.getNodeId());
+
+        MessageFactoryProvider testMsgFactory = factory -> factory.register(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+
+        ctx.messageFactory(new IgniteMessageFactoryImpl(new MessageFactory[]{new GridIoMessageFactory(), testMsgFactory}));
+
+        ctx.setLocalNode(node);
+
+        rsrcs.inject(spi);
+
+        spi.spiStart(getTestIgniteInstanceName() + node.order());
+
+        node.setAttributes(spi.getNodeAttributes());
+        node.setAttribute(ATTR_MACS, F.concat(U.allLocalMACs(), ", "));
+
+        return spi;
+    }
+
+    /**
      * Test checks that attribute {@link TcpCommunicationSpi#ATTR_HOST_NAMES}
      * is empty only if IP(don't wildcard and loopback) is set to {@link IgniteConfiguration#setLocalHost}
      * and property {@link IgniteSystemProperties#IGNITE_TCP_COMM_SET_ATTR_HOST_NAMES} == {@code false}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index a15d7a0..bb65684 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -105,6 +106,9 @@ public class GridSpiTestContext implements IgniteSpiContext {
     /** */
     private GridTimeoutProcessor timeoutProcessor;
 
+    /** */
+    private volatile Function<String, ReadOnlyMetricRegistry> metricsRegistryProducer;
+
     /**
      * @param timeoutProcessor Timeout processor.
      */
@@ -112,6 +116,13 @@ public class GridSpiTestContext implements IgniteSpiContext {
         this.timeoutProcessor = timeoutProcessor;
     }
 
+    /**
+     * @param producer Producer to create {@link ReadOnlyMetricRegistry} objects.
+     */
+    public void metricsRegistryProducer(Function<String, ReadOnlyMetricRegistry> producer) {
+        this.metricsRegistryProducer = producer;
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
     @Override public Collection<ClusterNode> remoteNodes() {
@@ -618,6 +629,9 @@ public class GridSpiTestContext implements IgniteSpiContext {
 
     /** {@inheritDoc} */
     @Override public ReadOnlyMetricRegistry getOrCreateMetricRegistry(String name) {
+        if (metricsRegistryProducer != null)
+            return metricsRegistryProducer.apply(name);
+
         return new MetricRegistry(name, null, null, new NullLogger());
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
index f935e05e..b418d21 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
@@ -106,7 +106,11 @@ public class IgniteTestResources {
      */
     public IgniteTestResources(IgniteConfiguration cfg) {
         this.cfg = cfg;
-        this.log = rootLog.getLogger(getClass());
+
+        this.log = cfg.getGridLogger() != null
+            ? cfg.getGridLogger().getLogger(getClass())
+            : rootLog.getLogger(getClass());
+
         this.jmx = prepareMBeanServer();
         this.ctx = new GridTestKernalContext(log, this.cfg);
         this.rsrcProc = new GridResourceProcessor(ctx);