You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/04/09 13:51:40 UTC
[17/18] ignite git commit: ignite-4003 Async outgoing connections for communication SPI
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6b88a2/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index f13f1f2..937523c 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -35,6 +35,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -46,8 +47,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
@@ -66,7 +65,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.util.GridConcurrentFactory;
-import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.ipc.IpcEndpoint;
import org.apache.ignite.internal.util.ipc.IpcToNioAdapter;
@@ -78,6 +77,7 @@ import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter;
import org.apache.ignite.internal.util.nio.GridDirectParser;
import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
import org.apache.ignite.internal.util.nio.GridNioMessageReaderFactory;
import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory;
@@ -90,9 +90,7 @@ import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
import org.apache.ignite.internal.util.nio.GridShmemCommunicationClient;
import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient;
-import org.apache.ignite.internal.util.nio.ssl.BlockingSslHandler;
import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
-import org.apache.ignite.internal.util.nio.ssl.GridSslMeta;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
@@ -136,7 +134,6 @@ import org.jsr166.LongAdder8;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
/**
* <tt>TcpCommunicationSpi</tt> is default communication SPI which uses
@@ -296,10 +293,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/** Connection index meta for session. */
private static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey();
+ /** Recovery descriptor meta key. */
+ private static final int RECOVERY_DESC_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
/** Message tracker meta for session. */
private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
+ /** Connection context meta key. */
+ private static final int CONN_CTX_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
/**
* Default local port range (value is <tt>100</tt>).
* See {@link #setLocalPortRange(int)} for details.
@@ -334,6 +336,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/** Handshake message type. */
public static final short HANDSHAKE_MSG_TYPE = -3;
+ /** Ignite header message. */
+ private static final Message IGNITE_HEADER_MSG = new IgniteHeaderMessage();
+
+ /** Skip ack. For test purposes only. */
+ private boolean skipAck;
+
/** */
private ConnectGateway connectGate;
@@ -407,10 +415,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
log.debug("Session was closed but there are unacknowledged messages, " +
"will try to reconnect [rmtNode=" + outDesc.node().id() + ']');
- DisconnectedSessionInfo disconnectData =
- new DisconnectedSessionInfo(outDesc, connId.connectionIndex());
+ SessionInfo sesInfo =
+ new SessionInfo(ses, connId.connectionIndex(), SessionState.RECONNECT);
- commWorker.addProcessDisconnectRequest(disconnectData);
+ commWorker.addSessionStateChangeRequest(sesInfo);
}
}
else
@@ -436,14 +444,69 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
if (msg instanceof NodeIdMessage) {
sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0);
- connKey = new ConnectionKey(sndId, 0, -1);
+
+ if (ses.remoteAddress() != null) { // Not shmem.
+ assert !ses.accepted();
+
+ ConnectContext ctx = ses.meta(CONN_CTX_META_KEY);
+
+ assert ctx != null;
+ assert ctx.expNodeId != null;
+
+ if (sndId.equals(ctx.expNodeId)) {
+ GridNioRecoveryDescriptor recoveryDesc = ses.outRecoveryDescriptor();
+
+ assert recoveryDesc != null;
+
+ long connCnt = recoveryDesc.incrementConnectCount();
+
+ connKey = new ConnectionKey(sndId, ctx.connIdx, connCnt);
+
+ final ConnectionKey old = ses.addMeta(CONN_IDX_META, connKey);
+
+ assert old == null;
+
+ ses.send(IGNITE_HEADER_MSG);
+
+ ClusterNode locNode = getLocalNode();
+
+ if (locNode == null) {
+ commWorker.addSessionStateChangeRequest(new SessionInfo(ses, SessionState.CLOSE,
+ new IgniteCheckedException("Local node has not been started or " +
+ "fully initialized [isStopping=" + getSpiContext().isStopping() + ']')));
+
+ return;
+ }
+
+ int handshakeConnIdx = connPlc.connectionIndex();
+
+ HandshakeMessage handshakeMsg = new HandshakeMessage2(locNode.id(), connCnt,
+ recoveryDesc.received(), handshakeConnIdx);
+
+ if (log.isDebugEnabled())
+ log.debug("Write handshake message [rmtNode=" + sndId +
+ ", msg=" + handshakeMsg + ']');
+
+ ses.send(handshakeMsg);
+ }
+ else {
+ commWorker.addSessionStateChangeRequest(new SessionInfo(ses, SessionState.CLOSE,
+ new IgniteCheckedException("Remote node ID is not as expected [expected=" +
+ ctx.expNodeId + ", rcvd=" + sndId + ']')));
+ }
+
+ return;
+ }
+ else
+ connKey = new ConnectionKey(sndId, 0, -1);
}
else {
assert msg instanceof HandshakeMessage : msg;
HandshakeMessage msg0 = (HandshakeMessage)msg;
- sndId = ((HandshakeMessage)msg).nodeId();
+ sndId = msg0.nodeId();
+
connKey = new ConnectionKey(sndId, msg0.connectionIndex(), msg0.connectCount());
}
@@ -484,30 +547,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
if (reserve)
connectedNew(recoveryDesc, ses, true);
else {
- if (c.failed) {
- ses.send(new RecoveryLastReceivedMessage(-1));
-
- for (GridNioSession ses0 : nioSrvr.sessions()) {
- ConnectionKey key0 = ses0.meta(CONN_IDX_META);
-
- if (ses0.accepted() && key0 != null &&
- key0.nodeId().equals(connKey.nodeId()) &&
- key0.connectionIndex() == connKey.connectionIndex() &&
- key0.connectCount() < connKey.connectCount())
- ses0.close();
- }
+ for (GridNioSession ses0 : nioSrvr.sessions()) {
+ ConnectionKey key0 = ses0.meta(CONN_IDX_META);
+
+ if (ses0.accepted() && key0 != null &&
+ key0.nodeId().equals(connKey.nodeId()) &&
+ key0.connectionIndex() == connKey.connectionIndex() &&
+ key0.connectCount() < connKey.connectCount())
+ ses0.close();
}
}
}
else {
assert connKey.connectionIndex() >= 0 : connKey;
- GridCommunicationClient[] curClients = clients.get(sndId);
-
- GridCommunicationClient oldClient =
- curClients != null && connKey.connectionIndex() < curClients.length ?
- curClients[connKey.connectionIndex()] :
- null;
+ GridCommunicationClient oldClient = nodeClient(sndId, connKey.connectionIndex());
boolean hasShmemClient = false;
@@ -536,10 +590,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey);
if (oldFut == null) {
- curClients = clients.get(sndId);
-
- oldClient = curClients != null && connKey.connectionIndex() < curClients.length ?
- curClients[connKey.connectionIndex()] : null;
+ oldClient = nodeClient(sndId, connKey.connectionIndex());
if (oldClient != null) {
if (oldClient instanceof GridTcpNioCommunicationClient) {
@@ -583,7 +634,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
}
else {
- if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
+ if (oldFut instanceof ReserveClientFuture && locNode.order() < rmtNode.order()) {
if (log.isDebugEnabled()) {
log.debug("Received incoming connection from remote node while " +
"connecting to this node, rejecting [locNode=" + locNode.id() +
@@ -605,17 +656,31 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
}
- @Override public void onMessage(GridNioSession ses, Message msg) {
+ @Override public void onMessage(final GridNioSession ses, Message msg) {
ConnectionKey connKey = ses.meta(CONN_IDX_META);
if (connKey == null) {
- assert ses.accepted() : ses;
-
- if (!connectGate.tryEnter()) {
+ if (ses.accepted() && !connectGate.tryEnter()) { // Outgoing connection already entered gate.
if (log.isDebugEnabled())
log.debug("Close incoming connection, failed to enter gateway.");
- ses.close();
+ try {
+ nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1), new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ try {
+ fut.get();
+
+ ses.close();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send last received message: " + e, e);
+ }
+ }
+ });
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send message: " + e, e);
+ }
return;
}
@@ -624,7 +689,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
onFirstMessage(ses, msg);
}
finally {
- connectGate.leave();
+ if (ses.accepted())
+ connectGate.leave();
}
}
else {
@@ -636,13 +702,49 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
if (recovery != null) {
RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg;
+ long rcvCnt = msg0.received();
+
if (log.isDebugEnabled()) {
log.debug("Received recovery acknowledgement [rmtNode=" + connKey.nodeId() +
", connIdx=" + connKey.connectionIndex() +
", rcvCnt=" + msg0.received() + ']');
}
- recovery.ackReceived(msg0.received());
+ ConnectContext ctx = ses.meta(CONN_CTX_META_KEY);
+
+ if (!ses.accepted() && ctx != null && ctx.rcvCnt == Long.MIN_VALUE) {
+ HandshakeTimeoutObject timeoutObj = ctx.handshakeTimeoutObj;
+
+ Exception err = null;
+
+ if (timeoutObj != null && !cancelHandshakeTimeout(timeoutObj)) {
+ err = new HandshakeTimeoutException("Failed to perform handshake due to timeout " +
+ "(consider increasing 'connectionTimeout' configuration property).");
+ }
+
+ if (rcvCnt == -1 || err != null) {
+ if (ses.remoteAddress() != null) {
+ SessionInfo sesInfo = new SessionInfo(ses, SessionState.CLOSE, err);
+
+ commWorker.addSessionStateChangeRequest(sesInfo);
+ }
+ }
+ else {
+ ctx.rcvCnt = rcvCnt;
+
+ recovery.onHandshake(rcvCnt);
+
+ nioSrvr.resend(ses);
+
+ recovery.onConnected();
+
+ SessionInfo sesInfo = new SessionInfo(ses, connKey.idx, SessionState.READY);
+
+ commWorker.addSessionStateChangeRequest(sesInfo);
+ }
+ }
+ else
+ recovery.ackReceived(rcvCnt);
return;
}
@@ -660,7 +762,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
", rcvCnt=" + rcvCnt + ']');
}
- ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt));
+ if (!skipAck)
+ ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt));
recovery.lastAcknowledged(rcvCnt);
}
@@ -711,7 +814,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
assert connKey != null && connKey.connectionIndex() >= 0 : connKey;
assert !usePairedConnections(node);
- recovery.onHandshake(rcvCnt);
+ if (ses.accepted())
+ recovery.onHandshake(rcvCnt);
ses.inRecoveryDescriptor(recovery);
ses.outRecoveryDescriptor(recovery);
@@ -777,9 +881,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/** */
private final ClusterNode rmtNode;
- /** */
- private boolean failed;
-
/**
* @param ses Incoming session.
* @param recoveryDesc Recovery descriptor.
@@ -796,8 +897,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/** {@inheritDoc} */
@Override public void apply(Boolean success) {
try {
- failed = !success;
-
if (success) {
IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> msgFut) {
@@ -918,10 +1017,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
else {
try {
- fut.onDone();
+ nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1), new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> msgFut) {
+ try {
+ msgFut.get();
+ } catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send recovery handshake " +
+ "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');
+
+ recoveryDesc.release();
+ } finally {
+ fut.onDone();
+
+ clientFuts.remove(connKey, fut);
+
+ ses.close();
+ }
+ }
+ });
}
- finally {
- clientFuts.remove(connKey, fut);
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send message: " + e, e);
}
}
}
@@ -1916,12 +2033,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
nioSrvr.dumpStats();
}
- /** */
- private final ThreadLocal<Integer> threadConnIdx = new ThreadLocal<>();
-
- /** */
- private final AtomicInteger connIdx = new AtomicInteger();
-
/** {@inheritDoc} */
@Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
initFailureDetectionTimeout();
@@ -2177,9 +2288,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
assert formatter != null;
+ UUID rmtNodeId = null;
+
ConnectionKey key = ses.meta(CONN_IDX_META);
- return key != null ? formatter.writer(key.nodeId()) : null;
+ if (key != null)
+ rmtNodeId = key.nodeId();
+ else {
+ ConnectContext ctx = ses.meta(CONN_CTX_META_KEY);
+
+ if (ctx != null)
+ rmtNodeId = ctx.expNodeId;
+ }
+
+ return key != null ? formatter.writer(rmtNodeId) : null;
}
};
@@ -2189,7 +2311,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
IgnitePredicate<Message> skipRecoveryPred = new IgnitePredicate<Message>() {
@Override public boolean apply(Message msg) {
- return msg instanceof RecoveryLastReceivedMessage;
+ return msg instanceof NotRecoverable;
}
};
@@ -2528,48 +2650,108 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
int connIdx = connPlc.connectionIndex();
+ send(node, connIdx, msg, ackC);
+ }
+ }
+
+ /**
+ * Try to send message.
+ */
+ private void send(final ClusterNode node,
+ final int connIdx,
+ final Message msg,
+ final IgniteInClosure<IgniteException> ackC
+ ) {
+ final GridCommunicationClient client = nodeClient(node.id(), connIdx);
+
+ if (client != null && client.reserve()) {
try {
- boolean retry;
+ send0(client, node, msg, ackC);
+ }
+ catch (IgniteCheckedException e) {
+ if (removeNodeClient(node.id(), client))
+ client.forceClose();
- do {
- client = reserveClient(node, connIdx);
+ throw new IgniteSpiException("Failed to send message to remote node: " + node, e);
+ }
+ }
+ else {
+ if (client != null)
+ removeNodeClient(node.id(), client);
- UUID nodeId = null;
+ IgniteInternalFuture<GridCommunicationClient> clientFut = reserveClient(node, connIdx);
- if (!client.async())
- nodeId = node.id();
+ clientFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() {
+ @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut) {
+ GridCommunicationClient client = null;
- retry = client.sendMessage(nodeId, msg, ackC);
+ try {
+ client = fut.get();
- client.release();
+ send0(client, node, msg, ackC);
+ }
+ catch (IgniteCheckedException e) {
+ LT.error(log, e, "Unexpected error occurred during sending of message to node: " + node.id());
- if (!retry)
- sentMsgsCnt.increment();
- else {
- removeNodeClient(node.id(), client);
+ if (client != null && removeNodeClient(node.id(), client))
+ client.forceClose();
+ }
+ }
+ });
+ }
+ }
- ClusterNode node0 = getSpiContext().node(node.id());
+ /**
+ * @param client Client.
+ * @param node Node.
+ * @param msg Message.
+ * @param ackC Ack closure.
+ */
+ private void send0(
+ GridCommunicationClient client,
+ ClusterNode node,
+ Message msg,
+ IgniteInClosure<IgniteException> ackC
+ ) throws IgniteCheckedException {
+ assert client != null;
- if (node0 == null)
- throw new IgniteCheckedException("Failed to send message to remote node " +
- "(node has left the grid): " + node.id());
- }
+ UUID nodeId = null;
- client = null;
- }
- while (retry);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteSpiException("Failed to send message to remote node: " + node, e);
- }
- finally {
- if (client != null && removeNodeClient(node.id(), client))
- client.forceClose();
+ if (!client.async())
+ nodeId = node.id();
+
+ boolean retry = client.sendMessage(nodeId, msg, ackC);
+
+ client.release();
+
+ if (!retry)
+ sentMsgsCnt.increment();
+ else {
+ removeNodeClient(node.id(), client);
+
+ ClusterNode node0 = getSpiContext().node(node.id());
+
+ if (node0 == null) {
+ U.warn(log, "Failed to send message to remote node (node has left the grid): " + node.id());
+
+ return;
}
+
+ send(node, client.connectionIndex(), msg, ackC);
}
}
/**
+ * @param nodeId Node id.
+ * @param connIdx Connection index.
+ */
+ private GridCommunicationClient nodeClient(UUID nodeId, int connIdx) {
+ GridCommunicationClient[] curClients = clients.get(nodeId);
+
+ return curClients != null && connIdx < curClients.length ? curClients[connIdx] : null;
+ }
+
+ /**
* @param nodeId Node ID.
* @param rmvClient Client to remove.
* @return {@code True} if client was removed.
@@ -2638,95 +2820,245 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
* @param node Node to which client should be open.
* @param connIdx Connection index.
* @return The existing or just created client.
- * @throws IgniteCheckedException Thrown if any exception occurs.
*/
- private GridCommunicationClient reserveClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
- assert node != null;
- assert (connIdx >= 0 && connIdx < connectionsPerNode) || !usePairedConnections(node) : connIdx;
+ private IgniteInternalFuture<GridCommunicationClient> reserveClient(ClusterNode node, int connIdx) {
+ GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
- UUID nodeId = node.id();
+ tryReserveClient(node, connIdx, fut);
- while (true) {
- GridCommunicationClient[] curClients = clients.get(nodeId);
+ return fut;
+ }
+
+ /**
+ * @param node Node.
+ * @param connIdx Connection index.
+ * @param fut Future.
+ */
+ private void tryReserveClient(
+ final ClusterNode node,
+ final int connIdx,
+ final GridFutureAdapter<GridCommunicationClient> fut)
+ {
+ final ReserveClientFuture reserveFut = new ReserveClientFuture(node, connIdx);
+
+ reserveFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() {
+ @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut0) {
+ try {
+ GridCommunicationClient client = fut0.get();
+
+ if (client != null)
+ fut.onDone(client);
+ else
+ tryReserveClient(node, connIdx, fut);
+ }
+ catch (IgniteCheckedException e) {
+ fut.onDone(e);
+ }
+ }
+ });
+
+ try {
+ reserveFut.reserve();
+ }
+ catch (Exception e) {
+ fut.onDone(e);
+ }
+ }
+
+ /**
+ *
+ */
+ private class ReserveClientFuture extends GridFutureAdapter<GridCommunicationClient> {
+ /** Node. */
+ private final ClusterNode node;
+
+ /** Connection index. */
+ private final int connIdx;
+
+ /**
+ * @param node Node.
+ */
+ ReserveClientFuture(ClusterNode node, int connIdx) {
+ assert node != null;
+ assert (connIdx >= 0 && connIdx < connectionsPerNode) || !usePairedConnections(node) : connIdx;
+
+ this.node = node;
+ this.connIdx = connIdx;
+ }
+
+ /**
+ *
+ */
+ void reserve() {
+ final UUID nodeId = node.id();
- GridCommunicationClient client = curClients != null && connIdx < curClients.length ?
- curClients[connIdx] : null;
+ final GridCommunicationClient client = nodeClient(nodeId, connIdx);
+
+ final GridFutureAdapter<GridCommunicationClient> connFut;
if (client == null) {
- if (stopping)
- throw new IgniteSpiException("Node is stopping.");
+ if (stopping) {
+ onDone(new IgniteSpiException("Node is stopping."));
+
+ return;
+ }
// Do not allow concurrent connects.
- GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture();
+ connFut = this;
- ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1);
+ final ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1);
- GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut);
+ final GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, connFut);
if (oldFut == null) {
try {
- GridCommunicationClient[] curClients0 = clients.get(nodeId);
-
- GridCommunicationClient client0 = curClients0 != null && connIdx < curClients0.length ?
- curClients0[connIdx] : null;
+ GridCommunicationClient client0 = nodeClient(nodeId, connIdx);
if (client0 == null) {
- client0 = createNioClient(node, connIdx);
-
- if (client0 != null) {
- addNodeClient(node, connIdx, client0);
+ IgniteInternalFuture<GridCommunicationClient> clientFut = createNioClient(node, connIdx);
- if (client0 instanceof GridTcpNioCommunicationClient) {
- GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0);
-
- if (tcpClient.session().closeTime() > 0 && removeNodeClient(nodeId, client0)) {
- if (log.isDebugEnabled())
- log.debug("Session was closed after client creation, will retry " +
- "[node=" + node + ", client=" + client0 + ']');
+ clientFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() {
+ @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut) {
+ try {
+ GridCommunicationClient client0 = fut.get();
+
+ if (client0 != null) {
+ addNodeClient(node, connIdx, client0);
+
+ if (client0 instanceof GridTcpNioCommunicationClient) {
+ GridTcpNioCommunicationClient tcpClient =
+ ((GridTcpNioCommunicationClient)client0);
+
+ if (tcpClient.session().closeTime() > 0 && removeNodeClient(nodeId, client0)) {
+ if (log.isDebugEnabled())
+ log.debug("Session was closed after client creation, " +
+ "will retry [node=" + node + ", client=" + client0 + ']');
+
+ client0 = null;
+ }
+ }
+
+ if (client0 == null) {
+ clientFuts.remove(connKey, connFut);
+
+ onDone();
+ }
+ else if (client0.reserve()) {
+ clientFuts.remove(connKey, connFut);
+
+ onDone(client0);
+ }
+ else {
+ clientFuts.remove(connKey, connFut);
+
+ removeNodeClient(nodeId, client0);
+
+ onDone();
+ }
+ }
+ else {
+ final long currTime = U.currentTimeMillis();
+
+ addTimeoutObject(new IgniteSpiTimeoutObject() {
+ private final IgniteUuid id = IgniteUuid.randomUuid();
+
+ @Override public IgniteUuid id() {
+ return id;
+ }
+
+ @Override public long endTime() {
+ return currTime + 200;
+ }
+
+ @Override public void onTimeout() {
+ SessionInfo sesInfo = new SessionInfo(null, SessionState.RETRY,
+ new Runnable() {
+ @Override public void run() {
+ clientFuts.remove(connKey, connFut);
+
+ onDone();
+ }
+ });
+
+ commWorker.addSessionStateChangeRequest(sesInfo);
+ }
+ });
+ }
+ }
+ catch (IgniteCheckedException e) {
+ clientFuts.remove(connKey, connFut);
- client0 = null;
+ onDone(e);
}
}
- }
- else
- U.sleep(200);
+ });
}
+ else {
+ assert connIdx == client0.connectionIndex() : client0;
- fut.onDone(client0);
+ if (client0.reserve())
+ onDone(client0);
+ else {
+ removeNodeClient(nodeId, client0);
+
+ onDone();
+ }
+ }
}
catch (Throwable e) {
- fut.onDone(e);
+ connFut.onDone(e);
if (e instanceof Error)
throw (Error)e;
}
- finally {
- clientFuts.remove(connKey, fut);
- }
}
- else
- fut = oldFut;
+ else {
+ oldFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() {
+ @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut) {
+ try {
+ GridCommunicationClient client0 = fut.get();
- client = fut.get();
+ if (client0 == null) {
+ clientFuts.remove(connKey, oldFut);
- if (client == null)
- continue;
+ onDone();
+ }
+ else if (client0.reserve()) {
+ clientFuts.remove(connKey, oldFut);
- if (getSpiContext().node(nodeId) == null) {
- if (removeNodeClient(nodeId, client))
- client.forceClose();
+ onDone(client0);
+ }
+ else {
+ clientFuts.remove(connKey, oldFut);
+
+ removeNodeClient(nodeId, client0);
- throw new IgniteSpiException("Destination node is not in topology: " + node.id());
+ onDone();
+ }
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+ });
}
}
+ else {
+ assert connIdx == client.connectionIndex() : client;
+
+ if (client.reserve())
+ onDone(client);
+ else {
+ removeNodeClient(nodeId, client);
- assert connIdx == client.connectionIndex() : client;
+ onDone();
+ }
+ }
+ }
- if (client.reserve())
- return client;
- else
- // Client has just been closed by idle worker. Help it and try again.
- removeNodeClient(nodeId, client);
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ReserveClientFuture.class, this);
}
}
@@ -2734,10 +3066,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
* @param node Node to create client for.
* @param connIdx Connection index.
* @return Client.
- * @throws IgniteCheckedException If failed.
*/
- @Nullable private GridCommunicationClient createNioClient(ClusterNode node, int connIdx)
- throws IgniteCheckedException {
+ protected IgniteInternalFuture<GridCommunicationClient> createNioClient(ClusterNode node, int connIdx) {
assert node != null;
Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT));
@@ -2745,7 +3075,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
ClusterNode locNode = getSpiContext().localNode();
if (locNode == null)
- throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)");
+ return new GridFinishedFuture<>(
+ new IgniteCheckedException("Failed to create NIO client (local node is stopping)")
+ );
if (log.isDebugEnabled())
log.debug("Creating NIO client to node: " + node);
@@ -2762,7 +3094,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
if (log.isDebugEnabled())
log.debug("Shmem client created: " + client);
- return client;
+ return new GridFinishedFuture<>(client);
}
catch (IgniteCheckedException e) {
if (e.hasCause(IpcOutOfSystemResourcesException.class))
@@ -2773,21 +3105,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
else if (log.isDebugEnabled())
log.debug("Failed to establish shared memory connection with local node (node has left): " +
node.id());
+
+ return new GridFinishedFuture<>(e);
}
}
- connectGate.enter();
try {
- GridCommunicationClient client = createTcpClient(node, connIdx);
-
- if (log.isDebugEnabled())
- log.debug("TCP client created: " + client);
-
- return client;
+ return createTcpClient(node, connIdx);
}
- finally {
- connectGate.leave();
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
}
}
@@ -2836,12 +3164,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
try {
- safeHandshake(client,
- null,
- node.id(),
- timeoutHelper.nextTimeoutChunk(connTimeout0),
- null,
- null);
+ safeHandshake(client, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0));
}
catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
client.forceClose();
@@ -2903,7 +3226,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
ConnectionKey id = ses.meta(CONN_IDX_META);
if (id != null) {
- ClusterNode node = getSpiContext().node(id.nodeId);
+ ClusterNode node = getSpiContext().node(id.nodeId());
if (node != null && node.isClient()) {
String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, " +
@@ -2926,532 +3249,527 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
*
* @param node Remote node.
* @param connIdx Connection index.
- * @return Client.
+ * @return Client future; it may complete with {@code null} if the outbound recovery descriptor cannot be reserved.
* @throws IgniteCheckedException If failed.
*/
- protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
- Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS));
- Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES));
- Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT));
- Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));
-
- boolean isRmtAddrsExist = (!F.isEmpty(rmtAddrs0) && boundPort != null);
- boolean isExtAddrsExist = !F.isEmpty(extAddrs);
+ protected IgniteInternalFuture<GridCommunicationClient> createTcpClient(ClusterNode node, int connIdx)
+ throws IgniteCheckedException
+ {
+ TcpClientFuture fut = new TcpClientFuture(node, connIdx);
- if (!isRmtAddrsExist && !isExtAddrsExist)
- throw new IgniteCheckedException("Failed to send message to the destination node. Node doesn't have any " +
- "TCP communication addresses or mapped external addresses. Check configuration and make sure " +
- "that you use the same communication SPI on all nodes. Remote node id: " + node.id());
+ connectGate.enter();
- LinkedHashSet<InetSocketAddress> addrs;
+ fut.connect();
- // Try to connect first on bound addresses.
- if (isRmtAddrsExist) {
- List<InetSocketAddress> addrs0 = new ArrayList<>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort));
+ fut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() {
+ @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut0) {
+ connectGate.leave();
+ }
+ });
- boolean sameHost = U.sameMacs(getSpiContext().localNode(), node);
+ return fut;
+ }
- Collections.sort(addrs0, U.inetAddressesComparator(sameHost));
+ /**
+ * @param timeoutObj Handshake timeout object; it is unregistered only if {@code cancel()} succeeds, and that result is returned.
+ */
+ private boolean cancelHandshakeTimeout(HandshakeTimeoutObject timeoutObj) {
+ boolean cancelled = timeoutObj.cancel();
- addrs = new LinkedHashSet<>(addrs0);
- }
- else
- addrs = new LinkedHashSet<>();
+ if (cancelled)
+ removeTimeoutObject(timeoutObj);
- // Then on mapped external addresses.
- if (isExtAddrsExist)
- addrs.addAll(extAddrs);
+ return cancelled;
+ }
- Set<InetAddress> allInetAddrs = U.newHashSet(addrs.size());
+ /**
+ * Future that asynchronously establishes a TCP communication client to a remote node, trying its addresses in turn.
+ */
+ private class TcpClientFuture extends GridFutureAdapter<GridCommunicationClient> {
+ /** Node. */
+ private final ClusterNode node;
- for (InetSocketAddress addr : addrs)
- allInetAddrs.add(addr.getAddress());
+ /** Timeout helper. */
+ private final IgniteSpiOperationTimeoutHelper timeoutHelper =
+ new IgniteSpiOperationTimeoutHelper(TcpCommunicationSpi.this);
- List<InetAddress> reachableInetAddrs = U.filterReachable(allInetAddrs);
+ /** Addresses. */
+ private Collection<InetSocketAddress> addrs;
- if (reachableInetAddrs.size() < allInetAddrs.size()) {
- LinkedHashSet<InetSocketAddress> addrs0 = U.newLinkedHashSet(addrs.size());
+ /** Addresses iterator. */
+ private Iterator<InetSocketAddress> addrsIt;
- for (InetSocketAddress addr : addrs) {
- if (reachableInetAddrs.contains(addr.getAddress()))
- addrs0.add(addr);
- }
- for (InetSocketAddress addr : addrs) {
- if (!reachableInetAddrs.contains(addr.getAddress()))
- addrs0.add(addr);
- }
+ /** Address currently being tried. */
+ private volatile InetSocketAddress currAddr;
- addrs = addrs0;
- }
+ /** Accumulated error; per-address failures are attached to it as suppressed exceptions. */
+ private volatile IgniteCheckedException err;
- if (log.isDebugEnabled())
- log.debug("Addresses to connect for node [rmtNode=" + node.id() + ", addrs=" + addrs.toString() + ']');
+ /** Connect retries made for the current address. */
+ private volatile int connectAttempts;
- boolean conn = false;
- GridCommunicationClient client = null;
- IgniteCheckedException errs = null;
+ /** Handshake attempt number for the current address (0-based). */
+ private volatile int attempt;
- int connectAttempts = 1;
+ /** Connection index. */
+ private volatile int connIdx;
- for (InetSocketAddress addr : addrs) {
- long connTimeout0 = connTimeout;
+ /**
+ * @param node Node.
+ */
+ TcpClientFuture(ClusterNode node, int connIdx) {
+ this.node = node;
+ this.connIdx = connIdx;
+ }
- int attempt = 1;
+ /**
+ * Connects to remote node.
+ */
+ void connect() {
+ try {
+ addrs = addrs();
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
- IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this);
+ return;
+ }
- while (!conn) { // Reconnection on handshake timeout.
- try {
- SocketChannel ch = SocketChannel.open();
+ addrsIt = addrs.iterator();
- ch.configureBlocking(true);
+ tryConnect(true);
+ }
- ch.socket().setTcpNoDelay(tcpNoDelay);
- ch.socket().setKeepAlive(true);
+ /**
+ * Connects to {@link #currAddr}, first advancing to the next address when {@code next} is {@code true}.
+ */
+ private void tryConnect(boolean next) {
+ if (next && !addrsIt.hasNext()) {
+ IgniteCheckedException err0 = err;
- if (sockRcvBuf > 0)
- ch.socket().setReceiveBufferSize(sockRcvBuf);
+ assert err0 != null;
- if (sockSndBuf > 0)
- ch.socket().setSendBufferSize(sockSndBuf);
+ UUID nodeId = node.id();
- if (getSpiContext().node(node.id()) == null) {
- U.closeQuiet(ch);
+ if (getSpiContext().node(nodeId) != null && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) &&
+ X.hasCause(err0, ConnectException.class, SocketTimeoutException.class, HandshakeTimeoutException.class,
+ IgniteSpiOperationTimeoutException.class))
+ {
+ LT.warn(log, "TcpCommunicationSpi failed to establish connection to node, node will be " +
+ "dropped from cluster [" + "rmtNode=" + node + ", err=" + err0 +
+ ", connectErrs=" + Arrays.toString(err0.getSuppressed()) + ']');
- throw new ClusterTopologyCheckedException("Failed to send message " +
- "(node left topology): " + node);
- }
+ getSpiContext().failNode(nodeId, "TcpCommunicationSpi failed to establish connection to node " +
+ "[rmtNode=" + node + ", errs=" + err0 + ", connectErrs=" + Arrays.toString(err0.getSuppressed()) + ']');
+ }
- ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1);
+ onDone(err0);
- GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey);
+ return;
+ }
- if (!recoveryDesc.reserve()) {
- U.closeQuiet(ch);
+ if (next) {
+ attempt = 0;
- return null;
- }
+ connectAttempts = 0;
- long rcvCnt = -1;
+ currAddr = addrsIt.next();
+ }
- Map<Integer, Object> meta = new HashMap<>();
+ InetSocketAddress addr = currAddr;
- GridSslMeta sslMeta = null;
+ try {
+ final SocketChannel ch = SocketChannel.open();
- try {
- ch.socket().connect(addr, (int)timeoutHelper.nextTimeoutChunk(connTimeout));
+ ch.configureBlocking(false);
- if (isSslEnabled()) {
- meta.put(SSL_META.ordinal(), sslMeta = new GridSslMeta());
+ ch.socket().setTcpNoDelay(tcpNoDelay);
+ ch.socket().setKeepAlive(true);
- SSLEngine sslEngine = ignite.configuration().getSslContextFactory().create().createSSLEngine();
+ if (sockRcvBuf > 0)
+ ch.socket().setReceiveBufferSize(sockRcvBuf);
- sslEngine.setUseClientMode(true);
+ if (sockSndBuf > 0)
+ ch.socket().setSendBufferSize(sockSndBuf);
- sslMeta.sslEngine(sslEngine);
- }
+ if (getSpiContext().node(node.id()) == null) {
+ U.closeQuiet(ch);
- Integer handshakeConnIdx = connIdx;
+ onError(new ClusterTopologyCheckedException("Failed to send message " +
+ "(node left topology): " + node));
- rcvCnt = safeHandshake(ch,
- recoveryDesc,
- node.id(),
- timeoutHelper.nextTimeoutChunk(connTimeout0),
- sslMeta,
- handshakeConnIdx);
+ return;
+ }
- if (rcvCnt == -1)
- return null;
- }
- finally {
- if (recoveryDesc != null && rcvCnt == -1)
- recoveryDesc.release();
- }
+ final ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1);
- try {
- meta.put(CONN_IDX_META, connKey);
+ final GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey);
- if (recoveryDesc != null) {
- recoveryDesc.onHandshake(rcvCnt);
+ if (!recoveryDesc.reserve()) {
+ U.closeQuiet(ch);
- meta.put(-1, recoveryDesc);
- }
+ onDone();
- GridNioSession ses = nioSrvr.createSession(ch, meta).get();
+ return;
+ }
- client = new GridTcpNioCommunicationClient(connIdx, ses, log);
+ final Map<Integer, Object> meta = new HashMap<>();
- conn = true;
- }
- finally {
- if (!conn) {
- if (recoveryDesc != null)
- recoveryDesc.release();
- }
- }
- }
- catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
- if (client != null) {
- client.forceClose();
+ final ConnectContext ctx = new ConnectContext();
- client = null;
- }
+ ctx.expNodeId = node.id();
- if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
- timeoutHelper.checkFailureTimeoutReached(e))) {
+ ctx.tcpClientFut = this;
- String msg = "Handshake timed out (failure detection timeout is reached) " +
- "[failureDetectionTimeout=" + failureDetectionTimeout() + ", addr=" + addr + ']';
+ ctx.connIdx = connIdx;
- onException(msg, e);
+ meta.put(CONN_CTX_META_KEY, ctx);
- if (log.isDebugEnabled())
- log.debug(msg);
+ meta.put(RECOVERY_DESC_META_KEY, recoveryDesc);
- if (errs == null)
- errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
- "Make sure that each ComputeTask and cache Transaction has a timeout set " +
- "in order to prevent parties from waiting forever in case of network issues " +
- "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
+ final int timeoutChunk = (int)timeoutHelper.nextTimeoutChunk(connTimeout);
- errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
+ final int attempt0 = attempt;
- break;
- }
+ final ConnectionTimeoutObject connTimeoutObj = new ConnectionTimeoutObject(ch, meta,
+ U.currentTimeMillis() + timeoutChunk * (1L << attempt0));
- assert !failureDetectionTimeoutEnabled();
+ addTimeoutObject(connTimeoutObj);
- onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
- ", addr=" + addr + ']', e);
+ boolean connect = ch.connect(addr);
- if (log.isDebugEnabled())
- log.debug(
- "Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
- ", addr=" + addr + ", err=" + e + ']');
+ IgniteInClosure<IgniteInternalFuture<GridNioSession>> lsnr0 = new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() {
+ @Override public void apply(final IgniteInternalFuture<GridNioSession> fut) {
+ GridNioSession ses = null;
- if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
- if (log.isDebugEnabled())
- log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
- "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout +
- ", attempt=" + attempt + ", reconCnt=" + reconCnt +
- ", err=" + e.getMessage() + ", addr=" + addr + ']');
+ try {
+ ses = fut.get();
- if (errs == null)
- errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
- "Make sure that each ComputeTask and cache Transaction has a timeout set " +
- "in order to prevent parties from waiting forever in case of network issues " +
- "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
+ boolean canceled = connTimeoutObj.cancel();
- errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
+ if (canceled)
+ removeTimeoutObject(connTimeoutObj);
+ else {
+ final GridNioSession ses0 = ses;
- break;
- }
- else {
- attempt++;
+ Runnable clo = new Runnable() {
+ @Override public void run() {
+ GridNioFuture<Boolean> fut = nioSrvr.close(ses0);
- connTimeout0 *= 2;
+ final SocketTimeoutException e = new SocketTimeoutException("Connect timed out " +
+ "(consider increasing 'connTimeout' configuration property) [addr=" +
+ currAddr + ", connTimeout=" + connTimeout + ']');
- // Continue loop.
- }
- }
- catch (Exception e) {
- if (client != null) {
- client.forceClose();
+ fut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut0) {
+ Runnable clo = new Runnable() {
+ @Override public void run() {
+ onError(e);
+ }
+ };
- client = null;
- }
+ SessionInfo sesInfo = new SessionInfo(null, SessionState.RETRY, clo);
- onException("Client creation failed [addr=" + addr + ", err=" + e + ']', e);
+ commWorker.addSessionStateChangeRequest(sesInfo);
+ }
+ });
+ }
+ };
- if (log.isDebugEnabled())
- log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']');
+ commWorker.addSessionStateChangeRequest(new SessionInfo(null, SessionState.RETRY, clo));
- boolean failureDetThrReached = timeoutHelper.checkFailureTimeoutReached(e);
+ return;
+ }
- if (failureDetThrReached)
- LT.warn(log, "Connect timed out (consider increasing 'failureDetectionTimeout' " +
- "configuration property) [addr=" + addr + ", failureDetectionTimeout=" +
- failureDetectionTimeout() + ']');
- else if (X.hasCause(e, SocketTimeoutException.class))
- LT.warn(log, "Connect timed out (consider increasing 'connTimeout' " +
- "configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']');
+ int timeoutChunk1 = (int) timeoutHelper.nextTimeoutChunk(connTimeout);
- if (errs == null)
- errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
- "Make sure that each ComputeTask and cache Transaction has a timeout set " +
- "in order to prevent parties from waiting forever in case of network issues " +
- "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
+ long time = U.currentTimeMillis() + timeoutChunk1 * (1L << attempt0);
- errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
+ HandshakeTimeoutObject<SocketChannel> handshakeTimeoutObj =
+ new HandshakeTimeoutObject<>(ch, TcpClientFuture.this, time);
- // Reconnect for the second time, if connection is not established.
- if (!failureDetThrReached && connectAttempts < 2 &&
- (e instanceof ConnectException || X.hasCause(e, ConnectException.class))) {
- connectAttempts++;
+ ctx.handshakeTimeoutObj = handshakeTimeoutObj;
- continue;
- }
+ addTimeoutObject(handshakeTimeoutObj);
+ }
+ catch (final IgniteSpiOperationTimeoutException e) {
+ assert ses != null;
- break;
- }
- }
+ final GridNioSession ses0 = ses;
- if (conn)
- break;
- }
+ commWorker.addSessionStateChangeRequest(new SessionInfo(null, SessionState.RETRY, new Runnable() {
+ @Override public void run() {
+ GridNioFuture<Boolean> closeFut = nioSrvr.close(ses0);
- if (client == null) {
- assert errs != null;
+ closeFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut0) {
+ Runnable clo = new Runnable() {
+ @Override public void run() {
+ onError(e);
+ }
+ };
- if (X.hasCause(errs, ConnectException.class))
- LT.warn(log, "Failed to connect to a remote node " +
- "(make sure that destination node is alive and " +
- "operating system firewall is disabled on local and remote hosts) " +
- "[addrs=" + addrs + ']');
+ SessionInfo sesInfo = new SessionInfo(null, SessionState.RETRY, clo);
- if (getSpiContext().node(node.id()) != null && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) &&
- X.hasCause(errs, ConnectException.class, SocketTimeoutException.class, HandshakeTimeoutException.class,
- IgniteSpiOperationTimeoutException.class)) {
- LT.warn(log, "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " +
- "cluster [" +
- "rmtNode=" + node +
- ", err=" + errs +
- ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']');
+ commWorker.addSessionStateChangeRequest(sesInfo);
+ }
+ });
+ }
+ }));
+ }
+ catch (IgniteCheckedException e) {
+ connTimeoutObj.cancel();
- getSpiContext().failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" +
- "rmtNode=" + node +
- ", errs=" + errs +
- ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']');
- }
+ removeTimeoutObject(connTimeoutObj);
- throw errs;
+ recoveryDesc.release();
+
+ onError(e);
+ }
+ }
+ };
+
+ nioSrvr.createSession(ch, meta, !connect, lsnr0);
+ }
+ catch (Exception e) { // NOTE(review): 'ch' is not closed here and connTimeoutObj may remain registered if it was added.
+ onDone(e);
+ }
}
- return client;
- }
+ /**
+ * @param e Failure of the current attempt; it decides between retrying {@link #currAddr} and moving to the next address.
+ */
+ void onError(Exception e) {
+ if (e instanceof HandshakeTimeoutException || e instanceof IgniteSpiOperationTimeoutException) {
+ if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
+ timeoutHelper.checkFailureTimeoutReached(e))) {
- /**
- * Performs handshake in timeout-safe way.
- *
- * @param client Client.
- * @param recovery Recovery descriptor if use recovery handshake, otherwise {@code null}.
- * @param rmtNodeId Remote node.
- * @param timeout Timeout for handshake.
- * @param sslMeta Session meta.
- * @param handshakeConnIdx Non null connection index if need send it in handshake.
- * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout.
- * @return Handshake response.
- */
- @SuppressWarnings("ThrowFromFinallyBlock")
- private <T> long safeHandshake(
- T client,
- @Nullable GridNioRecoveryDescriptor recovery,
- UUID rmtNodeId,
- long timeout,
- GridSslMeta sslMeta,
- @Nullable Integer handshakeConnIdx
- ) throws IgniteCheckedException {
- HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
+ String msg = "Handshake timed out (failure detection timeout is reached) " +
+ "[failureDetectionTimeout=" + failureDetectionTimeout() + ", addr=" + currAddr + ']';
- addTimeoutObject(obj);
+ onException(msg, e);
- long rcvCnt = 0;
+ if (log.isDebugEnabled())
+ log.debug(msg);
- try {
- if (client instanceof GridCommunicationClient)
- ((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId));
- else {
- SocketChannel ch = (SocketChannel)client;
+ if (err == null)
+ err = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
+ "Make sure that each ComputeTask and cache Transaction has a timeout set " +
+ "in order to prevent parties from waiting forever in case of network issues " +
+ "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
- boolean success = false;
+ err.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + currAddr, e));
- try {
- BlockingSslHandler sslHnd = null;
+ tryConnect(true);
- ByteBuffer buf;
+ return;
+ }
- if (isSslEnabled()) {
- assert sslMeta != null;
+ assert !failureDetectionTimeoutEnabled();
- sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, directBuf, ByteOrder.nativeOrder(), log);
+ long connTimeout0 = connTimeout * (1L << attempt);
- if (!sslHnd.handshake())
- throw new IgniteCheckedException("SSL handshake is not completed.");
+ onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
+ ", addr=" + currAddr + ']', e);
- ByteBuffer handBuff = sslHnd.applicationBuffer();
+ if (log.isDebugEnabled())
+ log.debug(
+ "Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
+ ", addr=" + currAddr + ", err=" + e + ']');
- if (handBuff.remaining() < NodeIdMessage.MESSAGE_FULL_SIZE) {
- buf = ByteBuffer.allocate(1000);
+ if (attempt + 1 >= reconCnt || connTimeout0 > maxConnTimeout) {
+ if (log.isDebugEnabled())
+ log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
+ "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout +
+ ", attempt=" + attempt + ", reconCnt=" + reconCnt +
+ ", err=" + e.getMessage() + ", addr=" + currAddr + ']');
- int read = ch.read(buf);
+ if (err == null)
+ err = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
+ "Make sure that each ComputeTask and cache Transaction has a timeout set " +
+ "in order to prevent parties from waiting forever in case of network issues " +
+ "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
- if (read == -1)
- throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
+ err.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + currAddr, e));
- buf.flip();
+ tryConnect(true);
+ }
+ else {
+ attempt++;
- buf = sslHnd.decode(buf);
- }
- else
- buf = handBuff;
- }
- else {
- buf = ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE);
+ tryConnect(false); // Reconnection on handshake timeout.
+ }
+ }
+ else {
+ onException("Client creation failed [addr=" + currAddr + ", err=" + e + ']', e);
- for (int i = 0; i < NodeIdMessage.MESSAGE_FULL_SIZE; ) {
- int read = ch.read(buf);
+ if (log.isDebugEnabled())
+ log.debug("Client creation failed [addr=" + currAddr + ", err=" + e + ']');
- if (read == -1)
- throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
+ boolean failureDetThrReached = timeoutHelper.checkFailureTimeoutReached(e);
- i += read;
- }
- }
+ if (failureDetThrReached)
+ LT.warn(log, "Connect timed out (consider increasing 'failureDetectionTimeout' " +
+ "configuration property) [addr=" + currAddr + ", failureDetectionTimeout=" +
+ failureDetectionTimeout() + ']');
+ else if (X.hasCause(e, SocketTimeoutException.class))
+ LT.warn(log, "Connect timed out (consider increasing 'connTimeout' " +
+ "configuration property) [addr=" + currAddr + ", connTimeout=" + connTimeout + ']');
- UUID rmtNodeId0 = U.bytesToUuid(buf.array(), Message.DIRECT_TYPE_SIZE);
+ if (err == null)
+ err = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
+ "Make sure that each ComputeTask and cache Transaction has a timeout set " +
+ "in order to prevent parties from waiting forever in case of network issues " +
+ "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
- if (!rmtNodeId.equals(rmtNodeId0))
- throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + rmtNodeId +
- ", rcvd=" + rmtNodeId0 + ']');
- else if (log.isDebugEnabled())
- log.debug("Received remote node ID: " + rmtNodeId0);
+ err.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + currAddr, e));
- if (isSslEnabled()) {
- assert sslHnd != null;
+ // Reconnect for the second time, if connection is not established.
+ int connectAttempts0;
- ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
- }
- else
- ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
+ if (!failureDetThrReached && (connectAttempts0 = connectAttempts) <= 2 &&
+ (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) {
+ connectAttempts = connectAttempts0 + 1;
- ClusterNode locNode = getLocalNode();
+ tryConnect(false);
- if (locNode == null)
- throw new IgniteCheckedException("Local node has not been started or " +
- "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
+ return;
+ }
- if (recovery != null) {
- HandshakeMessage msg;
+ tryConnect(true);
+ }
- int msgSize = HandshakeMessage.MESSAGE_FULL_SIZE;
+ // Do not complete here: each branch above has delegated to tryConnect(); completing now would fail the future while a retry is in flight.
+ }
- if (handshakeConnIdx != null) {
- msg = new HandshakeMessage2(locNode.id(),
- recovery.incrementConnectCount(),
- recovery.received(),
- handshakeConnIdx);
+ /**
+ * @return Candidate addresses of the remote node with reachable ones first (bound addresses, then mapped external ones); fails if the node publishes none.
+ */
+ private Collection<InetSocketAddress> addrs() throws IgniteCheckedException {
+ Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS));
+ Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES));
+ Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT));
+ Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));
- msgSize += 4;
- }
- else {
- msg = new HandshakeMessage(locNode.id(),
- recovery.incrementConnectCount(),
- recovery.received());
- }
+ boolean isRmtAddrsExist = (!F.isEmpty(rmtAddrs0) && boundPort != null);
+ boolean isExtAddrsExist = !F.isEmpty(extAddrs);
- if (log.isDebugEnabled())
- log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
+ if (!isRmtAddrsExist && !isExtAddrsExist)
+ throw new IgniteCheckedException("Failed to send message to the destination node. Node doesn't have any " +
+ "TCP communication addresses or mapped external addresses. Check configuration and make sure " +
+ "that you use the same communication SPI on all nodes. Remote node id: " + node.id());
- buf = ByteBuffer.allocate(msgSize);
+ LinkedHashSet<InetSocketAddress> addrs;
- buf.order(ByteOrder.nativeOrder());
+ // Try to connect first on bound addresses.
+ if (isRmtAddrsExist) {
+ List<InetSocketAddress> addrs0 = new ArrayList<>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort));
- boolean written = msg.writeTo(buf, null);
+ boolean sameHost = U.sameMacs(getSpiContext().localNode(), node);
- assert written;
+ Collections.sort(addrs0, U.inetAddressesComparator(sameHost));
- buf.flip();
+ addrs = new LinkedHashSet<>(addrs0);
+ }
+ else
+ addrs = new LinkedHashSet<>();
- if (isSslEnabled()) {
- assert sslHnd != null;
+ // Then on mapped external addresses.
+ if (isExtAddrsExist)
+ addrs.addAll(extAddrs);
- ch.write(sslHnd.encrypt(buf));
- }
- else
- ch.write(buf);
- }
- else {
- if (isSslEnabled()) {
- assert sslHnd != null;
+ Set<InetAddress> allInetAddrs = U.newHashSet(addrs.size());
- ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)));
- }
- else
- ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
- }
+ for (InetSocketAddress addr : addrs)
+ allInetAddrs.add(addr.getAddress());
- if (recovery != null) {
- if (log.isDebugEnabled())
- log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
+ List<InetAddress> reachableInetAddrs = U.filterReachable(allInetAddrs);
+
+ if (reachableInetAddrs.size() < allInetAddrs.size()) {
+ LinkedHashSet<InetSocketAddress> addrs0 = U.newLinkedHashSet(addrs.size());
- if (isSslEnabled()) {
- assert sslHnd != null;
+ for (InetSocketAddress addr : addrs) {
+ if (reachableInetAddrs.contains(addr.getAddress()))
+ addrs0.add(addr);
+ }
+ for (InetSocketAddress addr : addrs) {
+ if (!reachableInetAddrs.contains(addr.getAddress()))
+ addrs0.add(addr);
+ }
- buf = ByteBuffer.allocate(1000);
+ addrs = addrs0;
+ }
- ByteBuffer decode = null;
+ if (log.isDebugEnabled())
+ log.debug("Addresses to connect for node [rmtNode=" + node.id() + ", addrs=" + addrs.toString() + ']');
- buf.order(ByteOrder.nativeOrder());
+ return addrs;
+ }
- for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
- int read = ch.read(buf);
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpClientFuture.class, this);
+ }
+ }
- if (read == -1)
- throw new IgniteCheckedException("Failed to read remote node recovery handshake " +
- "(connection closed).");
+ /**
+ * Performs handshake in timeout-safe way.
+ *
+ * @param client Client.
+ * @param rmtNodeId Remote node.
+ * @param timeout Timeout for handshake.
+ * @throws IgniteCheckedException If handshake failed or wasn't completed within timeout.
+ * @return Handshake response.
+ */
+ @SuppressWarnings("ThrowFromFinallyBlock")
+ private <T> long safeHandshake(T client, UUID rmtNodeId, long timeout) throws IgniteCheckedException {
+ HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, null, U.currentTimeMillis() + timeout);
- buf.flip();
+ addTimeoutObject(obj);
- decode = sslHnd.decode(buf);
+ long rcvCnt = 0;
- i += decode.remaining();
+ try {
+ if (client instanceof GridCommunicationClient)
+ ((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId));
+ else {
+ SocketChannel ch = (SocketChannel)client;
- buf.clear();
- }
+ boolean success = false;
- rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE);
+ try {
+ ByteBuffer buf;
- if (decode.limit() > RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) {
- decode.position(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
+ buf = ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE);
- sslMeta.decodedBuffer(decode);
- }
+ for (int i = 0; i < NodeIdMessage.MESSAGE_FULL_SIZE; ) {
+ int read = ch.read(buf);
- ByteBuffer inBuf = sslHnd.inputBuffer();
+ if (read == -1)
+ throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
- if (inBuf.position() > 0)
- sslMeta.encodedBuffer(inBuf);
- }
- else {
- buf = ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
+ i += read;
+ }
- buf.order(ByteOrder.nativeOrder());
+ UUID rmtNodeId0 = U.bytesToUuid(buf.array(), Message.DIRECT_TYPE_SIZE);
- for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
- int read = ch.read(buf);
+ if (!rmtNodeId.equals(rmtNodeId0))
+ throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + rmtNodeId +
+ ", rcvd=" + rmtNodeId0 + ']');
+ else if (log.isDebugEnabled())
+ log.debug("Received remote node ID: " + rmtNodeId0);
- if (read == -1)
- throw new IgniteCheckedException("Failed to read remote node recovery handshake " +
- "(connection closed).");
+ ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
- i += read;
- }
+ ClusterNode locNode = getLocalNode();
- rcvCnt = buf.getLong(Message.DIRECT_TYPE_SIZE);
- }
+ if (locNode == null)
+ throw new IgniteCheckedException("Local node has not been started or " +
+ "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
- if (log.isDebugEnabled())
- log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
+ ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
- if (rcvCnt == -1) {
- if (log.isDebugEnabled())
- log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
- }
- else
- success = true;
- }
- else
- success = true;
+ success = true;
}
catch (IOException e) {
if (log.isDebugEnabled())
@@ -3815,7 +4133,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
*/
private class CommunicationWorker extends IgniteSpiThread {
/** */
- private final BlockingQueue<DisconnectedSessionInfo> q = new LinkedBlockingQueue<>();
+ private final BlockingQueue<SessionInfo> q = new LinkedBlockingQueue<>();
/**
* @param igniteInstanceName Ignite instance name.
@@ -3830,10 +4148,56 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
log.debug("Tcp communication worker has been started.");
while (!isInterrupted()) {
- DisconnectedSessionInfo disconnectData = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
+ SessionInfo sesInfo = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
+
+ if (sesInfo != null) {
+ ConnectContext ctx;
+
+ TcpClientFuture clientFut;
+
+ switch (sesInfo.state) {
+ case RECONNECT:
+ processDisconnect(sesInfo);
+
+ break;
+
+ case RETRY:
+ Runnable clo = sesInfo.clo;
+
+ assert clo != null;
- if (disconnectData != null)
- processDisconnect(disconnectData);
+ clo.run();
+
+ break;
+
+ case READY:
+
<TRUNCATED>