You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/25 08:59:06 UTC

incubator-ignite git commit: # handle multiple workers per client

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-yardstick-client-2 [created] 3fa39b19e


# handle multiple workers per client


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3fa39b19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3fa39b19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3fa39b19

Branch: refs/heads/ignite-yardstick-client-2
Commit: 3fa39b19e222aeced4ced738a3f58a597b1b170d
Parents: c97f013
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 25 09:58:54 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 25 09:58:54 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 43 ++++++++++++++++----
 1 file changed, 36 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3fa39b19/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index f959379..76ddf75 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -4127,15 +4127,44 @@ class ServerImpl extends TcpDiscoveryImpl {
                     }
 
                     if (req.client()) {
-                        if (log.isDebugEnabled())
-                            log.debug("Created client message worker [locNodeId=" + locNodeId +
+                        ClientMessageWorker clientMsgWrk0 = new ClientMessageWorker(sock, nodeId);
+
+                        while (true) {
+                            ClientMessageWorker old = clientMsgWorkers.putIfAbsent(nodeId, clientMsgWrk0);
+
+                            if (old == null)
+                                break;
+
+                            log.info("Have old client: " + nodeId);
+
+                            if (old.isInterrupted()) {
+                                clientMsgWorkers.remove(nodeId, old);
+
+                                continue;
+                            }
+
+                            old.join(500);
+
+                            old = clientMsgWorkers.putIfAbsent(nodeId, clientMsgWrk0);
+
+                            if (old == null)
+                                break;
+
+                            log.error("Already have client message worker [locNodeId=" + locNodeId +
                                 ", rmtNodeId=" + nodeId + ", sock=" + sock + ']');
 
-                        clientMsgWrk = new ClientMessageWorker(sock, nodeId);
+                            return;
+                        }
 
-                        clientMsgWrk.start();
+                        //if (log.isDebugEnabled())
+                            log.info("Created client message worker [locNodeId=" + locNodeId +
+                                ", rmtNodeId=" + nodeId + ", sock=" + sock + ']');
+
+                        assert clientMsgWrk0 == clientMsgWorkers.get(nodeId);
+
+                        clientMsgWrk = clientMsgWrk0;
 
-                        clientMsgWorkers.put(nodeId, clientMsgWrk);
+                        clientMsgWrk.start();
                     }
 
                     if (log.isDebugEnabled())
@@ -4423,8 +4452,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
             finally {
                 if (clientMsgWrk != null) {
-                    if (log.isDebugEnabled())
-                        log.debug("Client connection failed [sock=" + sock + ", locNodeId=" + locNodeId +
+                    //if (log.isDebugEnabled())
+                        log.error("Client connection failed [sock=" + sock + ", locNodeId=" + locNodeId +
                             ", rmtNodeId=" + nodeId + ']');
 
                     clientMsgWorkers.remove(nodeId, clientMsgWrk);