You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/16 09:33:35 UTC
ignite git commit: ignite-comm-opts2
Repository: ignite
Updated Branches:
refs/heads/ignite-comm-opts-2conn [created] c9f62b815
ignite-comm-opts2
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c9f62b81
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c9f62b81
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c9f62b81
Branch: refs/heads/ignite-comm-opts-2conn
Commit: c9f62b815571ab6192736154ef7b90e8e46e95b3
Parents: c81e0d9
Author: sboikov <sb...@gridgain.com>
Authored: Fri Sep 16 12:33:21 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Sep 16 12:33:21 2016 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 40 +--
.../managers/communication/GridIoMessage.java | 4 +
.../ignite/internal/util/nio/GridNioServer.java | 3 +-
.../communication/tcp/TcpCommunicationSpi.java | 244 +++++++++++++------
.../spi/GridTcpSpiForwardingSelfTest.java | 4 +-
5 files changed, 193 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c9f62b81/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 9e547ca..f869c5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -529,26 +529,26 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
}
- Thread t = new Thread(
- new Runnable() {
- @Override public void run() {
- for (;;) {
- try {
- Thread.sleep(5000);
- }
- catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- dumpStats();
- }
- }
- }
- );
-
- t.setDaemon(true);
-
- t.start();
+// Thread t = new Thread(
+// new Runnable() {
+// @Override public void run() {
+// for (;;) {
+// try {
+// Thread.sleep(5000);
+// }
+// catch (InterruptedException e) {
+// e.printStackTrace();
+// }
+//
+// dumpStats();
+// }
+// }
+// }
+// );
+//
+// t.setDaemon(true);
+//
+// t.start();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/c9f62b81/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index b28ced2..bce4f7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -161,6 +161,10 @@ public class GridIoMessage implements Message {
return skipOnTimeout;
}
+    /**
+     * @return Non-negative value derived from {@code super.hashCode()}; the TCP communication SPI
+     *      takes it modulo the configured connections-per-node count to choose a connection.
+     */
+    public int connectionIndex() {
+        // hashCode() is allowed to be negative and Java's '%' keeps the dividend's sign,
+        // so clear the sign bit to guarantee the derived connection index is never negative.
+        return super.hashCode() & Integer.MAX_VALUE;
+    }
+
/**
* @return {@code True} if message is ordered, {@code false} otherwise.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/c9f62b81/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index a2449f8..fbfb5cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -1529,7 +1529,8 @@ public class GridNioServer<T> {
sb.append(" Connection info [")
.append("rmtAddr=").append(ses.remoteAddress())
- .append(", locAddr=").append(ses.localAddress());
+ .append(", locAddr=").append(ses.localAddress())
+ .append(", in=").append(ses.accepted());
GridNioRecoveryDescriptor desc = ses.recoveryDescriptor();
http://git-wip-us.apache.org/repos/asf/ignite/blob/c9f62b81/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 3292412..9110add 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
@@ -350,7 +351,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
@Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
- UUID id = ses.meta(NODE_ID_META);
+ ConnectionId id = ses.meta(NODE_ID_META);
if (id != null) {
GridCommunicationClient client = clients.get(id);
@@ -368,7 +369,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor();
if (recoveryData != null) {
- if (recoveryData.nodeAlive(getSpiContext().node(id))) {
+ if (recoveryData.nodeAlive(getSpiContext().node(id.id))) {
if (!recoveryData.messagesFutures().isEmpty()) {
reconnect = true;
@@ -381,7 +382,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
recoveryData.onNodeLeft();
}
- DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(recoveryData,
+ DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(id,
+ recoveryData,
reconnect);
commWorker.addProcessDisconnectRequest(disconnectData);
@@ -390,7 +392,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
CommunicationListener<Message> lsnr0 = lsnr;
if (lsnr0 != null)
- lsnr0.onDisconnected(id);
+ lsnr0.onDisconnected(id.id);
}
}
@@ -399,24 +401,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @param msg Message.
*/
private void onFirstMessage(GridNioSession ses, Message msg) {
- UUID sndId;
+ ConnectionId sndId;
if (msg instanceof NodeIdMessage)
- sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0);
+ sndId = new ConnectionId(-1, U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0));
else {
assert msg instanceof HandshakeMessage : msg;
- sndId = ((HandshakeMessage)msg).nodeId();
+ sndId = new ConnectionId(((HandshakeMessage)msg).idx, ((HandshakeMessage)msg).nodeId());
}
if (log.isDebugEnabled())
log.debug("Remote node ID received: " + sndId);
- final UUID old = ses.addMeta(NODE_ID_META, sndId);
+ final ConnectionId old = ses.addMeta(NODE_ID_META, sndId);
assert old == null;
- final ClusterNode rmtNode = getSpiContext().node(sndId);
+ final ClusterNode rmtNode = getSpiContext().node(sndId.id);
if (rmtNode == null) {
if (log.isDebugEnabled())
@@ -462,7 +464,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
HandshakeMessage msg0 = (HandshakeMessage)msg;
//
- final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode);
+ final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode, sndId);
//
// if (oldFut == null) {
// oldClient = clients.get(sndId);
@@ -520,16 +522,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
// else {
// The code below causes a race condition between shmem and TCP (see IGNITE-1294)
boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
- new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
+ new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut, sndId));
if (reserved)
- connected(recoveryDesc, ses, rmtNode, msg0.received(), true, false);
+ connected(recoveryDesc, ses, rmtNode, msg0.received(), true, false, sndId);
// }
// }
}
@Override public void onMessage(GridNioSession ses, Message msg) {
- UUID sndId = ses.meta(NODE_ID_META);
+ ConnectionId sndId = ses.meta(NODE_ID_META);
if (sndId == null) {
assert ses.accepted() : ses;
@@ -601,7 +603,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
// else
c = NOOP;
- notifyListener(sndId, msg, c);
+ notifyListener(sndId.id, msg, c);
}
}
@@ -620,7 +622,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
ClusterNode node,
long rcvCnt,
boolean sndRes,
- boolean createClient) {
+ boolean createClient,
+ ConnectionId connId) {
+ assert node.id().equals(connId.id);
+
recovery.onHandshake(rcvCnt);
ses.recoveryDescriptor(recovery);
@@ -637,7 +642,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (createClient) {
client = new GridTcpNioCommunicationClient(ses, log);
- GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client);
+ GridCommunicationClient oldClient = clients.putIfAbsent(connId, client);
assert oldClient == null : "Client already created [node=" + node + ", client=" + client +
", oldClient=" + oldClient + ", recoveryDesc=" + recovery + ']';
@@ -672,6 +677,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** */
private final boolean createClient;
+ private final ConnectionId connId;
+
/**
* @param ses Incoming session.
* @param recoveryDesc Recovery descriptor.
@@ -685,13 +692,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
ClusterNode rmtNode,
HandshakeMessage msg,
boolean createClient,
- GridFutureAdapter<GridCommunicationClient> fut) {
+ GridFutureAdapter<GridCommunicationClient> fut,
+ ConnectionId connId) {
this.ses = ses;
this.recoveryDesc = recoveryDesc;
this.rmtNode = rmtNode;
this.msg = msg;
this.createClient = createClient;
this.fut = fut;
+ this.connId = connId;
}
/** {@inheritDoc} */
@@ -703,7 +712,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
msgFut.get();
GridTcpNioCommunicationClient client =
- connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient);
+ connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient, connId);
fut.onDone(client);
}
@@ -717,7 +726,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
fut.onDone();
}
finally {
- clientFuts.remove(rmtNode.id(), fut);
+ clientFuts.remove(connId, fut);
}
}
};
@@ -729,7 +738,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
fut.onDone();
}
finally {
- clientFuts.remove(rmtNode.id(), fut);
+ clientFuts.remove(connId, fut);
}
}
}
@@ -814,7 +823,60 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>();
/** Clients. */
- private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap();
+ private final ConcurrentMap<ConnectionId, GridCommunicationClient> clients = GridConcurrentFactory.newMap();
+
+    /** Number of connections per remote node; a message's connection index is taken modulo this value. */
+    private int connectionsPerNode = 2;
+
+    /**
+     * @return Number of connections per node.
+     */
+    public int getConnectionsPerNode() {
+        return connectionsPerNode;
+    }
+
+    /**
+     * @param connectionsPerNode Number of connections per node, must be positive.
+     * @throws IllegalArgumentException If the given value is zero or negative.
+     */
+    public void setConnectionsPerNode(int connectionsPerNode) {
+        // The value is used as a modulus when a connection is selected for a message,
+        // so zero would fail with ArithmeticException at send time; reject it up front.
+        if (connectionsPerNode <= 0)
+            throw new IllegalArgumentException("connectionsPerNode must be positive: " + connectionsPerNode);
+
+        this.connectionsPerNode = connectionsPerNode;
+    }
+
+    /**
+     * Key identifying a single connection to a remote node: the node ID together with
+     * the index of the connection. Used as the key of the clients and client futures maps.
+     */
+    public static final class ConnectionId {
+        /** Connection index. */
+        private final int idx;
+
+        /** Node ID. */
+        private final UUID id;
+
+        /**
+         * @param idx Connection index.
+         * @param id Node ID.
+         */
+        ConnectionId(int idx, UUID id) {
+            this.idx = idx;
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            // The equals() contract requires 'false' (not an exception) for null and foreign types.
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            ConnectionId clientId = (ConnectionId)o;
+
+            return idx == clientId.idx && id.equals(clientId.id);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = idx;
+            res = 31 * res + id.hashCode();
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ConnectionId.class, this);
+        }
+    }
/** SPI listener. */
private volatile CommunicationListener<Message> lsnr;
@@ -861,7 +923,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
};
/** Client connect futures. */
- private final ConcurrentMap<UUID, GridFutureAdapter<GridCommunicationClient>> clientFuts =
+ private final ConcurrentMap<ConnectionId, GridFutureAdapter<GridCommunicationClient>> clientFuts =
GridConcurrentFactory.newMap();
/** */
@@ -1409,7 +1471,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
sb.append("Communication SPI clients: ").append(U.nl());
- for (Map.Entry<UUID, GridCommunicationClient> entry : clients.entrySet()) {
+ for (Map.Entry<ConnectionId, GridCommunicationClient> entry : clients.entrySet()) {
sb.append(" [node=").append(entry.getKey())
.append(", client=").append(entry.getValue())
.append(']').append(U.nl());
@@ -1640,9 +1702,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assert formatter != null;
- UUID rmtNodeId = ses.meta(NODE_ID_META);
+ ConnectionId rmtNodeId = ses.meta(NODE_ID_META);
- return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory) : null;
+ return rmtNodeId != null ? formatter.reader(rmtNodeId.id, msgFactory) : null;
}
};
@@ -1655,9 +1717,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assert formatter != null;
- UUID rmtNodeId = ses.meta(NODE_ID_META);
+ ConnectionId rmtNodeId = ses.meta(NODE_ID_META);
- return rmtNodeId != null ? formatter.writer(rmtNodeId) : null;
+ return rmtNodeId != null ? formatter.writer(rmtNodeId.id) : null;
}
};
@@ -1894,16 +1956,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
void onNodeLeft(UUID nodeId) {
assert nodeId != null;
- GridCommunicationClient client = clients.get(nodeId);
+ for (Map.Entry<ConnectionId, GridCommunicationClient> client : clients.entrySet()) {
+ ConnectionId id = client.getKey();
- if (client != null) {
- if (log.isDebugEnabled())
- log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId +
- ", client=" + client + ']');
+ if (id.id.equals(nodeId)) {
+ if (log.isDebugEnabled())
+ log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId +
+ ", client=" + client + ']');
- client.forceClose();
+ client.getValue().forceClose();
- clients.remove(nodeId, client);
+ clients.remove(id, client.getValue());
+ }
}
}
@@ -1978,11 +2042,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
else {
GridCommunicationClient client = null;
+ ConnectionId id = new ConnectionId(((GridIoMessage)msg).connectionIndex() % connectionsPerNode, node.id());
+
try {
boolean retry;
do {
- client = reserveClient(node);
+ client = reserveClient(node, id);
UUID nodeId = null;
@@ -2013,7 +2079,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
throw new IgniteSpiException("Failed to send message to remote node: " + node, e);
}
finally {
- if (client != null && clients.remove(node.id(), client))
+ if (client != null && clients.remove(id, client))
client.forceClose();
}
}
@@ -2026,13 +2092,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @return The existing or just created client.
* @throws IgniteCheckedException Thrown if any exception occurs.
*/
- private GridCommunicationClient reserveClient(ClusterNode node) throws IgniteCheckedException {
+ private GridCommunicationClient reserveClient(ClusterNode node, ConnectionId id) throws IgniteCheckedException {
assert node != null;
UUID nodeId = node.id();
while (true) {
- GridCommunicationClient client = clients.get(nodeId);
+ GridCommunicationClient client = clients.get(id);
if (client == null) {
if (stopping)
@@ -2041,17 +2107,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
// Do not allow concurrent connects.
GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture();
- GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(nodeId, fut);
+ GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(id, fut);
if (oldFut == null) {
try {
- GridCommunicationClient client0 = clients.get(nodeId);
+ GridCommunicationClient client0 = clients.get(id);
if (client0 == null) {
- client0 = createNioClient(node);
+ client0 = createNioClient(node, id);
if (client0 != null) {
- GridCommunicationClient old = clients.put(nodeId, client0);
+ GridCommunicationClient old = clients.put(id, client0);
assert old == null : "Client already created " +
"[node=" + node + ", client=" + client0 + ", oldClient=" + old + ']';
@@ -2059,7 +2125,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (client0 instanceof GridTcpNioCommunicationClient) {
GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0);
- if (tcpClient.session().closeTime() > 0 && clients.remove(nodeId, client0)) {
+ if (tcpClient.session().closeTime() > 0 && clients.remove(id, client0)) {
if (log.isDebugEnabled())
log.debug("Session was closed after client creation, will retry " +
"[node=" + node + ", client=" + client0 + ']');
@@ -2081,7 +2147,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
throw (Error)e;
}
finally {
- clientFuts.remove(nodeId, fut);
+ clientFuts.remove(id, fut);
}
}
else
@@ -2093,7 +2159,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
continue;
if (getSpiContext().node(nodeId) == null) {
- if (clients.remove(nodeId, client))
+ if (clients.remove(id, client))
client.forceClose();
throw new IgniteSpiException("Destination node is not in topology: " + node.id());
@@ -2104,7 +2170,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
return client;
else
// Client has just been closed by idle worker. Help it and try again.
- clients.remove(nodeId, client);
+ clients.remove(id, client);
}
}
@@ -2113,7 +2179,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @return Client.
* @throws IgniteCheckedException If failed.
*/
- @Nullable protected GridCommunicationClient createNioClient(ClusterNode node) throws IgniteCheckedException {
+ @Nullable protected GridCommunicationClient createNioClient(ClusterNode node, ConnectionId id) throws IgniteCheckedException {
assert node != null;
Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT));
@@ -2154,7 +2220,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
connectGate.enter();
try {
- GridCommunicationClient client = createTcpClient(node);
+ GridCommunicationClient client = createTcpClient(node, id);
if (log.isDebugEnabled())
log.debug("TCP client created: " + client);
@@ -2207,7 +2273,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
try {
- safeHandshake(client, null, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0), null);
+ safeHandshake(client, null, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0), null, null);
}
catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
client.forceClose();
@@ -2266,10 +2332,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
*/
private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) {
if (slowClientQueueLimit > 0 && msgQueueSize > slowClientQueueLimit) {
- UUID id = ses.meta(NODE_ID_META);
+ ConnectionId id = ses.meta(NODE_ID_META);
if (id != null) {
- ClusterNode node = getSpiContext().node(id);
+ ClusterNode node = getSpiContext().node(id.id);
if (node != null && node.isClient()) {
String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, " +
@@ -2283,7 +2349,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
log,
msg);
- getSpiContext().failNode(id, msg);
+ getSpiContext().failNode(id.id, msg);
}
}
}
@@ -2296,7 +2362,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @return Client.
* @throws IgniteCheckedException If failed.
*/
- protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException {
+ protected GridCommunicationClient createTcpClient(ClusterNode node, ConnectionId id) throws IgniteCheckedException {
Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS));
Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES));
Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT));
@@ -2364,7 +2430,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
"(node left topology): " + node);
}
- GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(node);
+ GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(node, id);
if (!recoveryDesc.reserve()) {
U.closeQuiet(ch);
@@ -2386,7 +2452,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
rcvCnt = safeHandshake(ch, recoveryDesc, node.id(),
- timeoutHelper.nextTimeoutChunk(connTimeout0), sslEngine);
+ timeoutHelper.nextTimeoutChunk(connTimeout0), sslEngine, id);
if (rcvCnt == -1)
return null;
@@ -2399,7 +2465,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
try {
Map<Integer, Object> meta = new HashMap<>();
- meta.put(NODE_ID_META, node.id());
+ meta.put(NODE_ID_META, id);
if (isSslEnabled()) {
assert sslEngine != null;
@@ -2568,7 +2634,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
@Nullable GridNioRecoveryDescriptor recovery,
UUID rmtNodeId,
long timeout,
- @Nullable SSLEngine ssl
+ @Nullable SSLEngine ssl,
+ ConnectionId id
) throws IgniteCheckedException {
HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
@@ -2650,12 +2717,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (recovery != null) {
HandshakeMessage msg = new HandshakeMessage(locNode.id(),
recovery.incrementConnectCount(),
- recovery.received());
+ recovery.received(),
+ id.idx);
if (log.isDebugEnabled())
log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
- buf = ByteBuffer.allocate(33);
+ buf = ByteBuffer.allocate(37);
buf.order(ByteOrder.nativeOrder());
@@ -2810,8 +2878,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @param node Node.
* @return Recovery receive data for given node.
*/
- private GridNioRecoveryDescriptor recoveryDescriptor(ClusterNode node) {
- ClientKey id = new ClientKey(node.id(), node.order());
+ private GridNioRecoveryDescriptor recoveryDescriptor(ClusterNode node, ConnectionId clientId) {
+ ClientKey id = new ClientKey(node.id(), node.order(), clientId.idx);
GridNioRecoveryDescriptor recovery = recoveryDescs.get(id);
@@ -2874,18 +2942,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
*/
private static class ClientKey {
/** */
- private UUID nodeId;
+ private final UUID nodeId;
+
+ /** */
+ private final long order;
/** */
- private long order;
+ private final int idx;
/**
* @param nodeId Node ID.
* @param order Node order.
*/
- private ClientKey(UUID nodeId, long order) {
+ private ClientKey(UUID nodeId, long order, int idx) {
this.nodeId = nodeId;
this.order = order;
+ this.idx = idx;
}
/** {@inheritDoc} */
@@ -2898,7 +2970,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
ClientKey other = (ClientKey)obj;
- return order == other.order && nodeId.equals(other.nodeId);
+ return idx == other.idx && order == other.order && nodeId.equals(other.nodeId);
}
@@ -2906,7 +2978,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
@Override public int hashCode() {
int res = nodeId.hashCode();
- res = 31 * res + (int)(order ^ (order >>> 32));
+ res = 31 * res + (int)(order ^ (order >>> 32)) + idx;
return res;
}
@@ -3016,9 +3088,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assert formatter != null;
- UUID rmtNodeId = ses.meta(NODE_ID_META);
+ ConnectionId rmtNodeId = ses.meta(NODE_ID_META);
- return rmtNodeId != null ? formatter.writer(rmtNodeId) : null;
+ return rmtNodeId != null ? formatter.writer(rmtNodeId.id) : null;
}
};
@@ -3032,9 +3104,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assert formatter != null;
- UUID rmtNodeId = ses.meta(NODE_ID_META);
+ ConnectionId rmtNodeId = ses.meta(NODE_ID_META);
- return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory) : null;
+ return rmtNodeId != null ? formatter.reader(rmtNodeId.id, msgFactory) : null;
}
};
@@ -3115,8 +3187,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
private void processIdle() {
cleanupRecovery();
- for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) {
- UUID nodeId = e.getKey();
+ for (Map.Entry<ConnectionId, GridCommunicationClient> e : clients.entrySet()) {
+ UUID nodeId = e.getKey().id;
GridCommunicationClient client = e.getValue();
@@ -3128,7 +3200,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
client.forceClose();
- clients.remove(nodeId, client);
+ clients.remove(e.getKey(), client);
continue;
}
@@ -3136,7 +3208,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
GridNioRecoveryDescriptor recovery = null;
if (client instanceof GridTcpNioCommunicationClient) {
- recovery = recoveryDescs.get(new ClientKey(node.id(), node.order()));
+ recovery = recoveryDescs.get(new ClientKey(node.id(), node.order(), e.getKey().idx));
if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
@@ -3170,7 +3242,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
log.debug("Closing idle node connection: " + nodeId);
if (client.close() || client.closed())
- clients.remove(nodeId, client);
+ clients.remove(e.getKey(), client);
}
}
}
@@ -3223,7 +3295,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (log.isDebugEnabled())
log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
- GridCommunicationClient client = reserveClient(node);
+ GridCommunicationClient client = reserveClient(node, sesInfo.connId);
client.release();
}
@@ -3431,6 +3503,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** */
private long connectCnt;
+ /** */
+ private int idx;
+
/**
* Default constructor required by {@link Message}.
*/
@@ -3443,13 +3518,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @param connectCnt Connect count.
* @param rcvCnt Number of received messages.
*/
- public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) {
+ public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt, int idx) {
assert nodeId != null;
assert rcvCnt >= 0 : rcvCnt;
this.nodeId = nodeId;
this.connectCnt = connectCnt;
this.rcvCnt = rcvCnt;
+ this.idx = idx;
}
/**
@@ -3480,7 +3556,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- if (buf.remaining() < 33)
+ if (buf.remaining() < 37)
return false;
buf.put(HANDSHAKE_MSG_TYPE);
@@ -3495,6 +3571,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
buf.putLong(connectCnt);
+ buf.putInt(idx);
+
return true;
}
@@ -3513,6 +3591,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
connectCnt = buf.getLong();
+ idx = buf.getInt();
+
return true;
}
@@ -3774,17 +3854,23 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
*/
private static class DisconnectedSessionInfo {
/** */
+ private final ConnectionId connId;
+
+ /** */
private final GridNioRecoveryDescriptor recoveryDesc;
/** */
private final boolean reconnect;
/**
+ * @param connId Connection ID.
* @param recoveryDesc Recovery descriptor.
* @param reconnect Reconnect flag.
*/
- DisconnectedSessionInfo(@Nullable GridNioRecoveryDescriptor recoveryDesc,
+ public DisconnectedSessionInfo(ConnectionId connId,
+ @Nullable GridNioRecoveryDescriptor recoveryDesc,
boolean reconnect) {
+ this.connId = connId;
this.recoveryDesc = recoveryDesc;
this.reconnect = reconnect;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c9f62b81/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
index 652e47f..4c46be9 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
@@ -111,14 +111,14 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
cfg.setConnectorConfiguration(null);
TcpCommunicationSpi commSpi = new TcpCommunicationSpi() {
- @Override protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException {
+ @Override protected GridCommunicationClient createTcpClient(ClusterNode node, ConnectionId id) throws IgniteCheckedException {
Map<String, Object> attrs = new HashMap<>(node.attributes());
attrs.remove(createSpiAttributeName(ATTR_PORT));
((TcpDiscoveryNode)node).setAttributes(attrs);
- return super.createTcpClient(node);
+ return super.createTcpClient(node, id);
}
};