You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/25 08:59:06 UTC
incubator-ignite git commit: # handle multiple workers per client
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-yardstick-client-2 [created] 3fa39b19e
# handle multiple workers per client
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3fa39b19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3fa39b19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3fa39b19
Branch: refs/heads/ignite-yardstick-client-2
Commit: 3fa39b19e222aeced4ced738a3f58a597b1b170d
Parents: c97f013
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 25 09:58:54 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 25 09:58:54 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 43 ++++++++++++++++----
1 file changed, 36 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3fa39b19/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index f959379..76ddf75 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -4127,15 +4127,44 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (req.client()) {
- if (log.isDebugEnabled())
- log.debug("Created client message worker [locNodeId=" + locNodeId +
+ ClientMessageWorker clientMsgWrk0 = new ClientMessageWorker(sock, nodeId);
+
+ while (true) {
+ ClientMessageWorker old = clientMsgWorkers.putIfAbsent(nodeId, clientMsgWrk0);
+
+ if (old == null)
+ break;
+
+ log.info("Have old client: " + nodeId);
+
+ if (old.isInterrupted()) {
+ clientMsgWorkers.remove(nodeId, old);
+
+ continue;
+ }
+
+ old.join(500);
+
+ old = clientMsgWorkers.putIfAbsent(nodeId, clientMsgWrk0);
+
+ if (old == null)
+ break;
+
+ log.error("Already have client message worker [locNodeId=" + locNodeId +
", rmtNodeId=" + nodeId + ", sock=" + sock + ']');
- clientMsgWrk = new ClientMessageWorker(sock, nodeId);
+ return;
+ }
- clientMsgWrk.start();
+ //if (log.isDebugEnabled())
+ log.info("Created client message worker [locNodeId=" + locNodeId +
+ ", rmtNodeId=" + nodeId + ", sock=" + sock + ']');
+
+ assert clientMsgWrk0 == clientMsgWorkers.get(nodeId);
+
+ clientMsgWrk = clientMsgWrk0;
- clientMsgWorkers.put(nodeId, clientMsgWrk);
+ clientMsgWrk.start();
}
if (log.isDebugEnabled())
@@ -4423,8 +4452,8 @@ class ServerImpl extends TcpDiscoveryImpl {
}
finally {
if (clientMsgWrk != null) {
- if (log.isDebugEnabled())
- log.debug("Client connection failed [sock=" + sock + ", locNodeId=" + locNodeId +
+ //if (log.isDebugEnabled())
+ log.error("Client connection failed [sock=" + sock + ", locNodeId=" + locNodeId +
", rmtNodeId=" + nodeId + ']');
clientMsgWorkers.remove(nodeId, clientMsgWrk);