You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/03/01 13:02:56 UTC

[ignite] branch master updated: IGNITE-11454 Fixed race in ClientImpl leading to client segmentation - Fixes #6204.

This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ed019c  IGNITE-11454 Fixed race in ClientImpl leading to client segmentation - Fixes #6204.
5ed019c is described below

commit 5ed019c9a30d0bfeb141453fec81224858f22ee9
Author: Alexey Goncharuk <al...@gmail.com>
AuthorDate: Fri Mar 1 15:53:08 2019 +0300

    IGNITE-11454 Fixed race in ClientImpl leading to client segmentation - Fixes #6204.
    
    Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
 .../ignite/spi/discovery/tcp/ClientImpl.java       | 50 ++++++++++++++--------
 .../ClientReconnectAfterClusterRestartTest.java    | 39 ++++++++++++++++-
 2 files changed, 69 insertions(+), 20 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 196b9ad..af2f75c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -140,9 +140,6 @@ import static org.apache.ignite.spi.discovery.tcp.ClientImpl.State.STOPPED;
  */
 class ClientImpl extends TcpDiscoveryImpl {
     /** */
-    private static final Object JOIN_TIMEOUT = "JOIN_TIMEOUT";
-
-    /** */
     private static final Object SPI_STOP = "SPI_STOP";
 
     /** */
@@ -1694,22 +1691,26 @@ class ClientImpl extends TcpDiscoveryImpl {
                         blockingSectionEnd();
                     }
 
-                    if (msg == JOIN_TIMEOUT) {
-                        if (state == STARTING) {
-                            joinError(new IgniteSpiException("Join process timed out, did not receive response for " +
-                                "join request (consider increasing 'joinTimeout' configuration property) " +
-                                "[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']'));
+                    if (msg instanceof JoinTimeout) {
+                        int joinCnt0 = ((JoinTimeout)msg).joinCnt;
 
-                            break;
-                        }
-                        else if (state == DISCONNECTED) {
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to reconnect, local node segmented " +
-                                    "[joinTimeout=" + spi.joinTimeout + ']');
+                        if (joinCnt == joinCnt0) {
+                            if (state == STARTING) {
+                                joinError(new IgniteSpiException("Join process timed out, did not receive response for " +
+                                    "join request (consider increasing 'joinTimeout' configuration property) " +
+                                    "[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']'));
+
+                                break;
+                            }
+                            else if (state == DISCONNECTED) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to reconnect, local node segmented " +
+                                        "[joinTimeout=" + spi.joinTimeout + ']');
 
-                            state = SEGMENTED;
+                                state = SEGMENTED;
 
-                            notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+                                notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+                            }
                         }
                     }
                     else if (msg == SPI_STOP) {
@@ -2055,8 +2056,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 timer.schedule(new TimerTask() {
                     @Override public void run() {
-                        if (joinCnt == joinCnt0 && joining())
-                            queue.add(JOIN_TIMEOUT);
+                        queue.add(new JoinTimeout(joinCnt0));
                     }
                 }, spi.joinTimeout);
             }
@@ -2631,6 +2631,20 @@ class ClientImpl extends TcpDiscoveryImpl {
     }
 
     /**
+     * Join-timeout queue message; carries the join counter captured when the timer was scheduled. */
+    private static class JoinTimeout {
+        /** Join counter value captured when the timeout was scheduled. */
+        private int joinCnt;
+
+        /**
+         * @param joinCnt Join counter value to compare with the current one when the message is processed.
+         */
+        private JoinTimeout(int joinCnt) {
+            this.joinCnt = joinCnt;
+        }
+    }
+
+    /**
      *
      */
     private static class SocketClosedMessage {
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java
index 7b27797..50d1170 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.NotNull;
 import org.junit.Test;
@@ -53,6 +54,9 @@ public class ClientReconnectAfterClusterRestartTest extends GridCommonAbstractTe
     /** Cache params. */
     private static final String CACHE_PARAMS = "PPRB_PARAMS";
 
+    /** */
+    private int joinTimeout;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -68,6 +72,9 @@ public class ClientReconnectAfterClusterRestartTest extends GridCommonAbstractTe
             cfg.setCacheConfiguration(ccfg);
         }
 
+        if (joinTimeout != 0 && getTestIgniteInstanceName(1).equals(igniteInstanceName))
+            ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setJoinTimeout(joinTimeout);
+
         return cfg;
     }
 
@@ -113,9 +120,36 @@ public class ClientReconnectAfterClusterRestartTest extends GridCommonAbstractTe
         return ccfg;
     }
 
-    /** */
+    /**
+     * @throws Exception if failed.
+     */
     @Test
     public void testReconnectClient() throws Exception {
+        checkReconnectClient();
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testReconnectClient10sTimeout() throws Exception {
+        joinTimeout = 10_000;
+
+        checkReconnectClient();
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testReconnectClient2sTimeout() throws Exception {
+        joinTimeout = 2_000;
+
+        checkReconnectClient();
+    }
+
+    /** */
+    public void checkReconnectClient() throws Exception {
         try {
             startGrid(SERVER_ID);
 
@@ -169,7 +203,8 @@ public class ClientReconnectAfterClusterRestartTest extends GridCommonAbstractTe
 
             try {
                 assertNull(cache.get(1L));
-            } catch (CacheException ce) {
+            }
+            catch (CacheException ce) {
                 IgniteClientDisconnectedException icde = (IgniteClientDisconnectedException)ce.getCause();
 
                 icde.reconnectFuture().get();