You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink that is not preserved in this text.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/25 15:08:56 UTC

incubator-ignite git commit: # Fix race with client worker start; handle missing 'NodeAddFinished' message during initial client connection

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-yardstick-client-2 2d74b92b0 -> b971c3f6e


# Fix race with client worker start; handle missing 'NodeAddFinished' message during initial client connection


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b971c3f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b971c3f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b971c3f6

Branch: refs/heads/ignite-yardstick-client-2
Commit: b971c3f6e33d75b3c58bcea1362b34b03b078547
Parents: 2d74b92
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 25 15:57:30 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 25 16:04:32 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  44 +++----
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  81 +++++++++++--
 .../distributed/IgniteCacheManyClientsTest.java |  13 ++-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      | 116 ++++++++++++++++++-
 4 files changed, 218 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b971c3f6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 397038b..7f11aad 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -753,8 +753,6 @@ class ClientImpl extends TcpDiscoveryImpl {
                     rmtNodeId = this.rmtNodeId;
                 }
 
-                boolean first = joinLatch.getCount() > 0;
-
                 try {
                     InputStream in = new BufferedInputStream(sock.getInputStream());
 
@@ -765,17 +763,12 @@ class ClientImpl extends TcpDiscoveryImpl {
                         TcpDiscoveryAbstractMessage msg;
 
                         try {
-                            if (first)
-                                msg = spi.readMessage(sock, in, spi.netTimeout);
-                            else
-                                msg = spi.marsh.unmarshal(in, U.gridClassLoader());
+                            msg = spi.marsh.unmarshal(in, U.gridClassLoader());
                         }
                         catch (IgniteCheckedException e) {
                             //if (log.isDebugEnabled())
                                 U.error(log, "Failed to read message [sock=" + sock + ", " +
-                                    "locNodeId=" + getLocalNodeId() +
-                                    ", rmtNodeId=" + rmtNodeId +
-                                    ", first=" + first + ']', e);
+                                    "locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']', e);
 
                             IOException ioEx = X.cause(e, IOException.class);
 
@@ -794,9 +787,6 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                             continue;
                         }
-                        finally {
-                            first = false;
-                        }
 
                         msg.senderNodeId(rmtNodeId);
 
@@ -816,13 +806,11 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                     //if (log.isDebugEnabled())
                         U.error(log, "Connection failed [sock=" + sock +
-                            ", locNodeId=" + getLocalNodeId() +
-                            ", first=" + first + ']', e);
+                            ", locNodeId=" + getLocalNodeId() + ']', e);
                 }
                 finally {
                     U.error(log, "Closing socket [sock=" + sock +
-                        ", locNodeId=" + getLocalNodeId() +
-                        ", first="+ first + ']');
+                        ", locNodeId=" + getLocalNodeId() + ']');
 
                     U.closeQuiet(sock);
 
@@ -1546,9 +1534,9 @@ class ClientImpl extends TcpDiscoveryImpl {
                 return;
 
             if (getLocalNodeId().equals(msg.creatorNodeId())) {
-                assert msg.success() : msg;
-
                 if (reconnector != null) {
+                    assert msg.success() : msg;
+
                     currSock = reconnector.sock;
 
                     sockWriter.setSocket(currSock);
@@ -1561,7 +1549,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     try {
                         for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) {
                             if (log.isDebugEnabled())
-                                log.debug("Process message on reconnect [msg=" + pendingMsg + ']');
+                                log.debug("Process pending message on reconnect [msg=" + pendingMsg + ']');
 
                             processDiscoveryMessage(pendingMsg);
                         }
@@ -1570,8 +1558,22 @@ class ClientImpl extends TcpDiscoveryImpl {
                         pending = false;
                     }
                 }
-                else if (log.isDebugEnabled())
-                    log.debug("Discarding reconnect message, reconnect is completed: " + msg);
+                else {
+                    if (joinLatch.getCount() > 0) {
+                        if (msg.success()) {
+                            for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Process pending message on connect [msg=" + pendingMsg + ']');
+
+                                processDiscoveryMessage(pendingMsg);
+                            }
+
+                            assert joinLatch.getCount() == 0 : msg;
+                        }
+                    }
+                    else if (log.isDebugEnabled())
+                        log.debug("Discarding reconnect message, reconnect is completed: " + msg);
+                }
             }
             else if (log.isDebugEnabled())
                 log.debug("Discarding reconnect message for another client: " + msg);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b971c3f6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index eb58b1b..9c6c3c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2466,7 +2466,33 @@ class ServerImpl extends TcpDiscoveryImpl {
                         return;
                     }
 
-                    if (log.isDebugEnabled())
+                    if (msg.client()) {
+                        TcpDiscoveryClientReconnectMessage reconMsg = new TcpDiscoveryClientReconnectMessage(node.id(),
+                            node.clientRouterNodeId(),
+                            null);
+
+                        reconMsg.verify(getLocalNodeId());
+
+                        Collection<TcpDiscoveryAbstractMessage> msgs = msgHist.messages(null, node);
+
+                        if (msgs != null) {
+                            reconMsg.pendingMessages(msgs);
+
+                            reconMsg.success(true);
+                        }
+
+                        if (getLocalNodeId().equals(node.clientRouterNodeId())) {
+                            ClientMessageWorker wrk = clientMsgWorkers.get(node.id());
+
+                            if (wrk != null)
+                                wrk.addMessage(reconMsg);
+                        }
+                        else {
+                            if (ring.hasRemoteNodes())
+                                sendMessageAcrossRing(reconMsg);
+                        }
+                    }
+                    else if (log.isDebugEnabled())
                         log.debug("Ignoring join request message since node is already in topology: " + msg);
 
                     return;
@@ -4164,7 +4190,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         clientMsgWrk = clientMsgWrk0;
 
-                        clientMsgWrk.start();
+                        //clientMsgWrk.start();
                     }
 
                     if (log.isDebugEnabled())
@@ -4222,14 +4248,16 @@ class ServerImpl extends TcpDiscoveryImpl {
                     return;
                 }
 
+                boolean first = false;
+
                 while (!isInterrupted()) {
                     try {
                         TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
 
                         msg.senderNodeId(nodeId);
 
-                        if (log.isDebugEnabled())
-                            log.debug("Message has been received: " + msg);
+                        if (first)
+                            log.info("Message has been received [node=" + nodeId + " , msg=" + msg + ']');
 
                         spi.stats.onMessageReceived(msg);
 
@@ -4240,7 +4268,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg;
 
                             if (!req.responded()) {
-                                boolean ok = processJoinRequestMessage(req);
+                                boolean ok = processJoinRequestMessage(req, clientMsgWrk);
 
                                 if (clientMsgWrk != null && ok)
                                     continue;
@@ -4256,6 +4284,13 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 if (state == CONNECTED) {
                                     spi.writeToSocket(sock, RES_OK);
 
+                                    log.info("Responded to reconnect message [msg=" + msg +
+                                        ", reconNode=" + msg.creatorNodeId() +
+                                        ", res=" + RES_OK + ']');
+
+                                    if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW)
+                                        clientMsgWrk.start();
+
                                     msgWorker.addMessage(msg);
 
                                     continue;
@@ -4263,6 +4298,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 else {
                                     spi.writeToSocket(sock, RES_CONTINUE_JOIN);
 
+                                    log.info("Responded to reconnect message [msg=" + msg +
+                                        ", reconNode=" + msg.creatorNodeId() +
+                                        ", res=" + RES_CONTINUE_JOIN + ']');
+
                                     break;
                                 }
                             }
@@ -4401,7 +4440,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             spi.writeToSocket(sock, RES_OK);
                     }
                     catch (IgniteCheckedException e) {
-                        if (log.isDebugEnabled())
+                        //if (log.isDebugEnabled())
                             U.error(log, "Caught exception on message read [sock=" + sock +
                                 ", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']', e);
 
@@ -4428,7 +4467,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         return;
                     }
                     catch (IOException e) {
-                        if (log.isDebugEnabled())
+                        //if (log.isDebugEnabled())
                             U.error(log, "Caught exception on message read [sock=" + sock + ", locNodeId=" + locNodeId +
                                 ", rmtNodeId=" + nodeId + ']', e);
 
@@ -4491,7 +4530,8 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @throws IOException If IO failed.
          */
         @SuppressWarnings({"IfMayBeConditional"})
-        private boolean processJoinRequestMessage(TcpDiscoveryJoinRequestMessage msg) throws IOException {
+        private boolean processJoinRequestMessage(TcpDiscoveryJoinRequestMessage msg,
+            @Nullable ClientMessageWorker clientMsgWrk) throws IOException {
             assert msg != null;
             assert !msg.responded();
 
@@ -4500,11 +4540,16 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (state == CONNECTED) {
                 spi.writeToSocket(sock, RES_OK);
 
-                if (log.isDebugEnabled())
-                    log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']');
+                //if (log.isDebugEnabled())
+                    log.info("Responded to join request message [msg=" + msg +
+                        ", joinNode=" + msg.creatorNodeId() +
+                        ", res=" + RES_OK + ']');
 
                 msg.responded(true);
 
+                if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW)
+                    clientMsgWrk.start();
+
                 msgWorker.addMessage(msg);
 
                 return true;
@@ -4531,8 +4576,10 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 spi.writeToSocket(sock, res);
 
-                if (log.isDebugEnabled())
-                    log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']');
+                //if (log.isDebugEnabled())
+                    log.info("Responded to join request message [msg=" + msg +
+                        ", joinNode=" + msg.creatorNodeId() +
+                        ", res=" + res + ']');
 
                 fromAddrs.addAll(msg.node().socketAddresses());
 
@@ -4614,6 +4661,9 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** */
         private final AtomicReference<GridFutureAdapter<Boolean>> pingFut = new AtomicReference<>();
 
+        /** */
+        private boolean first = true;
+
         /**
          * @param sock Socket.
          * @param clientNodeId Node ID.
@@ -4649,6 +4699,13 @@ class ServerImpl extends TcpDiscoveryImpl {
                         + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
 
                 try {
+                    if (first) {
+                        log.info("Redirecting message to client [sock=" + sock + ", locNodeId="
+                            + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
+
+                        first = false;
+                    }
+
                     prepareNodeAddedMessage(msg, clientNodeId, null, null);
 
                     writeToSocket(sock, msg);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b971c3f6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
index c3223a2..e6eb102 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
@@ -54,10 +54,15 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
     /** */
     private boolean clientDiscovery;
 
+    /** */
+    private static ThreadLocal<UUID> id = new ThreadLocal<>();
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        cfg.setNodeId(id.get());
+
         cfg.setConnectorConfiguration(null);
         cfg.setPeerClassLoadingEnabled(false);
         cfg.setTimeServerPortRange(200);
@@ -65,7 +70,7 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
         ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setLocalPortRange(200);
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
-        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setJoinTimeout(2 * 60_000);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setJoinTimeout(5 * 60_000);
 
         if (!clientDiscovery)
             ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
@@ -217,7 +222,11 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
                     try {
                         int nodeIdx = idx.getAndIncrement();
 
-                        Thread.currentThread().setName("client-thread-node-" + nodeIdx);
+                        UUID id0 = UUID.randomUUID();
+
+                        id.set(id0);
+
+                        Thread.currentThread().setName("client-thread-node-" + id0.toString());
 
                         try (Ignite ignite = startGrid(nodeIdx)) {
                             log.info("Started node: " + ignite.name());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b971c3f6/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 8147958..eecbb43 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -24,7 +24,9 @@ import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.io.*;
+import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.spi.*;
@@ -106,11 +108,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     /** */
     private int maxMissedClientHbs = TcpDiscoverySpi.DFLT_MAX_MISSED_CLIENT_HEARTBEATS;
 
+    /** */
+    private IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        TcpDiscoverySpi disco = new TestTcpDiscoverySpi();
+        TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi();
 
         disco.setMaxMissedClientHeartbeats(maxMissedClientHbs);
 
@@ -154,6 +159,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         disco.setJoinTimeout(joinTimeout);
         disco.setNetworkTimeout(netTimeout);
 
+        disco.afterWrite(afterWrite);
+
         cfg.setDiscoverySpi(disco);
 
         if (nodeId != null)
@@ -1016,6 +1023,88 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testJoinError3() throws Exception {
+        startServerNodes(1);
+
+        Ignite ignite = G.ignite("server-0");
+
+        TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi());
+
+        srvSpi.failNodeAddFinishedMessage();
+
+        startClientNodes(1);
+
+        checkNodes(1, 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinErrorMissedAddFinishedMessage1() throws Exception {
+        missedAddFinishedMessage(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinErrorMissedAddFinishedMessage2() throws Exception {
+        missedAddFinishedMessage(false);
+    }
+
+    /**
+     * @param singleSrv If {@code true} starts one server node two otherwise.
+     * @throws Exception If failed.
+     */
+    public void missedAddFinishedMessage(boolean singleSrv) throws Exception {
+        int srvs = singleSrv ? 1 : 2;
+
+        startServerNodes(srvs);
+
+        afterWrite = new CIX2<TcpDiscoveryAbstractMessage, Socket>() {
+            private boolean first = true;
+
+            @Override public void applyx(TcpDiscoveryAbstractMessage msg, Socket sock) throws IgniteCheckedException {
+                if (first && (msg instanceof TcpDiscoveryJoinRequestMessage)) {
+                    first = false;
+
+                    log.info("Close socket after message write [msg=" + msg + "]");
+
+                    try {
+                        sock.close();
+                    }
+                    catch (IOException e) {
+                        throw new IgniteCheckedException(e);
+                    }
+
+                    log.info("Delay after message write [msg=" + msg + "]");
+
+                    U.sleep(5000); // Wait when server process join request.
+                }
+            }
+        };
+
+        Ignite srv = singleSrv ? G.ignite("server-0") : G.ignite("server-1");
+
+        TcpDiscoveryNode srvNode = (TcpDiscoveryNode)srv.cluster().localNode();
+
+        assertEquals(singleSrv ? 1 : 2, srvNode.order());
+
+        clientIpFinder = new TcpDiscoveryVmIpFinder();
+
+        clientIpFinder.setAddresses(Collections.singleton("localhost:" + srvNode.discoveryPort()));
+
+        startClientNodes(1);
+
+        TcpDiscoveryNode clientNode = (TcpDiscoveryNode)G.ignite("client-0").cluster().localNode();
+
+        assertEquals(srvNode.id(), clientNode.clientRouterNodeId());
+
+        checkNodes(srvs, 1);
+    }
+
+    /**
      * @param clientIdx Client index.
      * @param srvIdx Server index.
      * @throws Exception In case of error.
@@ -1267,8 +1356,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         private AtomicInteger failNodeAdded = new AtomicInteger();
 
         /** */
+        private AtomicInteger failNodeAddFinished = new AtomicInteger();
+
+        /** */
         private AtomicInteger failClientReconnect = new AtomicInteger();
 
+        /** */
+        private IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite;
+
         /**
          * @param lock Lock.
          */
@@ -1287,6 +1382,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         }
 
         /**
+         * @param afterWrite After write callback.
+         */
+        void afterWrite(IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite) {
+            this.afterWrite = afterWrite;
+        }
+
+        /**
          *
          */
         void failNodeAddedMessage() {
@@ -1296,6 +1398,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         /**
          *
          */
+        void failNodeAddFinishedMessage() {
+            failNodeAddFinished.set(1);
+        }
+
+        /**
+         *
+         */
         void failClientReconnectMessage() {
             failClientReconnect.set(1);
         }
@@ -1322,6 +1431,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
             if (msg instanceof TcpDiscoveryNodeAddedMessage)
                 fail = failNodeAdded.getAndDecrement() > 0;
+            else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
+                fail = failNodeAddFinished.getAndDecrement() > 0;
             else if (msg instanceof TcpDiscoveryClientReconnectMessage)
                 fail = failClientReconnect.getAndDecrement() > 0;
 
@@ -1332,6 +1443,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
             }
 
             super.writeToSocket(sock, msg, bout);
+
+            if (afterWrite != null)
+                afterWrite.apply(msg, sock);
         }
 
         /** {@inheritDoc} */