You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/16 11:41:46 UTC
[17/49] ignite git commit: IGNITE-3220 I/O bottleneck on
server/client cluster configuration Communications optimizations: -
possibility to open separate in/out connections - possibility to have
multiple connections between nodes - implemented NIO sessio
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 1fe437c..b392c07 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -46,6 +46,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import org.apache.ignite.Ignite;
@@ -53,6 +54,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.AddressResolver;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -103,6 +105,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -179,6 +182,7 @@ import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META
* <li>Node local IP address (see {@link #setLocalAddress(String)})</li>
* <li>Node local port number (see {@link #setLocalPort(int)})</li>
* <li>Local port range (see {@link #setLocalPortRange(int)})</li>
+ * <li>Connections per node (see {@link #setConnectionsPerNode(int)})</li>
* <li>Connection buffer flush frequency (see {@link #setConnectionBufferFlushFrequency(long)})</li>
* <li>Connection buffer size (see {@link #setConnectionBufferSize(int)})</li>
* <li>Idle connection timeout (see {@link #setIdleConnectionTimeout(long)})</li>
@@ -238,6 +242,9 @@ import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META
@IgniteSpiConsistencyChecked(optional = false)
public class TcpCommunicationSpi extends IgniteSpiAdapter
implements CommunicationSpi<Message>, TcpCommunicationSpiMBean {
+ /** */
+ private static final IgniteProductVersion MULTIPLE_CONN_SINCE_VER = IgniteProductVersion.fromString("1.8.0");
+
/** IPC error message. */
public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " +
"(switching to TCP, may be slower).";
@@ -257,11 +264,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Node attribute that is mapped to node's external addresses (value is <tt>comm.tcp.ext-addrs</tt>). */
public static final String ATTR_EXT_ADDRS = "comm.tcp.ext-addrs";
+ /** */
+ public static final String ATTR_PAIRED_CONN = "comm.tcp.pairedConnection";
+
/** Default port which node sets listener to (value is <tt>47100</tt>). */
public static final int DFLT_PORT = 47100;
/** Default port which node sets listener for shared memory connections (value is <tt>-1</tt>). */
- public static final int DFLT_SHMEM_PORT = 48100;
+ public static final int DFLT_SHMEM_PORT = -1;
/** Default idle connection timeout (value is <tt>30000</tt>ms). */
public static final long DFLT_IDLE_CONN_TIMEOUT = 30000;
@@ -283,12 +293,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/**
* Default count of selectors for TCP server equals to
- * {@code "Math.min(4, Runtime.getRuntime().availableProcessors())"}.
+ * {@code "Math.max(4, Runtime.getRuntime().availableProcessors() / 2)"}.
*/
- public static final int DFLT_SELECTORS_CNT = Math.min(4, Runtime.getRuntime().availableProcessors());
+ public static final int DFLT_SELECTORS_CNT = Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
- /** Node ID meta for session. */
- private static final int NODE_ID_META = GridNioSessionMetaKey.nextUniqueKey();
+ /** Connection index meta for session. */
+ private static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey();
/** Message tracker meta for session. */
private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
@@ -303,11 +313,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
public static final boolean DFLT_TCP_NODELAY = true;
/** Default received messages threshold for sending ack. */
- public static final int DFLT_ACK_SND_THRESHOLD = 16;
+ public static final int DFLT_ACK_SND_THRESHOLD = 32;
/** Default socket write timeout. */
public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000;
+ /** Default connections per node. */
+ public static final int DFLT_CONN_PER_NODE = 1;
+
/** No-op runnable. */
private static final IgniteRunnable NOOP = new IgniteRunnable() {
@Override public void run() {
@@ -327,11 +340,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** */
private ConnectGateway connectGate;
+ /** */
+ private ConnectionPolicy connPlc;
+
/** Server listener. */
private final GridNioServerListener<Message> srvLsnr =
new GridNioServerListenerAdapter<Message>() {
@Override public void onSessionWriteTimeout(GridNioSession ses) {
- LT.warn(log, "Communication SPI Session write timed out (consider increasing " +
+ LT.warn(log,"Communication SPI session write timed out (consider increasing " +
"'socketWriteTimeout' " + "configuration property) [remoteAddr=" + ses.remoteAddress() +
", writeTimeout=" + sockWriteTimeout + ']');
@@ -347,46 +363,53 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (log.isDebugEnabled())
log.debug("Sending local node ID to newly accepted session: " + ses);
- ses.send(nodeIdMessage());
+ try {
+ ses.sendNoFuture(nodeIdMessage());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send message: " + e, e);
+ }
}
}
@Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
- UUID id = ses.meta(NODE_ID_META);
+ ConnectionKey connId = ses.meta(CONN_IDX_META);
- if (id != null) {
- GridCommunicationClient client = clients.get(id);
+ if (connId != null) {
+ UUID id = connId.nodeId();
- if (client instanceof GridTcpNioCommunicationClient &&
- ((GridTcpNioCommunicationClient) client).session() == ses) {
- client.close();
+ GridCommunicationClient[] nodeClients = clients.get(id);
- clients.remove(id, client);
+ if (nodeClients != null) {
+ for (GridCommunicationClient client : nodeClients) {
+ if (client instanceof GridTcpNioCommunicationClient &&
+ ((GridTcpNioCommunicationClient)client).session() == ses) {
+ client.close();
+
+ removeNodeClient(id, client);
+ }
+ }
}
if (!stopping) {
- boolean reconnect = false;
-
- GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor();
-
- if (recoveryData != null) {
- if (recoveryData.nodeAlive(getSpiContext().node(id))) {
- if (!recoveryData.messagesFutures().isEmpty()) {
- reconnect = true;
+ GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
+ if (outDesc != null) {
+ if (outDesc.nodeAlive(getSpiContext().node(id))) {
+ if (!outDesc.messagesRequests().isEmpty()) {
if (log.isDebugEnabled())
log.debug("Session was closed but there are unacknowledged messages, " +
- "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']');
+ "will try to reconnect [rmtNode=" + outDesc.node().id() + ']');
+
+ DisconnectedSessionInfo disconnectData =
+ new DisconnectedSessionInfo(outDesc, connId.connectionIndex());
+
+ commWorker.addProcessDisconnectRequest(disconnectData);
}
}
else
- recoveryData.onNodeLeft();
+ outDesc.onNodeLeft();
}
-
- DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(recoveryData,
- reconnect);
-
- commWorker.addProcessDisconnectRequest(disconnectData);
}
CommunicationListener<Message> lsnr0 = lsnr;
@@ -403,21 +426,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
private void onFirstMessage(GridNioSession ses, Message msg) {
UUID sndId;
- if (msg instanceof NodeIdMessage)
- sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0);
+ ConnectionKey connKey;
+
+ if (msg instanceof NodeIdMessage) {
+ sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0);
+ connKey = new ConnectionKey(sndId, 0, -1);
+ }
else {
assert msg instanceof HandshakeMessage : msg;
+ HandshakeMessage msg0 = (HandshakeMessage)msg;
+
sndId = ((HandshakeMessage)msg).nodeId();
+ connKey = new ConnectionKey(sndId, msg0.connectionIndex(), msg0.connectCount());
}
if (log.isDebugEnabled())
log.debug("Remote node ID received: " + sndId);
- final UUID old = ses.addMeta(NODE_ID_META, sndId);
-
- assert old == null;
-
final ClusterNode rmtNode = getSpiContext().node(sndId);
if (rmtNode == null) {
@@ -429,57 +455,65 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
return;
}
+ final ConnectionKey old = ses.addMeta(CONN_IDX_META, connKey);
+
+ assert old == null;
+
ClusterNode locNode = getSpiContext().localNode();
if (ses.remoteAddress() == null)
return;
- GridCommunicationClient oldClient = clients.get(sndId);
+ assert msg instanceof HandshakeMessage : msg;
- boolean hasShmemClient = false;
+ HandshakeMessage msg0 = (HandshakeMessage)msg;
- if (oldClient != null) {
- if (oldClient instanceof GridTcpNioCommunicationClient) {
- if (log.isDebugEnabled())
- log.debug("Received incoming connection when already connected " +
- "to this node, rejecting [locNode=" + locNode.id() +
- ", rmtNode=" + sndId + ']');
+ if (usePairedConnections(rmtNode)) {
+ final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey);
- ses.send(new RecoveryLastReceivedMessage(-1));
+ ConnectClosureNew c = new ConnectClosureNew(ses, recoveryDesc, rmtNode);
- return;
- }
+ boolean reserve = recoveryDesc.tryReserve(msg0.connectCount(), c);
+
+ if (reserve)
+ connectedNew(recoveryDesc, ses, true);
else {
- assert oldClient instanceof GridShmemCommunicationClient;
+ if (c.failed) {
+ ses.send(new RecoveryLastReceivedMessage(-1));
+
+ for (GridNioSession ses0 : nioSrvr.sessions()) {
+ ConnectionKey key0 = ses0.meta(CONN_IDX_META);
- hasShmemClient = true;
+ if (ses0.accepted() && key0 != null &&
+ key0.nodeId().equals(connKey.nodeId()) &&
+ key0.connectionIndex() == connKey.connectionIndex() &&
+ key0.connectCount() < connKey.connectCount())
+ ses0.close();
+ }
+ }
}
}
+ else {
+ assert connKey.connectionIndex() >= 0 : connKey;
- GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
-
- GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId, fut);
-
- assert msg instanceof HandshakeMessage : msg;
-
- HandshakeMessage msg0 = (HandshakeMessage)msg;
+ GridCommunicationClient[] curClients = clients.get(sndId);
- final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode);
+ GridCommunicationClient oldClient =
+ curClients != null && connKey.connectionIndex() < curClients.length ?
+ curClients[connKey.connectionIndex()] :
+ null;
- if (oldFut == null) {
- oldClient = clients.get(sndId);
+ boolean hasShmemClient = false;
if (oldClient != null) {
if (oldClient instanceof GridTcpNioCommunicationClient) {
if (log.isDebugEnabled())
log.debug("Received incoming connection when already connected " +
- "to this node, rejecting [locNode=" + locNode.id() +
- ", rmtNode=" + sndId + ']');
+ "to this node, rejecting [locNode=" + locNode.id() +
+ ", rmtNode=" + sndId + ']');
ses.send(new RecoveryLastReceivedMessage(-1));
- fut.onDone(oldClient);
-
return;
}
else {
@@ -489,51 +523,86 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
}
- boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
- new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
+ GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
- if (log.isDebugEnabled())
- log.debug("Received incoming connection from remote node " +
+ GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut);
+
+ final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey);
+
+ if (oldFut == null) {
+ curClients = clients.get(sndId);
+
+ oldClient = curClients != null && connKey.connectionIndex() < curClients.length ?
+ curClients[connKey.connectionIndex()] : null;
+
+ if (oldClient != null) {
+ if (oldClient instanceof GridTcpNioCommunicationClient) {
+ assert oldClient.connectionIndex() == connKey.connectionIndex() : oldClient;
+
+ if (log.isDebugEnabled())
+ log.debug("Received incoming connection when already connected " +
+ "to this node, rejecting [locNode=" + locNode.id() +
+ ", rmtNode=" + sndId + ']');
+
+ ses.send(new RecoveryLastReceivedMessage(-1));
+
+ fut.onDone(oldClient);
+
+ return;
+ }
+ else {
+ assert oldClient instanceof GridShmemCommunicationClient;
+
+ hasShmemClient = true;
+ }
+ }
+
+ boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
+ new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, msg0, !hasShmemClient, fut));
+
+ if (log.isDebugEnabled())
+ log.debug("Received incoming connection from remote node " +
"[rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ']');
- if (reserved) {
- try {
- GridTcpNioCommunicationClient client =
+ if (reserved) {
+ try {
+ GridTcpNioCommunicationClient client =
connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
- fut.onDone(client);
- }
- finally {
- clientFuts.remove(rmtNode.id(), fut);
+ fut.onDone(client);
+ }
+ finally {
+ clientFuts.remove(connKey, fut);
+ }
}
}
- }
- else {
- if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
- if (log.isDebugEnabled()) {
- log.debug("Received incoming connection from remote node while " +
+ else {
+ if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Received incoming connection from remote node while " +
"connecting to this node, rejecting [locNode=" + locNode.id() +
", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() +
", rmtNodeOrder=" + rmtNode.order() + ']');
- }
+ }
- ses.send(new RecoveryLastReceivedMessage(-1));
- }
- else {
- // The code below causes a race condition between shmem and TCP (see IGNITE-1294)
- boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
- new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
+ ses.send(new RecoveryLastReceivedMessage(-1));
+ }
+ else {
+ // The code below causes a race condition between shmem and TCP (see IGNITE-1294)
+ boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
+ new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, msg0, !hasShmemClient, fut));
- if (reserved)
- connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
+ if (reserved)
+ connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
+ }
}
}
}
@Override public void onMessage(GridNioSession ses, Message msg) {
- UUID sndId = ses.meta(NODE_ID_META);
+ ConnectionKey connKey = ses.meta(CONN_IDX_META);
- if (sndId == null) {
+ if (connKey == null) {
assert ses.accepted() : ses;
if (!connectGate.tryEnter()) {
@@ -555,29 +624,37 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
else {
rcvdMsgsCnt.increment();
- GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor();
+ if (msg instanceof RecoveryLastReceivedMessage) {
+ GridNioRecoveryDescriptor recovery = ses.outRecoveryDescriptor();
- if (recovery != null) {
- if (msg instanceof RecoveryLastReceivedMessage) {
+ if (recovery != null) {
RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg;
- if (log.isDebugEnabled())
- log.debug("Received recovery acknowledgement [rmtNode=" + sndId +
+ if (log.isDebugEnabled()) {
+ log.debug("Received recovery acknowledgement [rmtNode=" + connKey.nodeId() +
+ ", connIdx=" + connKey.connectionIndex() +
", rcvCnt=" + msg0.received() + ']');
+ }
recovery.ackReceived(msg0.received());
return;
}
- else {
+ }
+ else {
+ GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor();
+
+ if (recovery != null) {
long rcvCnt = recovery.onReceived();
if (rcvCnt % ackSndThreshold == 0) {
- if (log.isDebugEnabled())
- log.debug("Send recovery acknowledgement [rmtNode=" + sndId +
+ if (log.isDebugEnabled()) {
+ log.debug("Send recovery acknowledgement [rmtNode=" + connKey.nodeId() +
+ ", connIdx=" + connKey.connectionIndex() +
", rcvCnt=" + rcvCnt + ']');
+ }
- nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(rcvCnt));
+ ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt));
recovery.lastAcknowledged(rcvCnt);
}
@@ -603,7 +680,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
else
c = NOOP;
- notifyListener(sndId, msg, c);
+ notifyListener(connKey.nodeId(), msg, c);
}
}
@@ -611,7 +688,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @param recovery Recovery descriptor.
* @param ses Session.
* @param node Node.
- * @param rcvCnt Number of received messages..
+ * @param rcvCnt Number of received messages.
* @param sndRes If {@code true} sends response for recovery handshake.
* @param createClient If {@code true} creates NIO communication client.
* @return Client.
@@ -623,32 +700,128 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
long rcvCnt,
boolean sndRes,
boolean createClient) {
+ ConnectionKey connKey = ses.meta(CONN_IDX_META);
+
+ assert connKey != null && connKey.connectionIndex() >= 0 : connKey;
+ assert !usePairedConnections(node);
+
recovery.onHandshake(rcvCnt);
- ses.recoveryDescriptor(recovery);
+ ses.inRecoveryDescriptor(recovery);
+ ses.outRecoveryDescriptor(recovery);
nioSrvr.resend(ses);
- if (sndRes)
- nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
+ try {
+ if (sndRes)
+ nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send message: " + e, e);
+ }
- recovery.connected();
+ recovery.onConnected();
GridTcpNioCommunicationClient client = null;
if (createClient) {
- client = new GridTcpNioCommunicationClient(ses, log);
-
- GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client);
+ client = new GridTcpNioCommunicationClient(connKey.connectionIndex(), ses, log);
- assert oldClient == null : "Client already created [node=" + node + ", client=" + client +
- ", oldClient=" + oldClient + ", recoveryDesc=" + recovery + ']';
+ addNodeClient(node, connKey.connectionIndex(), client);
}
return client;
}
/**
+ * @param recovery Recovery descriptor.
+ * @param ses Session.
+ * @param sndRes If {@code true} sends response for recovery handshake.
+ */
+ private void connectedNew(
+ GridNioRecoveryDescriptor recovery,
+ GridNioSession ses,
+ boolean sndRes) {
+ try {
+ ses.inRecoveryDescriptor(recovery);
+
+ if (sndRes)
+ nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
+
+ recovery.onConnected();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send message: " + e, e);
+ }
+ }
+
+ /**
+ * Closure handed to {@code recoveryDesc.tryReserve(...)} for an incoming session when paired connections are used.
+ */
+ class ConnectClosureNew implements IgniteInClosure<Boolean> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Incoming session. */
+ private final GridNioSession ses;
+
+ /** Recovery descriptor. */
+ private final GridNioRecoveryDescriptor recoveryDesc;
+
+ /** Remote node. */
+ private final ClusterNode rmtNode;
+
+ /** Set by {@link #apply(Boolean)} to {@code true} when it is invoked with {@code false}. */
+ private boolean failed;
+
+ /**
+ * @param ses Incoming session.
+ * @param recoveryDesc Recovery descriptor.
+ * @param rmtNode Remote node.
+ */
+ ConnectClosureNew(GridNioSession ses,
+ GridNioRecoveryDescriptor recoveryDesc,
+ ClusterNode rmtNode) {
+ this.ses = ses;
+ this.recoveryDesc = recoveryDesc;
+ this.rmtNode = rmtNode;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void apply(Boolean success) {
+ try {
+ failed = !success;
+
+ if (success) {
+ IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> msgFut) {
+ try {
+ msgFut.get();
+
+ connectedNew(recoveryDesc, ses, false); // Response was already sent below, so sndRes is false.
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send recovery handshake " +
+ "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');
+
+ recoveryDesc.release();
+ }
+ }
+ };
+
+ nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr);
+ }
+ else
+ nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1)); // Not successful: reply with -1.
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send message: " + e, e);
+ }
+ }
+ }
+
+ /**
*
*/
@SuppressWarnings("PackageVisibleInnerClass")
@@ -674,10 +847,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** */
private final boolean createClient;
+ /** */
+ private final ConnectionKey connKey;
+
/**
* @param ses Incoming session.
* @param recoveryDesc Recovery descriptor.
* @param rmtNode Remote node.
+ * @param connKey Connection key.
* @param msg Handshake message.
* @param createClient If {@code true} creates NIO communication client.
* @param fut Connect future.
@@ -685,12 +862,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
ConnectClosure(GridNioSession ses,
GridNioRecoveryDescriptor recoveryDesc,
ClusterNode rmtNode,
+ ConnectionKey connKey,
HandshakeMessage msg,
boolean createClient,
GridFutureAdapter<GridCommunicationClient> fut) {
this.ses = ses;
this.recoveryDesc = recoveryDesc;
this.rmtNode = rmtNode;
+ this.connKey = connKey;
this.msg = msg;
this.createClient = createClient;
this.fut = fut;
@@ -699,39 +878,44 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** {@inheritDoc} */
@Override public void apply(Boolean success) {
if (success) {
- IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> msgFut) {
- try {
- msgFut.get();
+ try {
+ IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> msgFut) {
+ try {
+ msgFut.get();
- GridTcpNioCommunicationClient client =
- connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient);
+ GridTcpNioCommunicationClient client =
+ connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient);
- fut.onDone(client);
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to send recovery handshake " +
- "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');
+ fut.onDone(client);
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send recovery handshake " +
+ "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');
- recoveryDesc.release();
+ recoveryDesc.release();
- fut.onDone();
- }
- finally {
- clientFuts.remove(rmtNode.id(), fut);
+ fut.onDone();
+ }
+ finally {
+ clientFuts.remove(connKey, fut);
+ }
}
- }
- };
+ };
- nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr);
+ nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send message: " + e, e);
+ }
}
else {
try {
fut.onDone();
}
finally {
- clientFuts.remove(rmtNode.id(), fut);
+ clientFuts.remove(connKey, fut);
}
}
}
@@ -794,6 +978,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Shared memory server. */
private IpcSharedMemoryServerEndpoint shmemSrv;
+ /** */
+ private boolean usePairedConnections = true;
+
+ /** */
+ private int connectionsPerNode = DFLT_CONN_PER_NODE;
+
/** {@code TCP_NODELAY} option value for created sockets. */
private boolean tcpNoDelay = DFLT_TCP_NODELAY;
@@ -816,7 +1006,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>();
/** Clients. */
- private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap();
+ private final ConcurrentMap<UUID, GridCommunicationClient[]> clients = GridConcurrentFactory.newMap();
/** SPI listener. */
private volatile CommunicationListener<Message> lsnr;
@@ -830,6 +1020,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Count of selectors to use in TCP server. */
private int selectorsCnt = DFLT_SELECTORS_CNT;
+ /**
+ * Defines how many non-blocking {@code selector.selectNow()} should be made before
+ * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}.
+ * Can be set to {@code Long.MAX_VALUE} so selector threads will never block.
+ */
+ private long selectorSpins = IgniteSystemProperties.getLong("IGNITE_SELECTOR_SPINS", 0L);
+
/** Address resolver. */
private AddressResolver addrRslvr;
@@ -863,11 +1060,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
};
/** Client connect futures. */
- private final ConcurrentMap<UUID, GridFutureAdapter<GridCommunicationClient>> clientFuts =
+ private final ConcurrentMap<ConnectionKey, GridFutureAdapter<GridCommunicationClient>> clientFuts =
GridConcurrentFactory.newMap();
/** */
- private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> recoveryDescs = GridConcurrentFactory.newMap();
+ private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs = GridConcurrentFactory.newMap();
+
+ /** */
+ private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> outRecDescs = GridConcurrentFactory.newMap();
+
+ /** */
+ private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> inRecDescs = GridConcurrentFactory.newMap();
/** Discovery listener. */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@@ -976,6 +1179,49 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
return locPortRange;
}
+ /** {@inheritDoc} */
+ @Override public boolean isUsePairedConnections() {
+ return usePairedConnections;
+ }
+
+ /**
+ * Set this to {@code true} if {@code TcpCommunicationSpi} should
+ * maintain connection for outgoing and incoming messages separately.
+ * In this case total number of connections between local and each remote node
+ * is {@link #getConnectionsPerNode()} * 2.
+ * <p>
+ * Set this to {@code false} if each connection of {@link #getConnectionsPerNode()}
+ * should be used for outgoing and incoming messages. In this case total number
+ * of connections between local and each remote node is {@link #getConnectionsPerNode()}.
+ * In this case NIO selector load balancing
+ * in {@link GridNioServer} will be disabled.
+ * <p>
+ * Default is {@code true}.
+ *
+ * @param usePairedConnections {@code true} to use paired connections and {@code false} otherwise.
+ * @see #getConnectionsPerNode()
+ */
+ public void setUsePairedConnections(boolean usePairedConnections) {
+ this.usePairedConnections = usePairedConnections;
+ }
+
+ /**
+ * Sets number of connections to each remote node. If {@link #isUsePairedConnections()}
+ * is {@code true} then number of connections is doubled and half is used for incoming and
+ * half for outgoing messages.
+ *
+ * @param maxConnectionsPerNode Number of connections per node.
+ * @see #isUsePairedConnections()
+ */
+ public void setConnectionsPerNode(int maxConnectionsPerNode) {
+ this.connectionsPerNode = maxConnectionsPerNode;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getConnectionsPerNode() {
+ return connectionsPerNode;
+ }
+
/**
* Sets local port to accept shared memory connections.
* <p>
@@ -1222,6 +1468,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
return selectorsCnt;
}
+ /** {@inheritDoc} */
+ @Override public long getSelectorSpins() {
+ return selectorSpins;
+ }
+
+ /**
+ * Defines how many non-blocking {@code selector.selectNow()} should be made before
+ * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}.
+ * Can be set to {@code Long.MAX_VALUE} so selector threads will never block.
+ *
+ * @param selectorSpins Selector thread busy-loop iterations.
+ */
+ public void setSelectorSpins(long selectorSpins) {
+ this.selectorSpins = selectorSpins;
+ }
+
/**
* Sets value for {@code TCP_NODELAY} socket option. Each
* socket will be opened using provided value.
@@ -1396,7 +1658,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (log != null) {
StringBuilder sb = new StringBuilder("Communication SPI recovery descriptors: ").append(U.nl());
- for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) {
+ for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) {
GridNioRecoveryDescriptor desc = entry.getValue();
sb.append(" [key=").append(entry.getKey())
@@ -1409,14 +1671,48 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
.append(']').append(U.nl());
}
- sb.append("Communication SPI clients: ").append(U.nl());
+ for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : outRecDescs.entrySet()) {
+ GridNioRecoveryDescriptor desc = entry.getValue();
+
+ sb.append(" [key=").append(entry.getKey())
+ .append(", msgsSent=").append(desc.sent())
+ .append(", msgsAckedByRmt=").append(desc.acked())
+ .append(", reserveCnt=").append(desc.reserveCount())
+ .append(", connected=").append(desc.connected())
+ .append(", reserved=").append(desc.reserved())
+ .append(", descIdHash=").append(System.identityHashCode(desc))
+ .append(']').append(U.nl());
+ }
+
+ for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : inRecDescs.entrySet()) {
+ GridNioRecoveryDescriptor desc = entry.getValue();
- for (Map.Entry<UUID, GridCommunicationClient> entry : clients.entrySet()) {
- sb.append(" [node=").append(entry.getKey())
- .append(", client=").append(entry.getValue())
+ sb.append(" [key=").append(entry.getKey())
+ .append(", msgsRcvd=").append(desc.received())
+ .append(", lastAcked=").append(desc.lastAcknowledged())
+ .append(", reserveCnt=").append(desc.reserveCount())
+ .append(", connected=").append(desc.connected())
+ .append(", reserved=").append(desc.reserved())
+ .append(", handshakeIdx=").append(desc.handshakeIndex())
+ .append(", descIdHash=").append(System.identityHashCode(desc))
.append(']').append(U.nl());
}
+ sb.append("Communication SPI clients: ").append(U.nl());
+
+ for (Map.Entry<UUID, GridCommunicationClient[]> entry : clients.entrySet()) {
+ UUID nodeId = entry.getKey();
+ GridCommunicationClient[] clients0 = entry.getValue();
+
+ for (GridCommunicationClient client : clients0) {
+ if (client != null) {
+ sb.append(" [node=").append(nodeId)
+ .append(", client=").append(client)
+ .append(']').append(U.nl());
+ }
+ }
+ }
+
U.warn(log, sb.toString());
}
@@ -1426,6 +1722,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
nioSrvr.dumpStats();
}
+ /** */
+ private final ThreadLocal<Integer> threadConnIdx = new ThreadLocal<>();
+
+ /** */
+ private final AtomicInteger connIdx = new AtomicInteger();
+
/** {@inheritDoc} */
@Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
initFailureDetectionTimeout();
@@ -1439,6 +1741,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1");
assertParameter(selectorsCnt > 0, "selectorsCnt > 0");
+ assertParameter(connectionsPerNode > 0, "connectionsPerNode > 0");
+ assertParameter(connectionsPerNode <= 1024, "connectionsPerNode <= 1024");
if (!failureDetectionTimeoutEnabled()) {
assertParameter(reconCnt > 0, "reconnectCnt > 0");
@@ -1458,6 +1762,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
"Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'.");
}
+ if (connectionsPerNode > 1) {
+ connPlc = new ConnectionPolicy() {
+ @Override public int connectionIndex() {
+ return (int)(U.safeAbs(Thread.currentThread().getId()) % connectionsPerNode);
+ }
+ };
+ }
+ else {
+ connPlc = new ConnectionPolicy() {
+ @Override public int connectionIndex() {
+ return 0;
+ }
+ };
+ }
+
try {
locHost = U.resolveLocalHost(locAddr);
}
@@ -1495,6 +1814,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
res.put(createSpiAttributeName(ATTR_PORT), boundTcpPort);
res.put(createSpiAttributeName(ATTR_SHMEM_PORT), boundTcpShmemPort >= 0 ? boundTcpShmemPort : null);
res.put(createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
+ res.put(createSpiAttributeName(ATTR_PAIRED_CONN), usePairedConnections);
return res;
}
@@ -1524,6 +1844,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
log.debug(configInfo("sockRcvBuf", sockRcvBuf));
log.debug(configInfo("shmemPort", shmemPort));
log.debug(configInfo("msgQueueLimit", msgQueueLimit));
+ log.debug(configInfo("connectionsPerNode", connectionsPerNode));
if (failureDetectionTimeoutEnabled()) {
log.debug(configInfo("connTimeout", connTimeout));
@@ -1548,6 +1869,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
", slowClientQueueLimit=" + slowClientQueueLimit + ']');
}
+ if (msgQueueLimit == 0) // Zero passes the "msgQueueLimit >= 0" parameter check, so only warn about the risk here.
+ U.quietAndWarn(log, "Message queue limit is set to 0 which may lead to " +
+ "potential OOMEs when running cache operations in FULL_ASYNC or PRIMARY_SYNC modes " +
+ "due to message queues growth on sender and receiver sides.");
+
registerMBean(gridName, this, TcpCommunicationSpiMBean.class);
connectGate = new ConnectGateway();
@@ -1642,9 +1968,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assert formatter != null;
- UUID rmtNodeId = ses.meta(NODE_ID_META);
+ ConnectionKey key = ses.meta(CONN_IDX_META);
- return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory) : null;
+ return key != null ? formatter.reader(key.nodeId(), msgFactory) : null;
}
};
@@ -1657,9 +1983,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assert formatter != null;
- UUID rmtNodeId = ses.meta(NODE_ID_META);
+ ConnectionKey key = ses.meta(CONN_IDX_META);
- return rmtNodeId != null ? formatter.writer(rmtNodeId) : null;
+ return key != null ? formatter.writer(key.nodeId()) : null;
}
};
@@ -1716,6 +2042,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
.logger(log)
.selectorCount(selectorsCnt)
.gridName(gridName)
+ .serverName("tcp-comm")
.tcpNoDelay(tcpNoDelay)
.directBuffer(directBuf)
.byteOrder(ByteOrder.nativeOrder())
@@ -1725,18 +2052,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
.directMode(true)
.metricsListener(metricsLsnr)
.writeTimeout(sockWriteTimeout)
+ .selectorSpins(selectorSpins)
.filters(filters)
.writerFactory(writerFactory)
.skipRecoveryPredicate(skipRecoveryPred)
.messageQueueSizeListener(queueSizeMonitor)
+ .balancing(usePairedConnections) // Current balancing logic assumes separate in/out connections.
.build();
boundTcpPort = port;
// Ack Port the TCP server was bound to.
if (log.isInfoEnabled())
- log.info("Successfully bound to TCP port [port=" + boundTcpPort +
- ", locHost=" + locHost + ']');
+ log.info("Successfully bound communication NIO server to TCP port " +
+ "[port=" + boundTcpPort + ", locHost=" + locHost + ", selectorsCnt=" + selectorsCnt +
+ ", selectorSpins=" + srvr.selectorSpins() + ']');
srvr.idleTimeout(idleConnTimeout);
@@ -1837,8 +2167,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
shmemWorkers.clear();
// Force closing on stop (safety).
- for (GridCommunicationClient client : clients.values())
- client.forceClose();
+ for (GridCommunicationClient[] clients0 : clients.values()) {
+ for (GridCommunicationClient client : clients0) {
+ if (client != null)
+ client.forceClose();
+ }
+ }
// Clear resources.
nioSrvr = null;
@@ -1863,8 +2197,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
connectGate.stopped();
// Force closing.
- for (GridCommunicationClient client : clients.values())
- client.forceClose();
+ for (GridCommunicationClient[] clients0 : clients.values()) {
+ for (GridCommunicationClient client : clients0) {
+ if (client != null)
+ client.forceClose();
+ }
+ }
getSpiContext().deregisterPorts();
@@ -1875,8 +2213,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
@Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) {
connectGate.disconnected(reconnectFut);
- for (GridCommunicationClient client : clients.values())
- client.forceClose();
+ for (GridCommunicationClient[] clients0 : clients.values()) {
+ for (GridCommunicationClient client : clients0) {
+ if (client != null)
+ client.forceClose();
+ }
+ }
IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
"Failed to connect client node disconnected.");
@@ -1885,6 +2227,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
clientFut.onDone(err);
recoveryDescs.clear();
+ inRecDescs.clear();
+ outRecDescs.clear();
}
/** {@inheritDoc} */
@@ -1898,16 +2242,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
void onNodeLeft(UUID nodeId) {
assert nodeId != null;
- GridCommunicationClient client = clients.get(nodeId);
-
- if (client != null) {
- if (log.isDebugEnabled())
- log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId +
- ", client=" + client + ']');
+ GridCommunicationClient[] clients0 = clients.remove(nodeId);
- client.forceClose();
+ if (clients0 != null) {
+ for (GridCommunicationClient client : clients0) {
+ if (client != null) {
+ if (log.isDebugEnabled())
+ log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId +
+ ", client=" + client + ']');
- clients.remove(nodeId, client);
+ client.forceClose();
+ }
+ }
}
}
@@ -1982,11 +2328,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
else {
GridCommunicationClient client = null;
+ int connIdx = useMultipleConnections(node) ? connPlc.connectionIndex() : 0;
+
try {
boolean retry;
do {
- client = reserveClient(node);
+ client = reserveClient(node, connIdx);
UUID nodeId = null;
@@ -2000,7 +2348,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (!retry)
sentMsgsCnt.increment();
else {
- clients.remove(node.id(), client);
+ removeNodeClient(node.id(), client);
ClusterNode node0 = getSpiContext().node(node.id());
@@ -2017,26 +2365,94 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
throw new IgniteSpiException("Failed to send message to remote node: " + node, e);
}
finally {
- if (client != null && clients.remove(node.id(), client))
+ if (client != null && removeNodeClient(node.id(), client))
client.forceClose();
}
}
}
/**
+ * Clears the slot held by the given client in the clients array registered for the node.
+ * <p>
+ * The array stored in {@code clients} is not modified in place: a copy with the slot at
+ * {@code rmvClient.connectionIndex()} set to {@code null} is installed through
+ * {@code clients.replace(nodeId, curClients, newClients)}. The attempt is repeated until the
+ * replace succeeds or the client is found to be no longer registered.
+ *
+ * @param nodeId Node ID.
+ * @param rmvClient Client to remove.
+ * @return {@code True} if client was removed.
+ * {@code False} if no array is registered for the node, the client's connection index is
+ * outside the array, or the slot holds a different client instance.
+ */
+ private boolean removeNodeClient(UUID nodeId, GridCommunicationClient rmvClient) {
+ for (;;) { // Retry loop: left only through one of the two returns below.
+ GridCommunicationClient[] curClients = clients.get(nodeId);
+
+ if (curClients == null || rmvClient.connectionIndex() >= curClients.length || curClients[rmvClient.connectionIndex()] != rmvClient)
+ return false; // Identity comparison: only this exact client instance is removed.
+
+ GridCommunicationClient[] newClients = Arrays.copyOf(curClients, curClients.length);
+
+ newClients[rmvClient.connectionIndex()] = null;
+
+ if (clients.replace(nodeId, curClients, newClients)) // Fails if the mapping changed since the get() above.
+ return true;
+ }
+ }
+
+ /**
+ * Stores the given client in slot {@code connIdx} of the clients array registered for the node.
+ * <p>
+ * If no array is registered yet, a new one is created and published with {@code putIfAbsent};
+ * its length is {@code connectionsPerNode} when {@code useMultipleConnections(node)} is true and
+ * {@code 1} otherwise. If an array already exists, a copy with the slot filled is swapped in with
+ * {@code replace}. Either step is retried until it succeeds.
+ * <p>
+ * A client whose index is not below {@code connectionsPerNode} is not registered at all.
+ * <p>
+ * NOTE(review): {@code curClients[connIdx]} is read without a length check, while
+ * {@code reserveClient} guards the same access with {@code connIdx < curClients.length};
+ * presumably the index is always inside the array here - confirm.
+ *
+ * @param node Node.
+ * @param connIdx Connection index.
+ * @param addClient Client to add.
+ */
+ private void addNodeClient(ClusterNode node, int connIdx, GridCommunicationClient addClient) {
+ assert connectionsPerNode > 0 : connectionsPerNode;
+ assert connIdx == addClient.connectionIndex() : addClient;
+
+ if (connIdx >= connectionsPerNode) { // No slot for this index: the client is left unregistered.
+ assert !usePairedConnections(node);
+
+ return;
+ }
+
+ for (;;) { // Retry loop: left only when putIfAbsent or replace below succeeds.
+ GridCommunicationClient[] curClients = clients.get(node.id());
+
+ assert curClients == null || curClients[connIdx] == null : "Client already created [node=" + node.id() +
+ ", connIdx=" + connIdx +
+ ", client=" + addClient +
+ ", oldClient=" + curClients[connIdx] + ']';
+
+ GridCommunicationClient[] newClients;
+
+ if (curClients == null) {
+ newClients = new GridCommunicationClient[useMultipleConnections(node) ? connectionsPerNode : 1];
+ newClients[connIdx] = addClient;
+
+ if (clients.putIfAbsent(node.id(), newClients) == null) // Non-null means another array was registered meanwhile.
+ break;
+ }
+ else {
+ newClients = Arrays.copyOf(curClients, curClients.length);
+ newClients[connIdx] = addClient;
+
+ if (clients.replace(node.id(), curClients, newClients)) // Fails if the mapping changed since the get() above.
+ break;
+ }
+ }
+ }
+
+ /**
* Returns existing or just created client to node.
*
* @param node Node to which client should be open.
+ * @param connIdx Connection index.
* @return The existing or just created client.
* @throws IgniteCheckedException Thrown if any exception occurs.
*/
- private GridCommunicationClient reserveClient(ClusterNode node) throws IgniteCheckedException {
+ private GridCommunicationClient reserveClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
assert node != null;
+ assert (connIdx >= 0 && connIdx < connectionsPerNode) || !usePairedConnections(node) : connIdx;
UUID nodeId = node.id();
while (true) {
- GridCommunicationClient client = clients.get(nodeId);
+ GridCommunicationClient[] curClients = clients.get(nodeId);
+
+ GridCommunicationClient client = curClients != null && connIdx < curClients.length ?
+ curClients[connIdx] : null;
if (client == null) {
if (stopping)
@@ -2045,25 +2461,27 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
// Do not allow concurrent connects.
GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture();
- GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(nodeId, fut);
+ ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1);
+
+ GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut);
if (oldFut == null) {
try {
- GridCommunicationClient client0 = clients.get(nodeId);
+ GridCommunicationClient[] curClients0 = clients.get(nodeId);
+
+ GridCommunicationClient client0 = curClients0 != null && connIdx < curClients0.length ?
+ curClients0[connIdx] : null;
if (client0 == null) {
- client0 = createNioClient(node);
+ client0 = createNioClient(node, connIdx);
if (client0 != null) {
- GridCommunicationClient old = clients.put(nodeId, client0);
-
- assert old == null : "Client already created " +
- "[node=" + node + ", client=" + client0 + ", oldClient=" + old + ']';
+ addNodeClient(node, connIdx, client0);
if (client0 instanceof GridTcpNioCommunicationClient) {
GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0);
- if (tcpClient.session().closeTime() > 0 && clients.remove(nodeId, client0)) {
+ if (tcpClient.session().closeTime() > 0 && removeNodeClient(nodeId, client0)) {
if (log.isDebugEnabled())
log.debug("Session was closed after client creation, will retry " +
"[node=" + node + ", client=" + client0 + ']');
@@ -2085,7 +2503,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
throw (Error)e;
}
finally {
- clientFuts.remove(nodeId, fut);
+ clientFuts.remove(connKey, fut);
}
}
else
@@ -2097,27 +2515,31 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
continue;
if (getSpiContext().node(nodeId) == null) {
- if (clients.remove(nodeId, client))
+ if (removeNodeClient(nodeId, client))
client.forceClose();
throw new IgniteSpiException("Destination node is not in topology: " + node.id());
}
}
+ assert connIdx == client.connectionIndex() : client;
+
if (client.reserve())
return client;
else
// Client has just been closed by idle worker. Help it and try again.
- clients.remove(nodeId, client);
+ removeNodeClient(nodeId, client);
}
}
/**
* @param node Node to create client for.
+ * @param connIdx Connection index.
* @return Client.
* @throws IgniteCheckedException If failed.
*/
- @Nullable protected GridCommunicationClient createNioClient(ClusterNode node) throws IgniteCheckedException {
+ @Nullable private GridCommunicationClient createNioClient(ClusterNode node, int connIdx)
+ throws IgniteCheckedException {
assert node != null;
Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT));
@@ -2136,6 +2558,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
try {
GridCommunicationClient client = createShmemClient(
node,
+ connIdx,
shmemPort);
if (log.isDebugEnabled())
@@ -2158,7 +2581,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
connectGate.enter();
try {
- GridCommunicationClient client = createTcpClient(node);
+ GridCommunicationClient client = createTcpClient(node, connIdx);
if (log.isDebugEnabled())
log.debug("TCP client created: " + client);
@@ -2173,10 +2596,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/**
* @param node Node.
* @param port Port.
+ * @param connIdx Connection index.
* @return Client.
* @throws IgniteCheckedException If failed.
*/
- @Nullable protected GridCommunicationClient createShmemClient(ClusterNode node,
+ @Nullable private GridCommunicationClient createShmemClient(ClusterNode node,
+ int connIdx,
Integer port) throws IgniteCheckedException {
int attempt = 1;
@@ -2190,7 +2615,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
GridCommunicationClient client;
try {
- client = new GridShmemCommunicationClient(metricsLsnr,
+ client = new GridShmemCommunicationClient(
+ connIdx,
+ metricsLsnr,
port,
timeoutHelper.nextTimeoutChunk(connTimeout),
log,
@@ -2211,7 +2638,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
try {
- safeHandshake(client, null, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0), null);
+ safeHandshake(client,
+ null,
+ node.id(),
+ timeoutHelper.nextTimeoutChunk(connTimeout0),
+ null,
+ null);
}
catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
client.forceClose();
@@ -2270,10 +2702,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
*/
private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) {
if (slowClientQueueLimit > 0 && msgQueueSize > slowClientQueueLimit) {
- UUID id = ses.meta(NODE_ID_META);
+ ConnectionKey id = ses.meta(CONN_IDX_META);
if (id != null) {
- ClusterNode node = getSpiContext().node(id);
+ ClusterNode node = getSpiContext().node(id.nodeId);
if (node != null && node.isClient()) {
String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, " +
@@ -2283,11 +2715,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
", clientNode=" + node +
", slowClientQueueLimit=" + slowClientQueueLimit + ']';
- U.quietAndWarn(
- log,
- msg);
+ U.quietAndWarn(log, msg);
- getSpiContext().failNode(id, msg);
+ getSpiContext().failNode(id.nodeId(), msg);
}
}
}
@@ -2297,10 +2727,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* Establish TCP connection to remote node and returns client.
*
* @param node Remote node.
+ * @param connIdx Connection index.
* @return Client.
* @throws IgniteCheckedException If failed.
*/
- protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException {
+ protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS));
Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES));
Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT));
@@ -2368,7 +2799,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
"(node left topology): " + node);
}
- GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(node);
+ ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1);
+
+ GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey);
if (!recoveryDesc.reserve()) {
U.closeQuiet(ch);
@@ -2395,11 +2828,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
sslMeta.sslEngine(sslEngine);
}
+ Integer handshakeConnIdx = useMultipleConnections(node) ? connIdx : null;
+
rcvCnt = safeHandshake(ch,
recoveryDesc,
node.id(),
timeoutHelper.nextTimeoutChunk(connTimeout0),
- sslMeta);
+ sslMeta,
+ handshakeConnIdx);
if (rcvCnt == -1)
return null;
@@ -2410,7 +2846,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
try {
- meta.put(NODE_ID_META, node.id());
+ meta.put(CONN_IDX_META, connKey);
if (recoveryDesc != null) {
recoveryDesc.onHandshake(rcvCnt);
@@ -2420,7 +2856,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
GridNioSession ses = nioSrvr.createSession(ch, meta).get();
- client = new GridTcpNioCommunicationClient(ses, log);
+ client = new GridTcpNioCommunicationClient(connIdx, ses, log);
conn = true;
}
@@ -2564,6 +3000,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @param rmtNodeId Remote node.
* @param timeout Timeout for handshake.
* @param sslMeta Session meta.
+ * @param handshakeConnIdx Non null connection index if need send it in handshake.
* @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout.
* @return Handshake response.
*/
@@ -2573,7 +3010,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
@Nullable GridNioRecoveryDescriptor recovery,
UUID rmtNodeId,
long timeout,
- GridSslMeta sslMeta
+ GridSslMeta sslMeta,
+ @Nullable Integer handshakeConnIdx
) throws IgniteCheckedException {
HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
@@ -2655,14 +3093,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
"fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
if (recovery != null) {
- HandshakeMessage msg = new HandshakeMessage(locNode.id(),
- recovery.incrementConnectCount(),
- recovery.received());
+ HandshakeMessage msg;
+
+ int msgSize = 33;
+
+ if (handshakeConnIdx != null) {
+ msg = new HandshakeMessage2(locNode.id(),
+ recovery.incrementConnectCount(),
+ recovery.received(),
+ handshakeConnIdx);
+
+ msgSize += 4;
+ }
+ else {
+ msg = new HandshakeMessage(locNode.id(),
+ recovery.incrementConnectCount(),
+ recovery.received());
+ }
if (log.isDebugEnabled())
log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
- buf = ByteBuffer.allocate(33);
+ buf = ByteBuffer.allocate(msgSize);
buf.order(ByteOrder.nativeOrder());
@@ -2689,6 +3141,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
else
ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
}
+
if (recovery != null) {
if (log.isDebugEnabled())
log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
@@ -2818,26 +3271,81 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
U.join(commWorker, log);
- for (GridCommunicationClient client : clients.values())
- client.forceClose();
+ for (GridCommunicationClient[] clients0 : clients.values()) {
+ for (GridCommunicationClient client : clients0) {
+ if (client != null)
+ client.forceClose();
+ }
+ }
+ }
+
+ /**
+ * Looks up (through {@code recoveryDescriptor}) the descriptor used for an outgoing connection.
+ * <p>
+ * When {@code usePairedConnections(node)} is true the {@code outRecDescs} map is used and the
+ * paired flag passed on is {@code true}; otherwise the shared {@code recoveryDescs} map is used
+ * with the flag set to {@code false}.
+ *
+ * @param node Node.
+ * @param key Connection key.
+ * @return Recovery descriptor for outgoing connection.
+ */
+ private GridNioRecoveryDescriptor outRecoveryDescriptor(ClusterNode node, ConnectionKey key) {
+ if (usePairedConnections(node))
+ return recoveryDescriptor(outRecDescs, true, node, key);
+ else
+ return recoveryDescriptor(recoveryDescs, false, node, key); // Same map as inRecoveryDescriptor uses in this case.
}
/**
+ * Looks up (through {@code recoveryDescriptor}) the descriptor used for an incoming connection.
+ * <p>
+ * When {@code usePairedConnections(node)} is true the {@code inRecDescs} map is used and the
+ * paired flag passed on is {@code true}; otherwise the shared {@code recoveryDescs} map is used
+ * with the flag set to {@code false}.
+ *
* @param node Node.
- * @return Recovery receive data for given node.
+ * @param key Connection key.
+ * @return Recovery descriptor for incoming connection.
+ */
+ private GridNioRecoveryDescriptor inRecoveryDescriptor(ClusterNode node, ConnectionKey key) {
+ if (usePairedConnections(node))
+ return recoveryDescriptor(inRecDescs, true, node, key);
+ else
+ return recoveryDescriptor(recoveryDescs, false, node, key); // Same map as outRecoveryDescriptor uses in this case.
+ }
+
+ /**
+ * Checks the node's version against {@code MULTIPLE_CONN_SINCE_VER} using
+ * {@code compareToIgnoreTimestamp}.
+ *
+ * @param node Node.
+ * @return {@code True} if given node supports multiple connections per-node for communication,
+ * that is, its version compares as not lower than {@code MULTIPLE_CONN_SINCE_VER}.
+ */
+ private boolean useMultipleConnections(ClusterNode node) {
+ return node.version().compareToIgnoreTimestamp(MULTIPLE_CONN_SINCE_VER) >= 0;
+ }
+
+ /**
+ * Decides whether an in/out connection pair is used with the given node.
+ * <p>
+ * The local {@code usePairedConnections} field is checked first; only when it is set is the
+ * node's {@code ATTR_PAIRED_CONN} attribute read. A missing attribute counts as {@code false}.
+ *
+ * @param node Node.
+ * @return {@code True} if can use in/out connection pair for communication.
*/
- private GridNioRecoveryDescriptor recoveryDescriptor(ClusterNode node) {
- ClientKey id = new ClientKey(node.id(), node.order());
+ private boolean usePairedConnections(ClusterNode node) {
+ if (usePairedConnections) { // Local flag off: the remote attribute is never consulted.
+ Boolean attr = node.attribute(createSpiAttributeName(ATTR_PAIRED_CONN));
+
+ return attr != null && attr; // Null check first, so a missing attribute is not unboxed.
+ }
+
+ return false;
+ }
- GridNioRecoveryDescriptor recovery = recoveryDescs.get(id);
+ /**
+ * @param recoveryDescs Descriptors map.
+ * @param pairedConnections {@code True} if in/out connections pair is used for communication with node.
+ * @param node Node.
+ * @param key Connection key.
+ * @return Recovery receive data for given node.
+ */
+ private GridNioRecoveryDescriptor recoveryDescriptor(
+ ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs,
+ boolean pairedConnections,
+ ClusterNode node,
+ ConnectionKey key) {
+ GridNioRecoveryDescriptor recovery = recoveryDescs.get(key);
if (recovery == null) {
int maxSize = Math.max(msgQueueLimit, ackSndThreshold);
- int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : (maxSize * 5);
+ int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : (maxSize * 128);
- GridNioRecoveryDescriptor old =
- recoveryDescs.putIfAbsent(id, recovery = new GridNioRecoveryDescriptor(queueLimit, node, log));
+ GridNioRecoveryDescriptor old = recoveryDescs.putIfAbsent(key,
+ recovery = new GridNioRecoveryDescriptor(pairedConnections, queueLimit, node, log));
if (old != null)
recovery = old;
@@ -2879,54 +3387,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
return S.toString(TcpCommunicationSpi.class, this);
}
- /**
- *
- */
- private static class ClientKey {
- /** */
- private UUID nodeId;
-
- /** */
- private long order;
-
- /**
- * @param nodeId Node ID.
- * @param order Node order.
- */
- private ClientKey(UUID nodeId, long order) {
- this.nodeId = nodeId;
- this.order = order;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object obj) {
- if (this == obj)
- return true;
-
- if (obj == null || getClass() != obj.getClass())
- return false;
-
- ClientKey other = (ClientKey)obj;
-
- return order == other.order && nodeId.equals(other.nodeId);
-
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int res = nodeId.hashCode();
-
- res = 31 * res + (int)(order ^ (order >>> 32));
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(ClientKey.class, this);
- }
- }
-
/** Internal exception class for proper timeout handling. */
private static class HandshakeTimeoutException extends IgniteCheckedException {
/** */
@@ -3026,9 +3486,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assert formatter != null;
- UUID rmtNodeId = ses.meta(NODE_ID_META);
+ ConnectionKey connKey = ses.meta(CONN_IDX_META);
- return rmtNodeId != null ? formatter.writer(rmtNodeId) : null;
+ return connKey != null ? formatter.writer(connKey.nodeId()) : null;
}
};
@@ -3042,9 +3502,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assert formatter != null;
- UUID rmtNodeId = ses.meta(NODE_ID_META);
+ ConnectionKey connKey = ses.meta(CONN_IDX_META);
- return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory) : null;
+ return connKey != null ? formatter.reader(connKey.nodeId(), msgFactory) : null;
}
};
@@ -3125,62 +3585,108 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
private void processIdle() {
cleanupRecovery();
- for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) {
+ for (Map.Entry<UUID, GridCommunicationClient[]> e : clients.entrySet()) {
UUID nodeId = e.getKey();
- GridCommunicationClient client = e.getValue();
+ for (GridCommunicationClient client : e.getValue()) {
+ if (client == null)
+ continue;
- ClusterNode node = getSpiContext().node(nodeId);
+ ClusterNode node = getSpiContext().node(nodeId);
- if (node == null) {
- if (log.isDebugEnabled())
- log.debug("Forcing close of non-existent node connection: " + nodeId);
+ if (node == null) {
+ if (log.isDebugEnabled())
+ log.debug("Forcing close of non-existent node connection: " + nodeId);
- client.forceClose();
+ client.forceClose();
- clients.remove(nodeId, client);
+ removeNodeClient(nodeId, client);
- continue;
- }
+ continue;
+ }
- GridNioRecoveryDescriptor recovery = null;
+ GridNioRecoveryDescriptor recovery = null;
- if (client instanceof GridTcpNioCommunicationClient) {
- recovery = recoveryDescs.get(new ClientKey(node.id(), node.order()));
+ if (!usePairedConnections(node) && client instanceof GridTcpNioCommunicationClient) {
+ recovery = recoveryDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1));
- if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
- RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
+ if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
+ RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
- if (log.isDebugEnabled())
- log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId +
- ", rcvCnt=" + msg.received() + ']');
+ if (log.isDebugEnabled())
+ log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId +
+ ", rcvCnt=" + msg.received() + ']');
- nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
+ try {
+ nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
- recovery.lastAcknowledged(msg.received());
+ recovery.lastAcknowledged(msg.received());
+ }
+ catch (IgniteCheckedException err) {
+ U.error(log, "Failed to send message: " + err, err);
+ }
- continue;
+ continue;
+ }
}
- }
- long idleTime = client.getIdleTime();
+ long idleTime = client.getIdleTime();
+
+ if (idleTime >= idleConnTimeout) {
+ if (recovery == null && usePairedConnections(node))
+ recovery = outRecDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1));
+
+ if (recovery != null &&
+ recovery.nodeAlive(getSpiContext().node(nodeId)) &&
+ !recovery.messagesRequests().isEmpty()) {
+ if (log.isDebugEnabled())
+ log.debug("Node connection is idle, but there are unacknowledged messages, " +
+ "will wait: " + nodeId);
+
+ continue;
+ }
- if (idleTime >= idleConnTimeout) {
- if (recovery != null &&
- recovery.nodeAlive(getSpiContext().node(nodeId)) &&
- !recovery.messagesFutures().isEmpty()) {
if (log.isDebugEnabled())
- log.debug("Node connection is idle, but there are unacknowledged messages, " +
- "will wait: " + nodeId);
+ log.debug("Closing idle node connection: " + nodeId);
- continue;
+ if (client.close() || client.closed())
+ removeNodeClient(nodeId, client);
}
+ }
+ }
- if (log.isDebugEnabled())
- log.debug("Closing idle node connection: " + nodeId);
+ for (GridNioSession ses : nioSrvr.sessions()) {
+ GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor();
+
+ if (recovery != null && usePairedConnections(recovery.node())) {
+ assert ses.accepted() : ses;
+
+ sendAckOnTimeout(recovery, ses);
+ }
+ }
+ }
+
+ /**
+ * Sends a {@code RecoveryLastReceivedMessage} carrying {@code recovery.received()} over the
+ * given session when the descriptor's last acknowledged counter differs from its received
+ * counter. Does nothing if the descriptor is {@code null} or the two counters are equal.
+ * <p>
+ * The last acknowledged counter is updated only after {@code nioSrvr.sendSystem} returns
+ * without throwing; an {@code IgniteCheckedException} from the send is logged and not rethrown.
+ *
+ * @param recovery Recovery descriptor.
+ * @param ses Session.
+ */
+ private void sendAckOnTimeout(GridNioRecoveryDescriptor recovery, GridNioSession ses) {
+ if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
+ RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
+
+ if (log.isDebugEnabled()) {
+ log.debug("Send recovery acknowledgement on timeout [rmtNode=" + recovery.node().id() +
+ ", rcvCnt=" + msg.received() +
+ ", lastAcked=" + recovery.lastAcknowledged() + ']');
+ }
+
+ try {
+ nioSrvr.sendSystem(ses, msg);
- if (client.close() || client.closed())
- clients.remove(nodeId, client);
+ recovery.lastAcknowledged(msg.received()); // Uses the count captured in msg, not a fresh received() read.
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send message: " + e, e);
}
}
}
@@ -3189,15 +3695,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
*
*/
private void cleanupRecovery() {
- Set<ClientKey> left = null;
+ cleanupRecovery(recoveryDescs);
+ cleanupRecovery(inRecDescs);
+ cleanupRecovery(outRecDescs);
+ }
+
+ /**
+ * @param recoveryDescs Recovery descriptors to cleanup.
+ */
+ private void cleanupRecovery(ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs) {
+ Set<ConnectionKey> left = null;
- for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) {
+ for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) {
if (left != null && left.contains(e.getKey()))
continue;
- GridNioRecoveryDescriptor recoverySnd = e.getValue();
+ GridNioRecoveryDescriptor recoveryDesc = e.getValue();
- if (!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) {
+ if (!recoveryDesc.nodeAlive(getSpiContext().node(e.getKey().nodeId()))) {
if (left == null)
left = new HashSet<>();
@@ -3208,11 +3723,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (left != null) {
assert !left.isEmpty();
- for (ClientKey id : left) {
- GridNioRecoveryDescriptor recoverySnd = recoveryDescs.get(id);
+ for (ConnectionKey id : left) {
+ GridNioRecoveryDescriptor recoveryDesc = recoveryDescs.get(id);
- if (recoverySnd != null && recoverySnd.onNodeLeft())
- recoveryDescs.remove(id);
+ if (recoveryDesc != null && recoveryDesc.onNodeLeft())
+ recoveryDescs.remove(id, recoveryDesc);
}
}
}
@@ -3221,45 +3736,43 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @param sesInfo Disconnected session information.
*/
private void processDisconnect(DisconnectedSessionInfo sesInfo) {
- if (sesInfo.reconnect) {
- GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc;
-
- ClusterNode node = recoveryDesc.node();
+ GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc;
- if (!recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
- return;
+ ClusterNode node = recoveryDesc.node();
- try {
- if (log.isDebugEnabled())
- log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
+ if (!recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
+ return;
- GridCommunicationClient client = reserveClient(node);
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
- client.release();
- }
- catch (IgniteCheckedException | IgniteException e) {
- try {
- if (recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && getSpiContext().pingNode(node.id())) {
- if (log.isDebugEnabled())
- log.debug("Recovery reconnect failed, will retry " +
- "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+ GridCommunicationClient client = reserveClient(node, sesInfo.connIdx);
- addProcessDisconnectRequest(sesInfo);
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Recovery reconnect failed, " +
- "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+ client.release();
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ try {
+ if (recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && getSpiContext().pingNode(node.id())) {
+ if (log.isDebugEnabled())
+ log.debug("Recovery reconnect failed, will retry " +
+ "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
- onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]",
- e);
- }
+ addProcessDisconnectRequest(sesInfo);
}
- catch (IgniteClientDisconnectedException e0) {
+ else {
if (log.isDebugEnabled())
- log.debug("Failed to ping node, client disconnected.");
+ log.debug("Recovery reconnect failed, " +
+ "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+
+ onE
<TRUNCATED>