You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/02/05 07:04:32 UTC
[ignite] branch master updated: IGNITE-12982 Initialization of
TcpCommunicationSpi fix: no messages are sent until SPI is fully
initialized. - Fixes #8717.
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 52a1749 IGNITE-12982 Initialization of TcpCommunicationSpi fix: no messages are sent until SPI is fully initialized. - Fixes #8717.
52a1749 is described below
commit 52a1749e936efa612a2d0740de5b7f36ffb8a13c
Author: Sergey Chugunov <se...@gmail.com>
AuthorDate: Fri Feb 5 10:00:04 2021 +0300
IGNITE-12982 Initialization of TcpCommunicationSpi fix: no messages are sent until SPI is fully initialized. - Fixes #8717.
Signed-off-by: Ivan Bessonov <be...@gmail.com>
---
.../spi/communication/tcp/TcpCommunicationSpi.java | 4 +-
.../tcp/GridTcpCommunicationSpiConfigSelfTest.java | 158 +++++++++++++++++++++
.../ignite/testframework/GridSpiTestContext.java | 14 ++
.../testframework/junits/IgniteTestResources.java | 6 +-
4 files changed, 179 insertions(+), 3 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 1012612..1858abc 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -940,8 +940,6 @@ public class TcpCommunicationSpi extends TcpCommunicationConfigInitializer {
spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
- ctxInitLatch.countDown();
-
metricsLsnr = new TcpCommunicationMetricsListener(ignite, spiCtx);
registerMBean(igniteInstanceName, new TcpCommunicationSpiMBeanImpl(this, metricsLsnr, cfg, stateProvider), TcpCommunicationSpiMBean.class);
@@ -952,6 +950,8 @@ public class TcpCommunicationSpi extends TcpCommunicationConfigInitializer {
if (shmemAcceptWorker != null)
shmemAcceptWorker.metricsListener(metricsLsnr);
+
+ ctxInitLatch.countDown();
}
/** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
index ffc0ddc..29e0e4d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
@@ -19,14 +19,34 @@ package org.apache.ignite.spi.communication.tcp;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.Collection;
+
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.logger.NullLogger;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.GridTestMessage;
+import org.apache.ignite.testframework.GridSpiTestContext;
+import org.apache.ignite.testframework.GridTestNode;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.GridAbstractTest;
+import org.apache.ignite.testframework.junits.IgniteTestResources;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.spi.GridSpiAbstractConfigTest;
import org.apache.ignite.testframework.junits.spi.GridSpiTest;
@@ -36,6 +56,7 @@ import static java.util.Objects.isNull;
import static java.util.Objects.requireNonNull;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_TCP_COMM_SET_ATTR_HOST_NAMES;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
import static org.apache.ignite.internal.util.IgniteUtils.spiAttribute;
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.ATTR_HOST_NAMES;
import static org.apache.ignite.testframework.GridTestUtils.getFreeCommPort;
@@ -51,10 +72,31 @@ public class GridTcpCommunicationSpiConfigSelfTest extends GridSpiAbstractConfig
*/
private String locHost = "0.0.0.0";
+ /** */
+ private final Collection<IgniteTestResources> resourcesToClean = new ArrayList<>();
+
+ /** */
+ private final Collection<CommunicationSpi> spisToStop = new ArrayList<>();
+
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
+ for (IgniteTestResources itr : resourcesToClean) {
+ itr.stopThreads();
+ }
+
+ for (CommunicationSpi commSpi : spisToStop) {
+ commSpi.onContextDestroyed();
+
+ commSpi.setListener(null);
+
+ commSpi.spiStop();
+ }
+
+ resourcesToClean.clear();
+ spisToStop.clear();
+
super.afterTest();
}
@@ -106,6 +148,122 @@ public class GridTcpCommunicationSpiConfigSelfTest extends GridSpiAbstractConfig
}
/**
+ * Verifies that TcpCommunicationSpi starts messaging protocol only when fully initialized.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/IGNITE-12982">IGNITE-12982</a>
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = "IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK", value = "true")
+ public void testSendToNonInitializedTcpCommSpi() throws Exception {
+ ListeningTestLogger listeningLogger = new ListeningTestLogger(log);
+ LogListener npeLsnr = LogListener.matches("NullPointerException")
+ .andMatches("InboundConnectionHandler.onMessageSent").build();
+
+ listeningLogger.registerListener(npeLsnr);
+
+ GridTestNode sendingNode = new GridTestNode();
+ sendingNode.order(0);
+ GridSpiTestContext sendingCtx = initSpiContext();
+
+ TcpCommunicationSpi sendingSpi = initializeSpi(sendingCtx, sendingNode, listeningLogger, false);
+ spisToStop.add(sendingSpi);
+
+ sendingSpi.onContextInitialized(sendingCtx);
+
+ GridTestNode receiverNode = new GridTestNode();
+ receiverNode.order(1);
+ GridSpiTestContext receiverCtx = initSpiContext();
+
+ /*
+ * This is a dirty hack to intervene in the TcpCommunicationSpi#onContextInitialized0 method
+ * and add a delay before the metrics listener is injected into its clients (like InboundConnectionHandler).
+ * The purpose of the delay is to make the race between sending a message and initializing TcpCommSpi visible.
+ *
+ * This solution heavily depends on the current code structure of the onContextInitialized0 method.
+ * If any modifications are made to it, this logic could break and the test may start failing.
+ *
+ * In that case, try to rewrite the test or delete it, as this race is really hard to test.
+ */
+ receiverCtx.metricsRegistryProducer((name) -> {
+ try {
+ Thread.sleep(100);
+ } catch (Exception ignored) {
+ // No-op.
+ }
+
+ return new MetricRegistry(name, null, null, new NullLogger());
+ });
+
+ TcpCommunicationSpi receiverSpi = initializeSpi(receiverCtx, receiverNode, listeningLogger, true);
+ spisToStop.add(receiverSpi);
+
+ receiverCtx.remoteNodes().add(sendingNode);
+ sendingCtx.remoteNodes().add(receiverNode);
+
+ IgniteInternalFuture sendFut = GridTestUtils.runAsync(() -> {
+ Message msg = new GridTestMessage(sendingNode.id(), 0, 0);
+
+ sendingSpi.sendMessage(receiverNode, msg);
+ });
+
+ IgniteInternalFuture initFut = GridTestUtils.runAsync(() -> {
+ try {
+ receiverSpi.onContextInitialized(receiverCtx);
+ } catch (Exception ignored) {
+ // No-op.
+ }
+ });
+
+ assertFalse("Check test logs, NPE was found",
+ GridTestUtils.waitForCondition(npeLsnr::check, 3_000));
+
+ initFut.get();
+ sendFut.get();
+ }
+
+ /**
+ * Initializes TcpCommunicationSpi with given context, node, logger and clientMode flag.
+ */
+ private TcpCommunicationSpi initializeSpi(GridSpiTestContext ctx,
+ GridTestNode node,
+ IgniteLogger log,
+ boolean clientMode) throws Exception {
+ TcpCommunicationSpi spi = new TcpCommunicationSpi();
+
+ spi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
+ spi.setIdleConnectionTimeout(2000);
+
+ IgniteConfiguration cfg = new IgniteConfiguration()
+ .setGridLogger(log)
+ .setClientMode(clientMode);
+
+ IgniteTestResources rsrcs = new IgniteTestResources(cfg);
+
+ resourcesToClean.add(rsrcs);
+
+ cfg.setMBeanServer(rsrcs.getMBeanServer());
+
+ node.setId(rsrcs.getNodeId());
+
+ MessageFactoryProvider testMsgFactory = factory -> factory.register(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+
+ ctx.messageFactory(new IgniteMessageFactoryImpl(new MessageFactory[]{new GridIoMessageFactory(), testMsgFactory}));
+
+ ctx.setLocalNode(node);
+
+ rsrcs.inject(spi);
+
+ spi.spiStart(getTestIgniteInstanceName() + node.order());
+
+ node.setAttributes(spi.getNodeAttributes());
+ node.setAttribute(ATTR_MACS, F.concat(U.allLocalMACs(), ", "));
+
+ return spi;
+ }
+
+ /**
* Test checks that attribute {@link TcpCommunicationSpi#ATTR_HOST_NAMES}
* is empty only if IP(don't wildcard and loopback) is set to {@link IgniteConfiguration#setLocalHost}
* and property {@link IgniteSystemProperties#IGNITE_TCP_COMM_SET_ATTR_HOST_NAMES} == {@code false}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index a15d7a0..bb65684 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
+import java.util.function.Function;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
@@ -105,6 +106,9 @@ public class GridSpiTestContext implements IgniteSpiContext {
/** */
private GridTimeoutProcessor timeoutProcessor;
+ /** */
+ private volatile Function<String, ReadOnlyMetricRegistry> metricsRegistryProducer;
+
/**
* @param timeoutProcessor Timeout processor.
*/
@@ -112,6 +116,13 @@ public class GridSpiTestContext implements IgniteSpiContext {
this.timeoutProcessor = timeoutProcessor;
}
+ /**
+ * @param producer Producer to create {@link ReadOnlyMetricRegistry} objects.
+ */
+ public void metricsRegistryProducer(Function<String, ReadOnlyMetricRegistry> producer) {
+ this.metricsRegistryProducer = producer;
+ }
+
/** {@inheritDoc} */
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
@Override public Collection<ClusterNode> remoteNodes() {
@@ -618,6 +629,9 @@ public class GridSpiTestContext implements IgniteSpiContext {
/** {@inheritDoc} */
@Override public ReadOnlyMetricRegistry getOrCreateMetricRegistry(String name) {
+ if (metricsRegistryProducer != null)
+ return metricsRegistryProducer.apply(name);
+
return new MetricRegistry(name, null, null, new NullLogger());
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
index f935e05e..b418d21 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
@@ -106,7 +106,11 @@ public class IgniteTestResources {
*/
public IgniteTestResources(IgniteConfiguration cfg) {
this.cfg = cfg;
- this.log = rootLog.getLogger(getClass());
+
+ this.log = cfg.getGridLogger() != null
+ ? cfg.getGridLogger().getLogger(getClass())
+ : rootLog.getLogger(getClass());
+
this.jmx = prepareMBeanServer();
this.ctx = new GridTestKernalContext(log, this.cfg);
this.rsrcProc = new GridResourceProcessor(ctx);