You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/01/23 15:30:35 UTC
[ignite] branch master updated: IGNITE-10933 Fixed node join hang when joining node receives client reconnect message - Fixes #5852.
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 20cf75d IGNITE-10933 Fixed node join hang when joining node receives client reconnect message - Fixes #5852.
20cf75d is described below
commit 20cf75db8d674b0fcc82407e6264a924e4250444
Author: Aleksei Scherbakov <al...@gmail.com>
AuthorDate: Wed Jan 23 18:29:29 2019 +0300
IGNITE-10933 Fixed node join hang when joining node receives client reconnect message - Fixes #5852.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
.../IgniteDiscoverySpiInternalListener.java | 8 +
.../ignite/spi/discovery/tcp/ServerImpl.java | 17 +-
.../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 10 +-
.../ignite/internal/DiscoverySpiTestListener.java | 33 +++
.../TcpDiscoveryReconnectUnstableTopologyTest.java | 224 +++++++++++++++++++++
.../IgniteSpiDiscoverySelfTestSuite.java | 5 +-
6 files changed, 292 insertions(+), 5 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
index 24405f8..8016423 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
@@ -33,6 +33,14 @@ public interface IgniteDiscoverySpiInternalListener {
public void beforeJoin(ClusterNode locNode, IgniteLogger log);
/**
+ * @param locNode Local node.
+ * @param log Logger.
+ */
+ default public void beforeReconnect(ClusterNode locNode, IgniteLogger log) {
+ // No-op.
+ }
+
+ /**
* @param spi SPI instance.
* @param log Logger.
* @param msg Custom message.
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 879c8b8..49a4f55 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -3245,12 +3245,14 @@ class ServerImpl extends TcpDiscoveryImpl {
if (log.isDebugEnabled())
log.debug("Pending messages will be sent [failure=" + failure +
", newNextNode=" + newNextNode +
- ", forceSndPending=" + forceSndPending + ']');
+ ", forceSndPending=" + forceSndPending +
+ ", failedNodes=" + failedNodes + ']');
if (debugMode)
debugLog(msg, "Pending messages will be sent [failure=" + failure +
", newNextNode=" + newNextNode +
- ", forceSndPending=" + forceSndPending + ']');
+ ", forceSndPending=" + forceSndPending +
+ ", failedNodes=" + failedNodes + ']');
for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
long tstamp = U.currentTimeMillis();
@@ -6358,6 +6360,17 @@ class ServerImpl extends TcpDiscoveryImpl {
continue;
}
else {
+ TcpDiscoveryClientReconnectMessage msg0 = (TcpDiscoveryClientReconnectMessage)msg;
+
+ // If message is received from previous node and node is connecting forward to next node.
+ if (!getLocalNodeId().equals(msg0.routerNodeId()) && state == CONNECTING) {
+ spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
+
+ msgWorker.addMessage(msg);
+
+ continue;
+ }
+
spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, sockTimeout);
break;
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index bee7a84..c87e00d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -104,6 +104,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryEnsureDelivery;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
@@ -1648,8 +1649,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery
OutputStream out,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
- if (internalLsnr != null && msg instanceof TcpDiscoveryJoinRequestMessage)
- internalLsnr.beforeJoin(locNode, log);
+ if (internalLsnr != null) {
+ if (msg instanceof TcpDiscoveryJoinRequestMessage)
+ internalLsnr.beforeJoin(locNode, log);
+
+ if (msg instanceof TcpDiscoveryClientReconnectMessage)
+ internalLsnr.beforeReconnect(locNode, log);
+ }
assert sock != null;
assert msg != null;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java
index 46d9edc..051327a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java
@@ -43,6 +43,9 @@ public class DiscoverySpiTestListener implements IgniteDiscoverySpiInternalListe
private volatile CountDownLatch joinLatch;
/** Latch awaited by beforeReconnect() while non-null; created by startBlockReconnect(). */
+ private volatile CountDownLatch reconLatch;
+
+ /** NOTE(review): presumably the custom event classes whose sending is blocked; usage is outside this hunk — confirm. */
private Set<Class<?>> blockCustomEvtCls;
/** */
@@ -67,10 +70,24 @@ public class DiscoverySpiTestListener implements IgniteDiscoverySpiInternalListe
/**
*
*/
+ public void startBlockReconnect() {
+ reconLatch = new CountDownLatch(1);
+ }
+
+ /**
+ *
+ */
public void stopBlockJoin() {
joinLatch.countDown();
}
+    /**
+     * Releases a client reconnect blocked in {@link #beforeReconnect} after {@link #startBlockReconnect()}.
+     * Must only be called after {@link #startBlockReconnect()}; otherwise the latch is {@code null}.
+     */
+    public void stopBlockReconnect() {
+        reconLatch.countDown();
+    }
+
+    /**
+     * Alias of {@link #stopBlockReconnect()} kept for existing callers; prefer the name that matches
+     * {@link #startBlockReconnect()}.
+     */
+    public void stopBlockRestart() {
+        stopBlockReconnect();
+    }
+
/** {@inheritDoc} */
@Override public void beforeJoin(ClusterNode locNode, IgniteLogger log) {
try {
@@ -88,6 +105,22 @@ public class DiscoverySpiTestListener implements IgniteDiscoverySpiInternalListe
}
/** {@inheritDoc} */
+ @Override public void beforeReconnect(ClusterNode locNode, IgniteLogger log) {
+ try {
+ CountDownLatch writeLatch0 = reconLatch;
+
+ if (writeLatch0 != null) {
+ log.info("Block reconnect");
+
+ U.await(writeLatch0);
+ }
+ }
+ catch (Exception e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public boolean beforeSendCustomEvent(DiscoverySpi spi, IgniteLogger log, DiscoverySpiCustomMessage msg) {
this.spi = spi;
this.log = log;
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryReconnectUnstableTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryReconnectUnstableTopologyTest.java
new file mode 100644
index 0000000..55f745a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryReconnectUnstableTopologyTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.DiscoverySpiTestListener;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiClosure;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test scenario:
+ * <ol>
+ *     <li>Create topology in a specific order: srv1, srv2, client, srv3, srv4.</li>
+ *     <li>Delay the client reconnect.</li>
+ *     <li>Trigger topology change by restarting srv2 (this makes the client reconnect to the next node),
+ *     srv3 and srv4.</li>
+ *     <li>Resume the reconnect to a node with an empty EnsuredMessageHistory and wait for it to complete.</li>
+ *     <li>Add a new node to the topology.</li>
+ * </ol>
+ *
+ * Pass condition: the new node successfully joins the topology.
+ */
+@RunWith(JUnit4.class)
+public class TcpDiscoveryReconnectUnstableTopologyTest extends GridCommonAbstractTest {
+    /** IP finder passed to the discovery SPI of every node started by this test. */
+    private static final TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+        cfg.setAutoActivationEnabled(false);
+
+        BlockTcpDiscoverySpi spi = new BlockTcpDiscoverySpi();
+
+        // Guarantees client join to srv2: address randomization is switched off via the private SPI flag.
+        Field rndAddrsField = U.findField(BlockTcpDiscoverySpi.class, "skipAddrsRandomization");
+
+        assertNotNull(rndAddrsField);
+
+        rndAddrsField.set(spi, true);
+
+        cfg.setDiscoverySpi(spi.setIpFinder(ipFinder));
+
+        cfg.setClientMode(igniteInstanceName.startsWith("client"));
+
+        cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME));
+
+        return cfg;
+    }
+
+    /**
+     * Restarts the servers around a client whose reconnect is held back, then releases the reconnect
+     * and checks that a new server can still join.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testReconnectUnstableTopology() throws Exception {
+        try {
+            List<IgniteEx> nodes = new ArrayList<>();
+
+            nodes.add(startGrid(0));
+
+            nodes.add(startGrid(1));
+
+            nodes.add(startGrid("client"));
+
+            nodes.add(startGrid(2));
+
+            nodes.add(startGrid(3));
+
+            // The scenario relies on node orders matching the start order.
+            for (int i = 0; i < nodes.size(); i++) {
+                IgniteEx ex = nodes.get(i);
+
+                assertEquals(i + 1, ex.localNode().order());
+            }
+
+            DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();
+
+            spi(grid("client")).setInternalListener(lsnr);
+
+            lsnr.startBlockReconnect();
+
+            CountDownLatch restartLatch = new CountDownLatch(1);
+
+            IgniteInternalFuture<?> fut = multithreadedAsync(() -> {
+                try {
+                    stopGrid(1);
+                    stopGrid(2);
+                    stopGrid(3);
+
+                    startGrid(1);
+                    startGrid(2);
+                    startGrid(3);
+                }
+                catch (Exception e) {
+                    // Preserve the cause instead of a bare fail().
+                    throw new AssertionError("Failed to restart server nodes.", e);
+                }
+                finally {
+                    // Always release the test thread, otherwise a failed restart leaves it waiting in awaitQuiet().
+                    restartLatch.countDown();
+                }
+            }, 1, "restarter");
+
+            U.awaitQuiet(restartLatch);
+
+            lsnr.stopBlockRestart();
+
+            fut.get();
+
+            doSleep(1500); // Wait for reconnect.
+
+            startGrid(4);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @param ig Ignite.
+     * @return Discovery SPI from the node configuration, cast to {@link TcpDiscoverySpi}.
+     */
+    private TcpDiscoverySpi spi(Ignite ig) {
+        return (TcpDiscoverySpi)ig.configuration().getDiscoverySpi();
+    }
+
+    /**
+     * Discovery SPI that passes every outgoing custom event to an optional closure before writing it.
+     */
+    protected class BlockTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** Closure. */
+        private volatile IgniteBiClosure<ClusterNode, DiscoveryCustomMessage, Void> clo;
+
+        /**
+         * @param clo Closure.
+         */
+        public void setClosure(IgniteBiClosure<ClusterNode, DiscoveryCustomMessage, Void> clo) {
+            this.clo = clo;
+        }
+
+        /**
+         * Unwraps a custom event message and hands it to the closure, if one is set.
+         *
+         * @param addr Address.
+         * @param msg Message.
+         */
+        private synchronized void apply(ClusterNode addr, TcpDiscoveryAbstractMessage msg) {
+            // Read the volatile field once so the null check and the call see the same closure.
+            IgniteBiClosure<ClusterNode, DiscoveryCustomMessage, Void> clo0 = clo;
+
+            // Without a closure there is nobody to notify: skip unmarshalling entirely.
+            if (clo0 == null || !(msg instanceof TcpDiscoveryCustomEventMessage))
+                return;
+
+            TcpDiscoveryCustomEventMessage cm = (TcpDiscoveryCustomEventMessage)msg;
+
+            DiscoveryCustomMessage delegate;
+
+            try {
+                DiscoverySpiCustomMessage custMsg = cm.message(marshaller(), U.resolveClassLoader(ignite().configuration()));
+
+                assertNotNull(custMsg);
+
+                delegate = ((CustomMessageWrapper)custMsg).delegate();
+            }
+            catch (Throwable throwable) {
+                throw new RuntimeException(throwable);
+            }
+
+            clo0.apply(addr, delegate);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(
+            Socket sock,
+            TcpDiscoveryAbstractMessage msg,
+            byte[] data,
+            long timeout
+        ) throws IOException {
+            if (spiCtx != null)
+                apply(spiCtx.localNode(), msg);
+
+            super.writeToSocket(sock, msg, data, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock,
+            OutputStream out,
+            TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
+            if (spiCtx != null)
+                apply(spiCtx.localNode(), msg);
+
+            super.writeToSocket(sock, out, msg, timeout);
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 7da76e7..18776b4 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -40,6 +40,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeAttributesUpdateOnRec
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeConfigConsistentIdSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeConsistentIdSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryPendingMessageDeliveryTest;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryReconnectUnstableTopologyTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryRestartTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySegmentationPolicyTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySelfTest;
@@ -137,7 +138,9 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP
FilterDataForClientNodeDiscoveryTest.class,
- TcpDiscoveryPendingMessageDeliveryTest.class
+ TcpDiscoveryPendingMessageDeliveryTest.class,
+
+ TcpDiscoveryReconnectUnstableTopologyTest.class
})
public class IgniteSpiDiscoverySelfTestSuite {
/** */