You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/03/01 13:02:56 UTC
[ignite] branch master updated: IGNITE-11454 Fixed race in ClientImpl leading to client segmentation - Fixes #6204.
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 5ed019c IGNITE-11454 Fixed race in ClientImpl leading to client segmentation - Fixes #6204.
5ed019c is described below
commit 5ed019c9a30d0bfeb141453fec81224858f22ee9
Author: Alexey Goncharuk <al...@gmail.com>
AuthorDate: Fri Mar 1 15:53:08 2019 +0300
IGNITE-11454 Fixed race in ClientImpl leading to client segmentation - Fixes #6204.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
.../ignite/spi/discovery/tcp/ClientImpl.java | 50 ++++++++++++++--------
.../ClientReconnectAfterClusterRestartTest.java | 39 ++++++++++++++++-
2 files changed, 69 insertions(+), 20 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 196b9ad..af2f75c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -140,9 +140,6 @@ import static org.apache.ignite.spi.discovery.tcp.ClientImpl.State.STOPPED;
*/
class ClientImpl extends TcpDiscoveryImpl {
/** */
- private static final Object JOIN_TIMEOUT = "JOIN_TIMEOUT";
-
- /** */
private static final Object SPI_STOP = "SPI_STOP";
/** */
@@ -1694,22 +1691,26 @@ class ClientImpl extends TcpDiscoveryImpl {
blockingSectionEnd();
}
- if (msg == JOIN_TIMEOUT) {
- if (state == STARTING) {
- joinError(new IgniteSpiException("Join process timed out, did not receive response for " +
- "join request (consider increasing 'joinTimeout' configuration property) " +
- "[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']'));
+ if (msg instanceof JoinTimeout) {
+ int joinCnt0 = ((JoinTimeout)msg).joinCnt;
- break;
- }
- else if (state == DISCONNECTED) {
- if (log.isDebugEnabled())
- log.debug("Failed to reconnect, local node segmented " +
- "[joinTimeout=" + spi.joinTimeout + ']');
+ if (joinCnt == joinCnt0) {
+ if (state == STARTING) {
+ joinError(new IgniteSpiException("Join process timed out, did not receive response for " +
+ "join request (consider increasing 'joinTimeout' configuration property) " +
+ "[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']'));
+
+ break;
+ }
+ else if (state == DISCONNECTED) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to reconnect, local node segmented " +
+ "[joinTimeout=" + spi.joinTimeout + ']');
- state = SEGMENTED;
+ state = SEGMENTED;
- notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+ notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+ }
}
}
else if (msg == SPI_STOP) {
@@ -2055,8 +2056,7 @@ class ClientImpl extends TcpDiscoveryImpl {
timer.schedule(new TimerTask() {
@Override public void run() {
- if (joinCnt == joinCnt0 && joining())
- queue.add(JOIN_TIMEOUT);
+ queue.add(new JoinTimeout(joinCnt0));
}
}, spi.joinTimeout);
}
@@ -2631,6 +2631,20 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/**
+ */
+ private static class JoinTimeout {
+ /** */
+ private int joinCnt;
+
+ /**
+ * @param joinCnt Join count to compare.
+ */
+ private JoinTimeout(int joinCnt) {
+ this.joinCnt = joinCnt;
+ }
+ }
+
+ /**
*
*/
private static class SocketClosedMessage {
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java
index 7b27797..50d1170 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;
@@ -53,6 +54,9 @@ public class ClientReconnectAfterClusterRestartTest extends GridCommonAbstractTe
/** Cache params. */
private static final String CACHE_PARAMS = "PPRB_PARAMS";
+ /** */
+ private int joinTimeout;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -68,6 +72,9 @@ public class ClientReconnectAfterClusterRestartTest extends GridCommonAbstractTe
cfg.setCacheConfiguration(ccfg);
}
+ if (joinTimeout != 0 && getTestIgniteInstanceName(1).equals(igniteInstanceName))
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setJoinTimeout(joinTimeout);
+
return cfg;
}
@@ -113,9 +120,36 @@ public class ClientReconnectAfterClusterRestartTest extends GridCommonAbstractTe
return ccfg;
}
- /** */
+ /**
+ * @throws Exception if failed.
+ */
@Test
public void testReconnectClient() throws Exception {
+ checkReconnectClient();
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testReconnectClient10sTimeout() throws Exception {
+ joinTimeout = 10_000;
+
+ checkReconnectClient();
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testReconnectClient2sTimeout() throws Exception {
+ joinTimeout = 2_000;
+
+ checkReconnectClient();
+ }
+
+ /** */
+ public void checkReconnectClient() throws Exception {
try {
startGrid(SERVER_ID);
@@ -169,7 +203,8 @@ public class ClientReconnectAfterClusterRestartTest extends GridCommonAbstractTe
try {
assertNull(cache.get(1L));
- } catch (CacheException ce) {
+ }
+ catch (CacheException ce) {
IgniteClientDisconnectedException icde = (IgniteClientDisconnectedException)ce.getCause();
icde.reconnectFuture().get();