You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/22 14:38:14 UTC

ignite git commit: ignite-1524 Fixed processing of ClientReconnectMessage

Repository: ignite
Updated Branches:
  refs/heads/ignite-1524 [created] 919749cde


ignite-1524 Fixed processing of ClientReconnectMessage


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/919749cd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/919749cd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/919749cd

Branch: refs/heads/ignite-1524
Commit: 919749cde3e0c7e01c1a3b160807865a383ed634
Parents: 72c3eef
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 15:32:55 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 15:32:55 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 97 +++++++++++---------
 ...lientDiscoverySpiFailureTimeoutSelfTest.java | 33 +++++--
 2 files changed, 81 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/919749cd/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 4ce46e8..8a205d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2958,70 +2958,81 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Client reconnect message.
          */
         private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
+            UUID nodeId = msg.creatorNodeId();
+
             UUID locNodeId = getLocalNodeId();
 
             boolean isLocNodeRouter = locNodeId.equals(msg.routerNodeId());
 
             if (!msg.verified()) {
-                assert isLocNodeRouter;
-
-                msg.verify(locNodeId);
+                TcpDiscoveryNode node = ring.node(nodeId);
 
-                if (ring.hasRemoteNodes()) {
-                    sendMessageAcrossRing(msg);
+                assert node == null || node.isClient();
 
-                    return;
+                if (node != null) {
+                    node.clientRouterNodeId(msg.routerNodeId());
+                    node.aliveCheck(spi.maxMissedClientHbs);
                 }
-            }
-
-            UUID nodeId = msg.creatorNodeId();
 
-            TcpDiscoveryNode node = ring.node(nodeId);
+                if (isLocalNodeCoordinator()) {
+                    msg.verify(locNodeId);
 
-            assert node == null || node.isClient();
+                    if (node != null) {
+                        Collection<TcpDiscoveryAbstractMessage> pending = msgHist.messages(msg.lastMessageId(), node);
 
-            if (node != null) {
-                assert node.isClient();
+                        if (pending != null) {
+                            msg.pendingMessages(pending);
+                            msg.success(true);
 
-                node.clientRouterNodeId(msg.routerNodeId());
-                node.aliveCheck(spi.maxMissedClientHbs);
+                            if (log.isDebugEnabled())
+                                log.debug("Accept client reconnect, restored pending messages " +
+                                    "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
+                        }
+                        else {
+                            if (log.isDebugEnabled())
+                                log.debug("Failing reconnecting client node because failed to restore pending " +
+                                    "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
 
-                if (isLocalNodeCoordinator()) {
-                    Collection<TcpDiscoveryAbstractMessage> pending = msgHist.messages(msg.lastMessageId(), node);
+                            processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId,
+                                node.id(), node.internalOrder()));
+                        }
+                    }
+                    else if (log.isDebugEnabled())
+                        log.debug("Reconnecting client node is already failed [nodeId=" + nodeId + ']');
 
-                    if (pending != null) {
-                        msg.pendingMessages(pending);
-                        msg.success(true);
+                    if (isLocNodeRouter) {
+                        ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
 
-                        if (log.isDebugEnabled())
-                            log.debug("Accept client reconnect, restored pending messages " +
-                                "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
+                        if (wrk != null)
+                            wrk.addMessage(msg);
+                        else if (log.isDebugEnabled())
+                            log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
+                                locNodeId + ", clientNodeId=" + nodeId + ']');
                     }
                     else {
-                        if (log.isDebugEnabled())
-                            log.debug("Failing reconnecting client node because failed to restore pending " +
-                                "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
-
-                        processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId,
-                            node.id(), node.internalOrder()));
+                        if (ring.hasRemoteNodes())
+                            sendMessageAcrossRing(msg);
                     }
                 }
-            }
-            else if (log.isDebugEnabled())
-                log.debug("Reconnecting client node is already failed [nodeId=" + nodeId + ']');
-
-            if (isLocNodeRouter) {
-                ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
-
-                if (wrk != null)
-                    wrk.addMessage(msg);
-                else if (log.isDebugEnabled())
-                    log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
-                        locNodeId + ", clientNodeId=" + nodeId + ']');
+                else {
+                    if (ring.hasRemoteNodes())
+                        sendMessageAcrossRing(msg);
+                }
             }
             else {
-                if (ring.hasRemoteNodes())
-                    sendMessageAcrossRing(msg);
+                if (isLocNodeRouter) {
+                    ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
+
+                    if (wrk != null)
+                        wrk.addMessage(msg);
+                    else if (log.isDebugEnabled())
+                        log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
+                            locNodeId + ", clientNodeId=" + nodeId + ']');
+                }
+                else {
+                    if (ring.hasRemoteNodes() && !locNodeId.equals(msg.verifierNodeId()))
+                        sendMessageAcrossRing(msg);
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/919749cd/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index 14417c1..344efc0 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -21,7 +21,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -170,11 +172,26 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testClientReconnectOnCoordinatorRouterFail1() throws Exception {
+        clientReconnectOnCoordinatorRouterFail(1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientReconnectOnCoordinatorRouterFail2() throws Exception {
+        clientReconnectOnCoordinatorRouterFail(2);
+    }
+
+    /**
      * Test tries to provoke scenario when client sends reconnect message before router failure detected.
      *
+     * @param srvNodes Number of additional server nodes.
      * @throws Exception If failed.
      */
-    public void _testClientReconnectOnCoordinatorRouterFail() throws Exception {
+    public void clientReconnectOnCoordinatorRouterFail(int srvNodes) throws Exception {
         startServerNodes(1);
 
         Ignite srv = G.ignite("server-0");
@@ -189,24 +206,28 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
             Collections.singleton("localhost:" + srvNode.discoveryPort() + ".." + (srvNode.discoveryPort() + 1)));
 
         failureThreshold = 1000L;
-        netTimeout = 500L;
+        netTimeout = 1000L;
 
         startClientNodes(1); // Client should connect to coordinator.
 
         failureThreshold = 10_000L;
         netTimeout = 5000L;
 
-        for (int i = 0; i < 2; i++) {
+        List<String> nodes = new ArrayList<>();
+
+        for (int i = 0; i < srvNodes; i++) {
             Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
 
+            nodes.add(g.name());
+
             srvNodeIds.add(g.cluster().localNode().id());
         }
 
-        checkNodes(3, 1);
+        checkNodes(1 + srvNodes, 1);
 
-        final CountDownLatch latch = new CountDownLatch(3);
+        nodes.add("client-0");
 
-        String nodes[] = {"server-1", "server-2", "client-0"};
+        final CountDownLatch latch = new CountDownLatch(nodes.size());
 
         final AtomicBoolean err = new AtomicBoolean();