You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/12 09:39:58 UTC
[3/7] incubator-ignite git commit: # ignite-21
# ignite-21
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dae4b942
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dae4b942
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dae4b942
Branch: refs/heads/master
Commit: dae4b9428cfe04fb2d0f78020e7f0ea29eb37c04
Parents: 20872dc
Author: sboikov <sb...@gridgain.com>
Authored: Wed Dec 10 10:41:56 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 10 13:22:50 2014 +0300
----------------------------------------------------------------------
.../rest/protocols/tcp/GridMockNioSession.java | 11 +
.../communication/tcp/TcpCommunicationSpi.java | 1151 +++++++++++++++---
.../tcp/TcpCommunicationSpiMBean.java | 52 +-
.../discovery/tcp/TcpDiscoverySpiAdapter.java | 3 +-
.../GridTcpCommunicationMessageAdapter.java | 7 +
.../GridTcpCommunicationMessageFactory.java | 10 +-
.../grid/util/nio/GridCommunicationClient.java | 3 +-
.../grid/util/nio/GridNioFinishedFuture.java | 5 +
.../gridgain/grid/util/nio/GridNioFuture.java | 15 +-
.../grid/util/nio/GridNioFutureImpl.java | 7 +-
.../util/nio/GridNioRecoveryDescriptor.java | 356 ++++++
.../gridgain/grid/util/nio/GridNioServer.java | 167 ++-
.../gridgain/grid/util/nio/GridNioSession.java | 10 +
.../grid/util/nio/GridNioSessionImpl.java | 10 +
.../util/nio/GridSelectorNioSessionImpl.java | 97 ++
.../util/nio/GridShmemCommunicationClient.java | 4 +-
.../util/nio/GridTcpCommunicationClient.java | 4 +-
.../util/nio/GridTcpNioCommunicationClient.java | 39 +-
.../GridAbstractCommunicationSelfTest.java | 85 +-
.../spi/communication/GridTestMessage.java | 15 +-
.../GridTcpCommunicationSpiAbstractTest.java | 26 +-
...mmunicationSpiConcurrentConnectSelfTest.java | 398 ++++++
.../GridTcpCommunicationSpiConfigSelfTest.java | 4 +
...cpCommunicationSpiMultithreadedSelfTest.java | 77 +-
...pCommunicationSpiMultithreadedShmemTest.java | 2 +-
...dTcpCommunicationSpiRecoveryAckSelfTest.java | 426 +++++++
...GridTcpCommunicationSpiRecoverySelfTest.java | 713 +++++++++++
.../GridTcpCommunicationSpiShmemSelfTest.java | 2 +-
...cpCommunicationSpiTcpNoDelayOffSelfTest.java | 20 +
.../managers/GridManagerStopSelfTest.java | 2 +
.../grid/spi/GridSpiStartStopAbstractTest.java | 2 +
.../nio/impl/GridNioFilterChainSelfTest.java | 10 +
.../gridgain/testframework/GridTestNode.java | 12 +-
.../GridSpiCommunicationSelfTestSuite.java | 8 +-
34 files changed, 3434 insertions(+), 319 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/clients/src/test/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMockNioSession.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMockNioSession.java b/modules/clients/src/test/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMockNioSession.java
index 7071ed5..bf82f22 100644
--- a/modules/clients/src/test/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMockNioSession.java
+++ b/modules/clients/src/test/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMockNioSession.java
@@ -10,6 +10,7 @@ package org.gridgain.grid.kernal.processors.rest.protocols.tcp;
import org.gridgain.grid.util.lang.*;
import org.gridgain.grid.util.nio.*;
+import org.jetbrains.annotations.*;
import java.net.*;
@@ -132,4 +133,14 @@ public class GridMockNioSession extends GridMetadataAwareAdapter implements Grid
@Override public boolean readsPaused() {
return false;
}
+
+ /** {@inheritDoc} */
+ @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index a80421e..1911b5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -24,6 +24,7 @@ import org.gridgain.grid.kernal.managers.eventstorage.*;
import org.apache.ignite.spi.communication.*;
import org.gridgain.grid.util.*;
import org.gridgain.grid.util.direct.*;
+import org.gridgain.grid.util.future.*;
import org.gridgain.grid.util.ipc.*;
import org.gridgain.grid.util.ipc.shmem.*;
import org.gridgain.grid.util.lang.*;
@@ -51,7 +52,7 @@ import static org.apache.ignite.events.IgniteEventType.*;
* TCP/IP protocol and Java NIO to communicate with other nodes.
* <p>
* To enable communication with other nodes, this SPI adds {@link #ATTR_ADDRS}
- * and {@link #ATTR_PORT} local node attributes (see {@link org.apache.ignite.cluster.ClusterNode#attributes()}.
+ * and {@link #ATTR_PORT} local node attributes (see {@link ClusterNode#attributes()}).
* <p>
* At startup, this SPI tries to start listening to local port specified by
* {@link #setLocalPort(int)} method. If local port is occupied, then SPI will
@@ -84,9 +85,7 @@ import static org.apache.ignite.events.IgniteEventType.*;
* <li>Direct or heap buffer allocation for sending (see {@link #setDirectSendBuffer(boolean)})</li>
* <li>Count of selectors and selector threads for NIO server (see {@link #setSelectorsCount(int)})</li>
* <li>{@code TCP_NODELAY} socket option for sockets (see {@link #setTcpNoDelay(boolean)})</li>
- * <li>Async message sending (see {@link #setAsyncSend(boolean)})</li>
* <li>Message queue limit (see {@link #setMessageQueueLimit(int)})</li>
- * <li>Dual socket connection (see {@link #setDualSocketConnection(boolean)})</li>
* <li>Minimum buffered message count (see {@link #setMinimumBufferedMessageCount(int)})</li>
* <li>Buffer size ratio (see {@link #setBufferSizeRatio(double)})</li>
* <li>Connect timeout (see {@link #setConnectTimeout(long)})</li>
@@ -95,6 +94,9 @@ import static org.apache.ignite.events.IgniteEventType.*;
* <li>Local port to accept shared memory connections (see {@link #setSharedMemoryPort(int)})</li>
* <li>Socket receive buffer size (see {@link #setSocketReceiveBuffer(int)})</li>
* <li>Socket send buffer size (see {@link #setSocketSendBuffer(int)})</li>
+ * <li>Socket write timeout (see {@link #setSocketWriteTimeout(long)})</li>
+ * <li>Number of received messages after which acknowledgment is sent (see {@link #setAckSendThreshold(int)})</li>
+ * <li>Maximum number of unacknowledged messages (see {@link #setUnacknowledgedMessagesBufferSize(int)})</li>
* </ul>
* <h2 class="header">Java Example</h2>
* GridTcpCommunicationSpi is used by default and should be explicitly configured
@@ -131,7 +133,7 @@ import static org.apache.ignite.events.IgniteEventType.*;
* <img src="http://www.gridgain.com/images/spring-small.png">
* <br>
* For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
- * @see org.apache.ignite.spi.communication.CommunicationSpi
+ * @see CommunicationSpi
*/
@IgniteSpiMultipleInstancesSupport(true)
@IgniteSpiConsistencyChecked(optional = false)
@@ -187,9 +189,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Default message queue limit per connection (for incoming and outgoing . */
public static final int DFLT_MSG_QUEUE_LIMIT = GridNioServer.DFLT_SEND_QUEUE_LIMIT;
- /** Default value for dualSocketConnection flag. */
- public static final boolean DFLT_DUAL_SOCKET_CONNECTION = false;
-
/**
* Default count of selectors for TCP server equals to
* {@code "Math.min(4, Runtime.getRuntime().availableProcessors())"}.
@@ -211,6 +210,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Default value for {@code TCP_NODELAY} socket option (value is <tt>true</tt>). */
public static final boolean DFLT_TCP_NODELAY = true;
+ /** Default received messages threshold for sending ack. */
+ public static final int DFLT_ACK_SND_THRESHOLD = 512;
+
+ /** Default socket write timeout. */
+ public static final long DFLT_SOCK_WRITE_TIMEOUT = GridNioServer.DFLT_SES_WRITE_TIMEOUT;
+
/** No-op runnable. */
private static final IgniteRunnable NOOP = new IgniteRunnable() {
@Override public void run() {
@@ -221,9 +226,27 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Node ID message type. */
public static final byte NODE_ID_MSG_TYPE = -1;
+ /** */
+ public static final byte RECOVERY_LAST_ID_MSG_TYPE = -2;
+
+ /** */
+ public static final byte HANDSHAKE_MSG_TYPE = -3;
+
/** Server listener. */
private final GridNioServerListener<GridTcpCommunicationMessageAdapter> srvLsnr =
new GridNioServerListenerAdapter<GridTcpCommunicationMessageAdapter>() {
+ @Override public void onSessionWriteTimeout(GridNioSession ses) {
+ LT.warn(log, null, "Communication SPI Session write timed out (consider increasing " +
+ "'socketWriteTimeout' " + "configuration property) [remoteAddr=" + ses.remoteAddress() +
+ ", writeTimeout=" + sockWriteTimeout + ']');
+
+ if (log.isDebugEnabled())
+ log.debug("Closing communication SPI session on write timeout [remoteAddr=" + ses.remoteAddress() +
+ ", writeTimeout=" + sockWriteTimeout + ']');
+
+ ses.close();
+ }
+
@Override public void onConnected(GridNioSession ses) {
if (ses.accepted()) {
if (log.isDebugEnabled())
@@ -231,8 +254,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
ses.send(nodeIdMsg);
}
- else
- assert asyncSnd;
}
@Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
@@ -243,9 +264,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (rmv instanceof GridTcpNioCommunicationClient &&
((GridTcpNioCommunicationClient)rmv).session() == ses &&
- clients.remove(id, rmv))
+ clients.remove(id, rmv)) {
rmv.forceClose();
+ if (!stopping) {
+ GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor();
+
+ if (recoveryData != null) {
+ if (recoveryData.nodeAlive(getSpiContext().node(id))) {
+ if (!recoveryData.messagesFutures().isEmpty()) {
+ if (log.isDebugEnabled())
+ log.debug("Session was closed but there are unacknowledged messages, " +
+ "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']');
+
+ recoveryWorker.addReconnectRequest(recoveryData);
+ }
+ }
+ else
+ recoveryData.onNodeLeft();
+ }
+ }
+ }
+
CommunicationListener<GridTcpCommunicationMessageAdapter> lsnr0 = lsnr;
if (lsnr0 != null)
@@ -259,20 +299,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (sndId == null) {
assert ses.accepted();
- assert msg instanceof NodeIdMessage;
+ if (msg instanceof NodeIdMessage)
+ sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0);
+ else {
+ assert msg instanceof HandshakeMessage : msg;
- sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0);
+ sndId = ((HandshakeMessage)msg).nodeId();
+ }
if (log.isDebugEnabled())
log.debug("Remote node ID received: " + sndId);
- UUID old = ses.addMeta(NODE_ID_META, sndId);
+ final UUID old = ses.addMeta(NODE_ID_META, sndId);
assert old == null;
- IgniteProductVersion locVer = getSpiContext().localNode().version();
-
- ClusterNode rmtNode = getSpiContext().node(sndId);
+ final ClusterNode rmtNode = getSpiContext().node(sndId);
if (rmtNode == null) {
ses.close();
@@ -280,25 +322,110 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
return;
}
+ ClusterNode locNode = getSpiContext().localNode();
+
+ IgniteProductVersion locVer = locNode.version();
+
IgniteProductVersion rmtVer = rmtNode.version();
if (!locVer.equals(rmtVer))
ses.addMeta(GridNioServer.DIFF_VER_NODE_ID_META_KEY, sndId);
- if (asyncSnd && ses.remoteAddress() != null && !dualSockConn) {
- Object sync = locks.tryLock(sndId);
+ if (ses.remoteAddress() == null)
+ return;
+
+ GridCommunicationClient oldClient = clients.get(sndId);
+
+ boolean hasShmemClient = false;
+
+ if (oldClient != null) {
+ if (oldClient instanceof GridTcpNioCommunicationClient) {
+ if (log.isDebugEnabled())
+ log.debug("Received incoming connection when already connected " +
+ "to this node, rejecting [locNode=" + locNode.id() +
+ ", rmtNode=" + sndId + ']');
+
+ ses.send(new RecoveryLastReceivedMessage(-1));
+
+ return;
+ }
+ else {
+ assert oldClient instanceof GridShmemCommunicationClient;
+
+ hasShmemClient = true;
+ }
+ }
+
+ GridFutureAdapterEx<GridCommunicationClient> fut = new GridFutureAdapterEx<>();
+
+ GridFutureAdapterEx<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId, fut);
+
+ assert msg instanceof HandshakeMessage : msg;
+
+ HandshakeMessage msg0 = (HandshakeMessage)msg;
+
+ final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode);
+
+ if (oldFut == null) {
+ oldClient = clients.get(sndId);
+
+ if (oldClient != null) {
+ if (oldClient instanceof GridTcpNioCommunicationClient) {
+ if (log.isDebugEnabled())
+ log.debug("Received incoming connection when already connected " +
+ "to this node, rejecting [locNode=" + locNode.id() +
+ ", rmtNode=" + sndId + ']');
+
+ ses.send(new RecoveryLastReceivedMessage(-1));
+
+ return;
+ }
+ else {
+ assert oldClient instanceof GridShmemCommunicationClient;
+
+ hasShmemClient = true;
+ }
+ }
+
+ boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
+ new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
- if (sync != null) {
+ if (log.isDebugEnabled())
+ log.debug("Received incoming connection from remote node " +
+ "[rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ']');
+
+ if (reserved) {
try {
- if (clients.get(sndId) == null) {
- if (log.isDebugEnabled())
- log.debug("Will reuse session for node: " + sndId);
+ GridTcpNioCommunicationClient client =
+ connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
- clients.put(sndId, new GridTcpNioCommunicationClient(ses));
- }
+ fut.onDone(client);
}
finally {
- locks.unlock(sndId, sync);
+ clientFuts.remove(rmtNode.id(), fut);
+ }
+ }
+ }
+ else {
+ if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Received incoming connection from remote node while " +
+ "connecting to this node, rejecting [locNode=" + locNode.id() +
+ ", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() +
+ ", rmtNodeOrder=" + rmtNode.order() + ']');
+ }
+
+ ses.send(new RecoveryLastReceivedMessage(-1));
+ }
+ else {
+ boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
+ new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
+
+ if (reserved) {
+ GridTcpNioCommunicationClient client =
+ connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
+
+ fut.onDone(client);
}
}
}
@@ -306,6 +433,35 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
else {
rcvdMsgsCnt.increment();
+ GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor();
+
+ if (recovery != null) {
+ if (msg instanceof RecoveryLastReceivedMessage) {
+ RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg;
+
+ if (log.isDebugEnabled())
+ log.debug("Received recovery acknowledgement [rmtNode=" + sndId +
+ ", rcvCnt=" + msg0.received() + ']');
+
+ recovery.ackReceived(msg0.received());
+
+ return;
+ }
+ else {
+ long rcvCnt = recovery.onReceived();
+
+ if (rcvCnt % ackSndThreshold == 0) {
+ if (log.isDebugEnabled())
+ log.debug("Send recovery acknowledgement [rmtNode=" + sndId +
+ ", rcvCnt=" + rcvCnt + ']');
+
+ nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(rcvCnt));
+
+ recovery.lastAcknowledged(rcvCnt);
+ }
+ }
+ }
+
IgniteRunnable c;
if (msgQueueLimit > 0) {
@@ -328,6 +484,135 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
notifyListener(sndId, msg, c);
}
}
+
+ /**
+ * @param recovery Recovery descriptor.
+ * @param ses Session.
+ * @param node Node.
+ * @param rcvCnt Number of received messages.
+ * @param sndRes If {@code true} sends response for recovery handshake.
+ * @param createClient If {@code true} creates NIO communication client.
+ * @return Client.
+ */
+ private GridTcpNioCommunicationClient connected(
+ GridNioRecoveryDescriptor recovery,
+ GridNioSession ses,
+ ClusterNode node,
+ long rcvCnt,
+ boolean sndRes,
+ boolean createClient) {
+ recovery.onHandshake(rcvCnt);
+
+ ses.recoveryDescriptor(recovery);
+
+ nioSrvr.resend(ses);
+
+ if (sndRes)
+ nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.receivedCount()));
+
+ recovery.connected();
+
+ GridTcpNioCommunicationClient client = null;
+
+ if (createClient) {
+ client = new GridTcpNioCommunicationClient(ses, log);
+
+ GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client);
+
+ assert oldClient == null;
+ }
+
+ return client;
+ }
+
+ /**
+ *
+ */
+ @SuppressWarnings("PackageVisibleInnerClass")
+ class ConnectClosure implements IgniteInClosure<Boolean> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final GridNioSession ses;
+
+ /** */
+ private final GridNioRecoveryDescriptor recoveryDesc;
+
+ /** */
+ private final ClusterNode rmtNode;
+
+ /** */
+ private final HandshakeMessage msg;
+
+ /** */
+ private final GridFutureAdapterEx<GridCommunicationClient> fut;
+
+ /** */
+ private final boolean createClient;
+
+ /**
+ * @param ses Incoming session.
+ * @param recoveryDesc Recovery descriptor.
+ * @param rmtNode Remote node.
+ * @param msg Handshake message.
+ * @param createClient If {@code true} creates NIO communication client.
+ * @param fut Connect future.
+ */
+ ConnectClosure(GridNioSession ses,
+ GridNioRecoveryDescriptor recoveryDesc,
+ ClusterNode rmtNode,
+ HandshakeMessage msg,
+ boolean createClient,
+ GridFutureAdapterEx<GridCommunicationClient> fut) {
+ this.ses = ses;
+ this.recoveryDesc = recoveryDesc;
+ this.rmtNode = rmtNode;
+ this.msg = msg;
+ this.createClient = createClient;
+ this.fut = fut;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void apply(Boolean success) {
+ if (success) {
+ IgniteInClosure<GridNioFuture<?>> lsnr = new IgniteInClosure<GridNioFuture<?>>() {
+ @Override public void apply(GridNioFuture<?> msgFut) {
+ try {
+ msgFut.get();
+
+ GridTcpNioCommunicationClient client =
+ connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient);
+
+ fut.onDone(client);
+ }
+ catch (GridException | IOException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send recovery handshake " +
+ "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');
+
+ recoveryDesc.release();
+
+ fut.onDone();
+ }
+ finally {
+ clientFuts.remove(rmtNode.id(), fut);
+ }
+ }
+ };
+
+ nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.receivedCount()), lsnr);
+ }
+ else {
+ try {
+ fut.onDone();
+ }
+ finally {
+ clientFuts.remove(rmtNode.id(), fut);
+ }
+ }
+ }
+ }
};
/** Logger. */
@@ -402,9 +687,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Buffer size ratio. */
private double bufSizeRatio = IgniteSystemProperties.getDouble(GG_COMMUNICATION_BUF_RESIZE_RATIO, 0.8);
- /** Dual socket connection flag. */
- private boolean dualSockConn = DFLT_DUAL_SOCKET_CONNECTION;
-
/** NIO server. */
private GridNioServer<GridTcpCommunicationMessageAdapter> nioSrvr;
@@ -414,8 +696,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** {@code TCP_NODELAY} option value for created sockets. */
private boolean tcpNoDelay = DFLT_TCP_NODELAY;
- /** Use async client flag. */
- private boolean asyncSnd = true;
+ /** Number of received messages after which acknowledgment is sent. */
+ private int ackSndThreshold = DFLT_ACK_SND_THRESHOLD;
+
+ /** Maximum number of unacknowledged messages. */
+ private int unackedMsgsBufSize;
+
+ /** Socket write timeout. */
+ private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT;
/** Shared memory accept worker. */
private ShmemAcceptWorker shmemAcceptWorker;
@@ -429,6 +717,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Socket timeout worker. */
private SocketTimeoutWorker sockTimeoutWorker;
+ /** Recovery worker. */
+ private RecoveryWorker recoveryWorker;
+
/** Shared memory workers. */
private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>();
@@ -468,6 +759,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Context initialization latch. */
private final CountDownLatch ctxInitLatch = new CountDownLatch(1);
+ /** Stopping flag. */
+ private volatile boolean stopping;
+
/** metrics listener. */
private final GridNioMetricsListener metricsLsnr = new GridNioMetricsListener() {
@Override public void onBytesSent(int bytesCnt) {
@@ -479,8 +773,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
};
- /** Locks. */
- private final GridKeyLock locks = new GridKeyLock();
+ /** Client connect futures. */
+ private final ConcurrentMap<UUID, GridFutureAdapterEx<GridCommunicationClient>> clientFuts =
+ GridConcurrentFactory.newMap();
+
+ /** */
+ private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> recoveryDescs = GridConcurrentFactory.newMap();
/** Discovery listener. */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@@ -696,6 +994,58 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
return idleConnTimeout;
}
+ /** {@inheritDoc} */
+ @Override public long getSocketWriteTimeout() {
+ return sockWriteTimeout;
+ }
+
+ /**
+ * Sets socket write timeout for TCP connection. If message can not be written to
+ * socket within this time then connection is closed and reconnect is attempted.
+ * <p>
+ * Defaults to {@link #DFLT_SOCK_WRITE_TIMEOUT}.
+ *
+ * @param sockWriteTimeout Socket write timeout for TCP connection.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public void setSocketWriteTimeout(long sockWriteTimeout) {
+ this.sockWriteTimeout = sockWriteTimeout;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getAckSendThreshold() {
+ return ackSndThreshold;
+ }
+
+ /**
+ * Sets number of received messages per connection to node after which acknowledgment message is sent.
+ * <p>
+ * Defaults to {@link #DFLT_ACK_SND_THRESHOLD}.
+ *
+ * @param ackSndThreshold Number of received messages after which acknowledgment is sent.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public void setAckSendThreshold(int ackSndThreshold) {
+ this.ackSndThreshold = ackSndThreshold;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getUnacknowledgedMessagesBufferSize() {
+ return unackedMsgsBufSize;
+ }
+
+ /**
+ * Sets maximum number of stored unacknowledged messages per connection to node.
+ * If number of unacknowledged messages exceeds this number then connection to node is
+ * closed and reconnect is attempted.
+ *
+ * @param unackedMsgsBufSize Maximum number of unacknowledged messages.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public void setUnacknowledgedMessagesBufferSize(int unackedMsgsBufSize) {
+ this.unackedMsgsBufSize = unackedMsgsBufSize;
+ }
+
/**
* Sets connection buffer size. If set to {@code 0} connection buffer is disabled.
* <p>
@@ -838,24 +1188,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
return selectorsCnt;
}
- /** {@inheritDoc} */
- @Override public boolean isAsyncSend() {
- return asyncSnd;
- }
-
- /**
- * Sets flag defining whether asynchronous (NIO) or synchronous (blocking) IO
- * should be used to send messages.
- * <p>
- * If not provided, default value is {@code true}.
- *
- * @param asyncSnd {@code True} if asynchronous IO should be used to send messages.
- */
- @IgniteSpiConfiguration(optional = true)
- public void setAsyncSend(boolean asyncSnd) {
- this.asyncSnd = asyncSnd;
- }
-
/**
* Sets value for {@code TCP_NODELAY} socket option. Each
* socket will be opened using provided value.
@@ -915,33 +1247,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
/**
- * Sets flag indicating whether dual-socket connection between nodes should be enforced. If set to
- * {@code true}, two separate connections will be established between communicating nodes: one for outgoing
- * messages, and one for incoming. When set to {@code false}, single {@code TCP} connection will be used
- * for both directions.
- * <p>
- * This flag is useful on some operating systems, when {@code TCP_NODELAY} flag is disabled and
- * messages take too long to get delivered.
- * <p>
- * If not provided, default is {@code false}.
- *
- * @param dualSockConn Whether dual-socket connection should be enforced.
- */
- @IgniteSpiConfiguration(optional = true)
- public void setDualSocketConnection(boolean dualSockConn) {
- this.dualSockConn = dualSockConn;
- }
-
- /** {@inheritDoc} */
- @Override public boolean isDualSocketConnection() {
- return dualSockConn;
- }
-
- /**
* Sets message queue limit for incoming and outgoing messages.
* <p>
- * This parameter only used when {@link #isAsyncSend()} set to {@code true}.
- * <p>
* When set to positive number send queue is limited to the configured value.
* {@code 0} disables the size limitations.
* <p>
@@ -963,7 +1270,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* Sets the minimum number of messages for this SPI, that are buffered
* prior to sending.
* <p>
- * Defaults to either {@code 512} or {@link org.apache.ignite.IgniteSystemProperties#GG_MIN_BUFFERED_COMMUNICATION_MSG_CNT}
+ * Defaults to either {@code 512} or {@link IgniteSystemProperties#GG_MIN_BUFFERED_COMMUNICATION_MSG_CNT}
* system property (if specified).
*
* @param minBufferedMsgCnt Minimum buffered message count.
@@ -982,7 +1289,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* Sets the buffer size ratio for this SPI. As messages are sent,
* the buffer size is adjusted using this ratio.
* <p>
- * Defaults to either {@code 0.8} or {@link org.apache.ignite.IgniteSystemProperties#GG_COMMUNICATION_BUF_RESIZE_RATIO}
+ * Defaults to either {@code 0.8} or {@link IgniteSystemProperties#GG_COMMUNICATION_BUF_RESIZE_RATIO}
* system property (if specified).
*
* @param bufSizeRatio Buffer size ratio.
@@ -1064,6 +1371,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assertParameter(bufSizeRatio > 0 && bufSizeRatio < 1, "bufSizeRatio > 0 && bufSizeRatio < 1");
assertParameter(connTimeout >= 0, "connTimeout >= 0");
assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout");
+ assertParameter(sockWriteTimeout >= 0, "sockWriteTimeout >= 0");
+ assertParameter(ackSndThreshold > 0, "ackSndThreshold > 0");
+ assertParameter(unackedMsgsBufSize >= 0, "unackedMsgsBufSize >= 0");
+
+ if (unackedMsgsBufSize > 0) {
+ assertParameter(unackedMsgsBufSize >= msgQueueLimit * 5,
+ "Specified 'unackedMsgsBufSize' is too low, it should be at least 'msgQueueLimit * 5'.");
+
+ assertParameter(unackedMsgsBufSize >= ackSndThreshold * 5,
+ "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'.");
+ }
try {
locHost = U.resolveLocalHost(locAddr);
@@ -1125,18 +1443,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
log.debug(configInfo("connBufSize", connBufSize));
log.debug(configInfo("connBufFlushFreq", connBufFlushFreq));
log.debug(configInfo("selectorsCnt", selectorsCnt));
- log.debug(configInfo("asyncSend", asyncSnd));
log.debug(configInfo("tcpNoDelay", tcpNoDelay));
log.debug(configInfo("sockSndBuf", sockSndBuf));
log.debug(configInfo("sockRcvBuf", sockRcvBuf));
log.debug(configInfo("shmemPort", shmemPort));
log.debug(configInfo("msgQueueLimit", msgQueueLimit));
- log.debug(configInfo("dualSockConn", dualSockConn));
log.debug(configInfo("minBufferedMsgCnt", minBufferedMsgCnt));
log.debug(configInfo("bufSizeRatio", bufSizeRatio));
log.debug(configInfo("connTimeout", connTimeout));
log.debug(configInfo("maxConnTimeout", maxConnTimeout));
log.debug(configInfo("reconCnt", reconCnt));
+ log.debug(configInfo("sockWriteTimeout", sockWriteTimeout));
+ log.debug(configInfo("ackSndThreshold", ackSndThreshold));
+ log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize));
}
if (connBufSize > 8192)
@@ -1162,6 +1481,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
idleClientWorker.start();
+ recoveryWorker = new RecoveryWorker();
+
+ recoveryWorker.start();
+
if (connBufSize > 0) {
clientFlushWorker = new ClientFlushWorker();
@@ -1242,6 +1565,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
.directMode(true)
.metricsListener(metricsLsnr)
.messageWriter(msgWriter)
+ .writeTimeout(sockWriteTimeout)
.filters(new GridNioCodecFilter(new GridDirectParser(msgReader, this), log, true),
new GridConnectionBytesVerifyFilter(log))
.build();
@@ -1321,6 +1645,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** {@inheritDoc} */
@Override public void spiStop() throws IgniteSpiException {
+ assert stopping;
+
unregisterMBean();
// Stop TCP server.
@@ -1333,10 +1659,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
U.interrupt(idleClientWorker);
U.interrupt(clientFlushWorker);
U.interrupt(sockTimeoutWorker);
+ U.interrupt(recoveryWorker);
U.join(idleClientWorker, log);
U.join(clientFlushWorker, log);
U.join(sockTimeoutWorker, log);
+ U.join(recoveryWorker, log);
U.cancel(shmemWorkers);
U.join(shmemWorkers, log);
@@ -1360,6 +1688,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** {@inheritDoc} */
@Override protected void onContextDestroyed0() {
+ stopping = true;
+
if (ctxInitLatch.getCount() > 0)
// Safety.
ctxInitLatch.countDown();
@@ -1428,20 +1758,33 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
GridCommunicationClient client = null;
try {
- client = reserveClient(node);
+ boolean retry;
+
+ do {
+ client = reserveClient(node);
- UUID nodeId = null;
+ UUID nodeId = null;
- if (!client.async() && !getSpiContext().localNode().version().equals(node.version()))
- nodeId = node.id();
+ if (!client.async() && !getSpiContext().localNode().version().equals(node.version()))
+ nodeId = node.id();
- client.sendMessage(nodeId, msg);
+ retry = client.sendMessage(nodeId, msg);
- client.release();
+ client.release();
- client = null;
+ client = null;
+
+ if (!retry)
+ sentMsgsCnt.increment();
+ else {
+ ClusterNode node0 = getSpiContext().node(node.id());
- sentMsgsCnt.increment();
+ if (node0 == null)
+ throw new GridException("Failed to send message to remote node " +
+ "(node has left the grid): " + node.id());
+ }
+ }
+ while (retry);
}
catch (GridException e) {
throw new IgniteSpiException("Failed to send message to remote node: " + node, e);
@@ -1469,23 +1812,49 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
GridCommunicationClient client = clients.get(nodeId);
if (client == null) {
+ if (stopping)
+ throw new IgniteSpiException("Grid is stopping.");
+
// Do not allow concurrent connects.
- Object sync = locks.lock(nodeId);
+ GridFutureAdapterEx<GridCommunicationClient> fut = new ConnectFuture();
- try {
- client = clients.get(nodeId);
+ GridFutureAdapterEx<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(nodeId, fut);
+
+ if (oldFut == null) {
+ try {
+ GridCommunicationClient client0 = clients.get(nodeId);
+
+ if (client0 == null) {
+ client0 = createNioClient(node);
+
+ if (client0 != null) {
+ GridCommunicationClient old = clients.put(nodeId, client0);
+
+ assert old == null;
+ }
+ else
+ U.sleep(200);
+ }
- if (client == null) {
- GridCommunicationClient old = clients.put(nodeId, client = createNioClient(node));
+ fut.onDone(client0);
+ }
+ catch (Throwable e) {
+ fut.onDone(e);
- assert old == null;
+ if (e instanceof Error)
+ throw (Error)e;
+ }
+ finally {
+ clientFuts.remove(nodeId, fut);
}
}
- finally {
- locks.unlock(nodeId, sync);
- }
+ else
+ fut = oldFut;
- assert client != null;
+ client = fut.get();
+
+ if (client == null)
+ continue;
if (getSpiContext().node(nodeId) == null) {
if (clients.remove(nodeId, client))
@@ -1570,7 +1939,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
try {
- safeHandshake(client, node.id(), connTimeout0);
+ safeHandshake(client, null, node.id(), connTimeout0);
}
catch (HandshakeTimeoutException e) {
if (log.isDebugEnabled())
@@ -1661,59 +2030,74 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
while (!conn) { // Reconnection on handshake timeout.
try {
- if (asyncSnd) {
- SocketChannel ch = SocketChannel.open();
+ SocketChannel ch = SocketChannel.open();
- ch.configureBlocking(true);
+ ch.configureBlocking(true);
- ch.socket().setTcpNoDelay(tcpNoDelay);
- ch.socket().setKeepAlive(true);
+ ch.socket().setTcpNoDelay(tcpNoDelay);
+ ch.socket().setKeepAlive(true);
- if (sockRcvBuf > 0)
- ch.socket().setReceiveBufferSize(sockRcvBuf);
+ if (sockRcvBuf > 0)
+ ch.socket().setReceiveBufferSize(sockRcvBuf);
- if (sockSndBuf > 0)
- ch.socket().setSendBufferSize(sockSndBuf);
+ if (sockSndBuf > 0)
+ ch.socket().setSendBufferSize(sockSndBuf);
- ch.socket().connect(addr, (int)connTimeout);
+ GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(node);
- safeHandshake(ch, node.id(), connTimeout0);
+ if (!recoveryDesc.reserve()) {
+ U.closeQuiet(ch);
- UUID diffVerNodeId = null;
+ return null;
+ }
- IgniteProductVersion locVer = getSpiContext().localNode().version();
- IgniteProductVersion rmtVer = node.version();
+ long rcvCnt = -1;
- if (!locVer.equals(rmtVer))
- diffVerNodeId = node.id();
+ try {
+ ch.socket().connect(addr, (int)connTimeout);
- GridNioSession ses = nioSrvr.createSession(
- ch,
- F.asMap(
- NODE_ID_META, node.id(),
- GridNioServer.DIFF_VER_NODE_ID_META_KEY, diffVerNodeId)
- ).get();
+ rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0);
- client = new GridTcpNioCommunicationClient(ses);
+ if (rcvCnt == -1)
+ return null;
}
- else {
- client = new GridTcpCommunicationClient(
- metricsLsnr,
- msgWriter,
- addr,
- locHost,
- connTimeout,
- tcpNoDelay,
- sockRcvBuf,
- sockSndBuf,
- connBufSize,
- minBufferedMsgCnt,
- bufSizeRatio);
-
- safeHandshake(client, node.id(), connTimeout0);
+ finally {
+ if (recoveryDesc != null && rcvCnt == -1)
+ recoveryDesc.release();
}
- conn = true;
+ UUID diffVerNodeId = null;
+
+ IgniteProductVersion locVer = getSpiContext().localNode().version();
+ IgniteProductVersion rmtVer = node.version();
+
+ if (!locVer.equals(rmtVer))
+ diffVerNodeId = node.id();
+
+ try {
+ Map<Integer, Object> meta = new HashMap<>();
+
+ meta.put(NODE_ID_META, node.id());
+ meta.put(GridNioServer.DIFF_VER_NODE_ID_META_KEY, diffVerNodeId);
+
+ if (recoveryDesc != null) {
+ recoveryDesc.onHandshake(rcvCnt);
+
+ meta.put(-1, recoveryDesc);
+ }
+
+ GridNioSession ses = nioSrvr.createSession(ch, meta).get();
+
+ client = new GridTcpNioCommunicationClient(ses, log);
+
+ conn = true;
+ }
+ finally {
+ if (!conn) {
+ if (recoveryDesc != null)
+ recoveryDesc.release();
+ }
+ }
}
catch (HandshakeTimeoutException e) {
if (client != null) {
@@ -1812,16 +2196,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* Performs handshake in timeout-safe way.
*
* @param client Client.
+ * @param recovery Recovery descriptor if use recovery handshake, otherwise {@code null}.
* @param rmtNodeId Remote node.
* @param timeout Timeout for handshake.
* @throws GridException If handshake failed or wasn't completed within timeout.
+ * @return Handshake response.
*/
@SuppressWarnings("ThrowFromFinallyBlock")
- private <T> void safeHandshake(T client, UUID rmtNodeId, long timeout) throws GridException {
+ private <T> long safeHandshake(
+ T client,
+ @Nullable GridNioRecoveryDescriptor recovery,
+ UUID rmtNodeId,
+ long timeout
+ ) throws GridException {
HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
sockTimeoutWorker.addTimeoutObject(obj);
+ long rcvCnt = 0;
+
try {
if (client instanceof GridCommunicationClient)
((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId));
@@ -1851,9 +2244,62 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
log.debug("Received remote node ID: " + rmtNodeId0);
ch.write(ByteBuffer.wrap(U.GG_HEADER));
- ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType));
- success = true;
+ if (recovery != null) {
+ HandshakeMessage msg = new HandshakeMessage(locNodeId,
+ recovery.incrementConnectCount(),
+ recovery.receivedCount());
+
+ if (log.isDebugEnabled())
+ log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
+
+ buf = ByteBuffer.allocate(33);
+
+ buf.order(ByteOrder.nativeOrder());
+
+ boolean written = msg.writeTo(buf);
+
+ assert written;
+
+ buf.flip();
+
+ ch.write(buf);
+ }
+ else
+ ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType));
+
+ if (recovery != null) {
+ if (log.isDebugEnabled())
+ log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
+
+ buf = ByteBuffer.allocate(9);
+
+ buf.order(ByteOrder.nativeOrder());
+
+ for (int i = 0; i < 9; ) {
+ int read = ch.read(buf);
+
+ if (read == -1)
+ throw new GridException("Failed to read remote node recovery handshake " +
+ "(connection closed).");
+
+ i += read;
+ }
+
+ rcvCnt = buf.getLong(1);
+
+ if (log.isDebugEnabled())
+ log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
+
+ if (rcvCnt == -1) {
+ if (log.isDebugEnabled())
+ log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
+ }
+ else
+ success = true;
+ }
+ else
+ success = true;
}
catch (IOException e) {
if (log.isDebugEnabled())
@@ -1878,6 +2324,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
throw new HandshakeTimeoutException("Failed to perform handshake due to timeout (consider increasing " +
"'connectionTimeout' configuration property).");
}
+
+ return rcvCnt;
}
/**
@@ -1896,11 +2344,83 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
"is node stopping?) [senderNodeId=" + sndId + ", msg=" + msg + ']');
}
+ /**
+ * Returns the recovery descriptor registered for the given node, creating and registering
+ * a new one if none exists yet. Descriptors are keyed by the node's ID and order.
+ *
+ * @param node Node.
+ * @return Recovery receive data for given node.
+ */
+ private GridNioRecoveryDescriptor recoveryDescriptor(ClusterNode node) {
+ ClientKey id = new ClientKey(node.id(), node.order());
+
+ GridNioRecoveryDescriptor recovery = recoveryDescs.get(id);
+
+ if (recovery == null) {
+ int maxSize = Math.max(msgQueueLimit, ackSndThreshold);
+
+ // An explicitly configured buffer size wins; otherwise derive the limit from the larger threshold.
+ int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : (maxSize * 5);
+
+ // putIfAbsent (not put): if another thread registered a descriptor between get() and here,
+ // that descriptor must stay in the map and be the one returned. put() would replace it in
+ // the map while this method still returned the replaced instance.
+ GridNioRecoveryDescriptor old =
+ recoveryDescs.putIfAbsent(id, recovery = new GridNioRecoveryDescriptor(queueLimit, node, log));
+
+ if (old != null)
+ recovery = old;
+ }
+
+ return recovery;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpCommunicationSpi.class, this);
}
+ /**
+ * Composite key made of a node ID and a node order. Two keys are equal only when
+ * both the order and the node ID match; the hash code is derived from both fields.
+ */
+ private static class ClientKey {
+ /** Node ID. */
+ private UUID nodeId;
+
+ /** Node order. */
+ private long order;
+
+ /**
+ * @param nodeId Node ID.
+ * @param order Node order.
+ */
+ private ClientKey(UUID nodeId, long order) {
+ this.nodeId = nodeId;
+ this.order = order;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+
+ if (obj == null || getClass() != obj.getClass())
+ return false;
+
+ ClientKey other = (ClientKey)obj;
+
+ return order == other.order && nodeId.equals(other.nodeId);
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = nodeId.hashCode();
+
+ res = 31 * res + (int)(order ^ (order >>> 32));
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ClientKey.class, this);
+ }
+ }
+
/** Internal exception class for proper timeout handling. */
private static class HandshakeTimeoutException extends GridException {
/** */
@@ -2032,12 +2552,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
@SuppressWarnings({"BusyWait"})
@Override protected void body() throws InterruptedException {
while (!isInterrupted()) {
+ cleanupRecovery();
+
for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) {
UUID nodeId = e.getKey();
GridCommunicationClient client = e.getValue();
- if (getSpiContext().node(nodeId) == null) {
+ ClusterNode node = getSpiContext().node(nodeId);
+
+ if (node == null) {
if (log.isDebugEnabled())
log.debug("Forcing close of non-existent node connection: " + nodeId);
@@ -2048,9 +2572,39 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
continue;
}
+ GridNioRecoveryDescriptor recovery = null;
+
+ if (client instanceof GridTcpNioCommunicationClient) {
+ recovery = recoveryDescs.get(new ClientKey(node.id(), node.order()));
+
+ if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
+ RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
+
+ if (log.isDebugEnabled())
+ log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId +
+ ", rcvCnt=" + msg.received() + ']');
+
+ nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
+
+ recovery.lastAcknowledged(msg.received());
+
+ continue;
+ }
+ }
+
long idleTime = client.getIdleTime();
if (idleTime >= idleConnTimeout) {
+ if (recovery != null &&
+ recovery.nodeAlive(getSpiContext().node(nodeId)) &&
+ !recovery.messagesFutures().isEmpty()) {
+ if (log.isDebugEnabled())
+ log.debug("Node connection is idle, but there are unacknowledged messages, " +
+ "will wait: " + nodeId);
+
+ continue;
+ }
+
if (log.isDebugEnabled())
log.debug("Closing idle node connection: " + nodeId);
@@ -2062,6 +2616,38 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
Thread.sleep(idleConnTimeout);
}
}
+
+ /**
+ * Removes recovery descriptors whose node is no longer reported alive.
+ * <p>
+ * First pass collects the keys of descriptors for which {@code nodeAlive(...)} returns
+ * {@code false} for the node currently returned by the SPI context; second pass removes
+ * those keys from {@code recoveryDescs} and calls {@code onNodeLeft()} on each descriptor
+ * that was actually removed.
+ */
+ private void cleanupRecovery() {
+ Set<ClientKey> left = null;
+
+ for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) {
+ if (left != null && left.contains(e.getKey()))
+ continue;
+
+ GridNioRecoveryDescriptor recoverySnd = e.getValue();
+
+ if (!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) {
+ if (left == null)
+ left = new HashSet<>();
+
+ left.add(e.getKey());
+ }
+ }
+
+ if (left != null) {
+ assert !left.isEmpty();
+
+ for (ClientKey id : left) {
+ GridNioRecoveryDescriptor recoverySnd = recoveryDescs.remove(id);
+
+ if (recoverySnd != null)
+ recoverySnd.onNodeLeft();
+ }
+ }
+ }
}
/**
@@ -2213,6 +2799,84 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/**
* Thread that processes reconnect requests: it takes recovery descriptors from a queue
* and reserves (then immediately releases) a client for the descriptor's node.
*/
+ private class RecoveryWorker extends IgniteSpiThread {
+ /** Queue of recovery descriptors awaiting a reconnect attempt. */
+ private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>();
+
+ /**
+ * Creates the recovery worker.
+ */
+ private RecoveryWorker() {
+ super(gridName, "tcp-comm-recovery-worker", log);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Until interrupted, blocks on the queue for the next descriptor. The descriptor is skipped
+ * when a client for its node is already present in {@code clients} or the node is not alive.
+ * Otherwise a client is reserved and released. If that fails with {@link GridException} and
+ * the node is still alive, the descriptor is queued again; if the node has left, the failure
+ * is only logged at debug level.
+ */
+ @Override protected void body() throws InterruptedException {
+ if (log.isDebugEnabled())
+ log.debug("Recovery worker has been started.");
+
+ while (!isInterrupted()) {
+ GridNioRecoveryDescriptor recoveryDesc = q.take();
+
+ assert recoveryDesc != null;
+
+ ClusterNode node = recoveryDesc.node();
+
+ if (clients.containsKey(node.id()) || !recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
+ continue;
+
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
+
+ GridCommunicationClient client = reserveClient(node);
+
+ client.release();
+ }
+ catch (GridException e) {
+ if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) {
+ if (log.isDebugEnabled())
+ log.debug("Recovery reconnect failed, will retry " +
+ "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+
+ addReconnectRequest(recoveryDesc);
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Recovery reconnect failed, " +
+ "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+
+ }
+ }
+ }
+
+ /**
+ * Adds the descriptor to the queue processed by {@link #body()}.
+ *
+ * @param recoverySnd Recovery send data.
+ */
+ void addReconnectRequest(GridNioRecoveryDescriptor recoverySnd) {
+ boolean add = q.add(recoverySnd);
+
+ assert add;
+ }
+ }
+
+ /**
+ * Future whose result is a {@link GridCommunicationClient}. Adds nothing to
+ * {@link GridFutureAdapterEx} beyond a serial version UID and a public no-arg constructor.
+ */
+ private static class ConnectFuture extends GridFutureAdapterEx<GridCommunicationClient> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Empty constructor required for {@link Externalizable}.
+ */
+ public ConnectFuture() {
+ // No-op.
+ }
+ }
+
+ /**
+ *
+ */
private static class HandshakeTimeoutObject<T> {
/** */
private static final AtomicLong idGen = new AtomicLong();
@@ -2355,6 +3019,206 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
/**
+ * Handshake message.
+ * <p>
+ * {@link #writeTo(ByteBuffer)} emits 33 bytes: the {@code HANDSHAKE_MSG_TYPE} byte, the 16-byte
+ * node ID, the received-messages count and then the connect count. {@link #readFrom(ByteBuffer)}
+ * reads the 32 bytes after the type byte in the same order (it does not read a type byte itself;
+ * presumably the caller has already consumed it).
+ */
+ @SuppressWarnings("PublicInnerClass")
+ public static class HandshakeMessage extends GridTcpCommunicationMessageAdapter {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Node ID. */
+ private UUID nodeId;
+
+ /** Number of received messages. */
+ private long rcvCnt;
+
+ /** Connect count. */
+ private long connectCnt;
+
+ /**
+ * Default constructor required by {@link GridTcpCommunicationMessageAdapter}.
+ */
+ public HandshakeMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param nodeId Node ID, must not be {@code null} (asserted).
+ * @param connectCnt Connect count.
+ * @param rcvCnt Number of received messages, must be non-negative (asserted).
+ */
+ public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) {
+ assert nodeId != null;
+ assert rcvCnt >= 0 : rcvCnt;
+
+ this.nodeId = nodeId;
+ this.connectCnt = connectCnt;
+ this.rcvCnt = rcvCnt;
+ }
+
+ /**
+ * @return Connect count.
+ */
+ public long connectCount() {
+ return connectCnt;
+ }
+
+ /**
+ * @return Number of received messages.
+ */
+ public long received() {
+ return rcvCnt;
+ }
+
+ /**
+ * @return Node ID.
+ */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writes the whole message at once; returns {@code false} without writing anything
+ * when fewer than 33 bytes remain in the buffer.
+ */
+ @Override public boolean writeTo(ByteBuffer buf) {
+ if (buf.remaining() < 33)
+ return false;
+
+ buf.put(HANDSHAKE_MSG_TYPE);
+
+ byte[] bytes = U.uuidToBytes(nodeId);
+
+ assert bytes.length == 16 : bytes.length;
+
+ buf.put(bytes);
+
+ buf.putLong(rcvCnt);
+
+ buf.putLong(connectCnt);
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Reads node ID, received count and connect count; returns {@code false} without
+ * reading anything when fewer than 32 bytes remain in the buffer.
+ */
+ @Override public boolean readFrom(ByteBuffer buf) {
+ if (buf.remaining() < 32)
+ return false;
+
+ byte[] nodeIdBytes = new byte[16];
+
+ buf.get(nodeIdBytes);
+
+ nodeId = U.bytesToUuid(nodeIdBytes, 0);
+
+ rcvCnt = buf.getLong();
+
+ connectCnt = buf.getLong();
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return HANDSHAKE_MSG_TYPE;
+ }
+
+ /**
+ * Cloning is not supported for this message.
+ *
+ * @throws UnsupportedOperationException Always.
+ */
+ @SuppressWarnings("CloneDoesntCallSuperClone")
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter msg) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HandshakeMessage.class, this);
+ }
+ }
+
+ /**
+ * Recovery acknowledgment message carrying a single received-messages count.
+ * <p>
+ * {@link #writeTo(ByteBuffer)} emits 9 bytes: the {@code RECOVERY_LAST_ID_MSG_TYPE} byte
+ * followed by the count. {@link #readFrom(ByteBuffer)} reads only the 8-byte count.
+ * {@link #skipRecovery()} returns {@code true} for this message.
+ */
+ @SuppressWarnings("PublicInnerClass")
+ public static class RecoveryLastReceivedMessage extends GridTcpCommunicationMessageAdapter {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Number of received messages. */
+ private long rcvCnt;
+
+ /**
+ * Default constructor required by {@link GridTcpCommunicationMessageAdapter}.
+ */
+ public RecoveryLastReceivedMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param rcvCnt Number of received messages.
+ */
+ public RecoveryLastReceivedMessage(long rcvCnt) {
+ this.rcvCnt = rcvCnt;
+ }
+
+ /**
+ * @return Number of received messages.
+ */
+ public long received() {
+ return rcvCnt;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns {@code false} without writing anything when fewer than 9 bytes remain in the buffer.
+ */
+ @Override public boolean writeTo(ByteBuffer buf) {
+ if (buf.remaining() < 9)
+ return false;
+
+ buf.put(RECOVERY_LAST_ID_MSG_TYPE);
+
+ buf.putLong(rcvCnt);
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns {@code false} without reading anything when fewer than 8 bytes remain in the buffer.
+ */
+ @Override public boolean readFrom(ByteBuffer buf) {
+ if (buf.remaining() < 8)
+ return false;
+
+ rcvCnt = buf.getLong();
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return RECOVERY_LAST_ID_MSG_TYPE;
+ }
+
+ /**
+ * Cloning is not supported for this message.
+ *
+ * @throws UnsupportedOperationException Always.
+ */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter msg) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean skipRecovery() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(RecoveryLastReceivedMessage.class, this);
+ }
+ }
+
+ /**
* Node ID message.
*/
@SuppressWarnings("PublicInnerClass")
@@ -2426,5 +3290,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
@Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
// No-op.
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(NodeIdMessage.class, this);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
index b35e735..1255925 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
@@ -70,15 +70,6 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
public int getSelectorsCount();
/**
- * Gets flag defining whether asynchronous (NIO) or synchronous (blocking) IO
- * should be used to send messages.
- *
- * @return {@code True} if asynchronous IO should be used to send messages.
- */
- @IgniteMBeanDescription("Asynchronous send.")
- public boolean isAsyncSend();
-
- /**
* Gets sent messages count.
*
* @return Sent messages count.
@@ -220,23 +211,7 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
public int getSocketSendBuffer();
/**
- * Gets flag indicating whether dual-socket connection between nodes should be enforced. If set to
- * {@code true}, two separate connections will be established between communicating nodes: one for outgoing
- * messages, and one for incoming. When set to {@code false}, single {@code TCP} connection will be used
- * for both directions.
- * <p>
- * This flag is useful on some operating systems, when {@code TCP_NODELAY} flag is disabled and
- * messages take too long to get delivered.
- *
- * @return Whether dual-socket connection should be enforced.
- */
- @IgniteMBeanDescription("Dual-socket connection.")
- public boolean isDualSocketConnection();
-
- /**
* Gets message queue limit for incoming and outgoing messages.
- * <p>
- * This parameter only used when {@link #isAsyncSend()} set to {@code true}.
*
* @return Send queue size limit.
*/
@@ -260,4 +235,31 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
*/
@IgniteMBeanDescription("Buffer size ratio.")
public double getBufferSizeRatio();
+
+ /**
+ * Gets socket write timeout for TCP connections. If message can not be written to
+ * socket within this time then connection is closed and reconnect is attempted.
+ *
+ * @return Socket write timeout for TCP connections.
+ */
+ @IgniteMBeanDescription("Socket write timeout.")
+ public long getSocketWriteTimeout();
+
+ /**
+ * Gets number of received messages per connection to node after which acknowledgment message is sent.
+ *
+ * @return Number of received messages after which acknowledgment is sent.
+ */
+ @IgniteMBeanDescription("Number of received messages after which acknowledgment is sent.")
+ public int getAckSendThreshold();
+
+ /**
+ * Gets maximum number of stored unacknowledged messages per connection to node.
+ * If number of unacknowledged messages exceeds this number then connection to node is
+ * closed and reconnect is attempted.
+ *
+ * @return Maximum number of unacknowledged messages.
+ */
+ @IgniteMBeanDescription("Maximum number of unacknowledged messages.")
+ public int getUnacknowledgedMessagesBufferSize();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
index 5377e18..ffa7b3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
@@ -310,7 +310,8 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov
@Override protected void onContextDestroyed0() {
super.onContextDestroyed0();
- ipFinder.onSpiContextDestroyed();
+ if (ipFinder != null)
+ ipFinder.onSpiContextDestroyed();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageAdapter.java b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageAdapter.java
index 8c479ff..af63a2f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageAdapter.java
@@ -168,6 +168,13 @@ public abstract class GridTcpCommunicationMessageAdapter implements Serializable
protected abstract void clone0(GridTcpCommunicationMessageAdapter _msg);
/**
+ * @return {@code True} if should skip recovery for this message.
+ */
+ public boolean skipRecovery() {
+ return false;
+ }
+
+ /**
* @param arr Array.
* @return Array iterator.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
index 460c827..7c92065 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
@@ -282,8 +282,14 @@ public class GridTcpCommunicationMessageFactory {
* @return New message.
*/
public static GridTcpCommunicationMessageAdapter create(byte type) {
- return type == TcpCommunicationSpi.NODE_ID_MSG_TYPE ? new TcpCommunicationSpi.NodeIdMessage() :
- create0(type);
+ if (type == TcpCommunicationSpi.NODE_ID_MSG_TYPE)
+ return new TcpCommunicationSpi.NodeIdMessage();
+ else if (type == TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE)
+ return new TcpCommunicationSpi.RecoveryLastReceivedMessage();
+ else if (type == TcpCommunicationSpi.HANDSHAKE_MSG_TYPE)
+ return new TcpCommunicationSpi.HandshakeMessage();
+ else
+ return create0(type);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java
index 4128475..5b0db53 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java
@@ -87,8 +87,9 @@ public interface GridCommunicationClient {
* @param nodeId Node ID (provided only if versions of local and remote nodes are different).
* @param msg Message to send.
* @throws GridException If failed.
+ * @return {@code True} if should try to resend message.
*/
- void sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) throws GridException;
+ boolean sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) throws GridException;
/**
* @param timeout Timeout.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFinishedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFinishedFuture.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFinishedFuture.java
index 980b548..d6fcb60 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFinishedFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFinishedFuture.java
@@ -99,4 +99,9 @@ public class GridNioFinishedFuture<R> implements GridNioFuture<R> {
@Override public boolean messageThread() {
return msgThread;
}
+
+ /** {@inheritDoc} */
+ @Override public boolean skipRecovery() {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java
index 2775c55..a3ab1ef 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java
@@ -28,7 +28,7 @@ public interface GridNioFuture<R> {
*
* @return Operation result.
* @throws GridInterruptedException Subclass of {@link GridException} thrown if the wait was interrupted.
- * @throws org.apache.ignite.lang.IgniteFutureCancelledException Subclass of {@link GridException} throws if operation was cancelled.
+ * @throws IgniteFutureCancelledException Subclass of {@link GridException} thrown if operation was cancelled.
* @throws GridException If operation failed.
* @throws IOException If IOException occurred while performing operation.
*/
@@ -42,8 +42,8 @@ public interface GridNioFuture<R> {
* @param timeout The maximum time to wait in milliseconds.
* @return Operation result.
* @throws GridInterruptedException Subclass of {@link GridException} thrown if the wait was interrupted.
- * @throws org.apache.ignite.lang.IgniteFutureTimeoutException Subclass of {@link GridException} thrown if the wait was timed out.
- * @throws org.apache.ignite.lang.IgniteFutureCancelledException Subclass of {@link GridException} throws if operation was cancelled.
+ * @throws IgniteFutureTimeoutException Subclass of {@link GridException} thrown if the wait was timed out.
+ * @throws IgniteFutureCancelledException Subclass of {@link GridException} thrown if operation was cancelled.
* @throws GridException If operation failed.
* @throws IOException If IOException occurred while performing operation.
*/
@@ -57,8 +57,8 @@ public interface GridNioFuture<R> {
* @param unit The time unit of the {@code timeout} argument.
* @return Operation result.
* @throws GridInterruptedException Subclass of {@link GridException} thrown if the wait was interrupted.
- * @throws org.apache.ignite.lang.IgniteFutureTimeoutException Subclass of {@link GridException} thrown if the wait was timed out.
- * @throws org.apache.ignite.lang.IgniteFutureCancelledException Subclass of {@link GridException} throws if operation was cancelled.
+ * @throws IgniteFutureTimeoutException Subclass of {@link GridException} thrown if the wait was timed out.
+ * @throws IgniteFutureCancelledException Subclass of {@link GridException} thrown if operation was cancelled.
* @throws GridException If operation failed.
* @throws IOException If IOException occurred while performing operation.
*/
@@ -104,4 +104,9 @@ public interface GridNioFuture<R> {
* @return {@code True} if future was created in thread that was processing message.
*/
public boolean messageThread();
+
+ /**
+ * @return {@code True} if skip recovery for this operation.
+ */
+ public boolean skipRecovery();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java
index 3d5c4e9..7ab2e14 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java
@@ -118,7 +118,7 @@ public class GridNioFutureImpl<R> extends AbstractQueuedSynchronizer implements
* @param nanosTimeout Timeout (nanoseconds).
* @return Result.
* @throws InterruptedException If interrupted.
- * @throws org.apache.ignite.lang.IgniteFutureTimeoutException If timeout reached before computation completed.
+ * @throws IgniteFutureTimeoutException If timeout reached before computation completed.
* @throws GridException If error occurred.
*/
@Nullable protected R get0(long nanosTimeout) throws InterruptedException, GridException {
@@ -307,6 +307,11 @@ public class GridNioFutureImpl<R> extends AbstractQueuedSynchronizer implements
}
/** {@inheritDoc} */
+ @Override public boolean skipRecovery() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridNioFutureImpl.class, this);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioRecoveryDescriptor.java
new file mode 100644
index 0000000..846a388
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioRecoveryDescriptor.java
@@ -0,0 +1,356 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.util.nio;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Recovery information for a single remote node: unacknowledged message futures, message counters and connection reservation state.
+ */
+public class GridNioRecoveryDescriptor {
+ /** Number of acknowledged messages. */
+ private long acked;
+
+ /** Unacknowledged message futures. */
+ private final ArrayDeque<GridNioFuture<?>> msgFuts;
+
+ /** Number of queued messages awaiting resend; while positive, {@link #add} decrements it instead of queueing. */
+ private int resendCnt;
+
+ /** Number of received messages. */
+ private long rcvCnt;
+
+ /** Reserved flag. */
+ private boolean reserved;
+
+ /** Last acknowledged message. */
+ private long lastAck;
+
+ /** Node left flag. */
+ private boolean nodeLeft;
+
+ /** Target node. */
+ private final ClusterNode node;
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Incoming connection request from remote node (handshake ID and closure to notify). */
+ private IgniteBiTuple<Long, IgniteInClosure<Boolean>> handshakeReq;
+
+ /** Connected flag. */
+ private boolean connected;
+
+ /** Number of outgoing connect attempts. */
+ private long connectCnt;
+
+ /** Maximum size of unacknowledged messages queue. */
+ private final int queueLimit;
+
+ /**
+ * @param queueLimit Maximum size of unacknowledged messages queue (must be positive).
+ * @param node Node (must not be local).
+ * @param log Logger.
+ */
+ public GridNioRecoveryDescriptor(int queueLimit, ClusterNode node, IgniteLogger log) {
+ assert !node.isLocal() : node;
+ assert queueLimit > 0;
+
+ msgFuts = new ArrayDeque<>(queueLimit);
+
+ this.queueLimit = queueLimit;
+ this.node = node;
+ this.log = log;
+ }
+
+ /**
+ * @return Connect count before the increment.
+ */
+ public long incrementConnectCount() {
+ return connectCnt++;
+ }
+
+ /**
+ * @return Node.
+ */
+ public ClusterNode node() {
+ return node;
+ }
+
+ /**
+ * Increments received messages counter.
+ *
+ * @return Number of received messages (after the increment).
+ */
+ public long onReceived() {
+ rcvCnt++;
+
+ return rcvCnt;
+ }
+
+ /**
+ * @return Number of received messages.
+ */
+ public long received() {
+ return rcvCnt;
+ }
+
+ /**
+ * @param lastAck Last acknowledged message.
+ */
+ public void lastAcknowledged(long lastAck) {
+ this.lastAck = lastAck;
+ }
+
+ /**
+ * @return Last acknowledged message.
+ */
+ public long lastAcknowledged() {
+ return lastAck;
+ }
+
+ /**
+ * @return Received messages count (same value as {@link #received()}).
+ */
+ public long receivedCount() {
+ return rcvCnt;
+ }
+
+ /**
+ * @return Maximum size of unacknowledged messages queue.
+ */
+ public int queueLimit() {
+ return queueLimit;
+ }
+
+ /**
+ * @param fut NIO future; ignored if it skips recovery, counted as a resend (not queued) while resend counter is positive.
+ * @return {@code False} if queue size reached the limit after adding the future, {@code true} otherwise.
+ */
+ public boolean add(GridNioFuture<?> fut) {
+ assert fut != null;
+
+ if (!fut.skipRecovery()) {
+ if (resendCnt == 0) {
+ msgFuts.addLast(fut);
+
+ return msgFuts.size() < queueLimit;
+ }
+ else
+ resendCnt--;
+ }
+
+ return true;
+ }
+
+ /**
+ * @param rcvCnt Number of messages received by remote node; queued futures up to this count are completed and removed.
+ */
+ public void ackReceived(long rcvCnt) {
+ if (log.isDebugEnabled())
+ log.debug("Handle acknowledgment [acked=" + acked + ", rcvCnt=" + rcvCnt +
+ ", msgFuts=" + msgFuts.size() + ']');
+
+ while (acked < rcvCnt) {
+ GridNioFuture<?> fut = msgFuts.pollFirst();
+
+ assert fut != null;
+
+ ((GridNioFutureImpl)fut).onDone();
+
+ acked++;
+ }
+ }
+
+ /**
+ * Node left callback: fails queued futures immediately unless descriptor is reserved (then {@link #release()} does it).
+ */
+ public void onNodeLeft() {
+ GridNioFuture<?>[] futs = null;
+
+ synchronized (this) {
+ nodeLeft = true;
+
+ if (!reserved && !msgFuts.isEmpty()) {
+ futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]);
+
+ msgFuts.clear();
+ }
+ }
+
+ if (futs != null)
+ completeOnNodeLeft(futs);
+ }
+
+ /**
+ * @return Message futures for unacknowledged messages (the internal queue itself, not a copy).
+ */
+ public Deque<GridNioFuture<?>> messagesFutures() {
+ return msgFuts;
+ }
+
+ /**
+ * @param node Node.
+ * @return {@code True} if node is not null and has the same order as initial remote node.
+ */
+ public boolean nodeAlive(@Nullable ClusterNode node) {
+ return node != null && node.order() == this.node.order();
+ }
+
+ /**
+ * @throws InterruptedException If interrupted while waiting for the current reservation to end.
+ * @return {@code True} if reserved, {@code false} if descriptor is already connected.
+ */
+ public boolean reserve() throws InterruptedException {
+ synchronized (this) {
+ while (!connected && reserved)
+ wait();
+
+ if (!connected)
+ reserved = true;
+
+ return !connected;
+ }
+ }
+
+ /**
+ * @param rcvCnt Number of messages received by remote node; futures still queued after acknowledgment are counted for resend.
+ */
+ public void onHandshake(long rcvCnt) {
+ ackReceived(rcvCnt);
+
+ resendCnt = msgFuts.size();
+ }
+
+ /**
+ * Marks reserved descriptor as connected, applies pending handshake closure (if any) with {@code false} and notifies waiters.
+ */
+ public void connected() {
+ synchronized (this) {
+ assert reserved;
+ assert !connected;
+
+ connected = true;
+
+ if (handshakeReq != null) {
+ IgniteInClosure<Boolean> c = handshakeReq.get2();
+
+ assert c != null;
+
+ c.apply(false);
+
+ handshakeReq = null;
+ }
+
+ notifyAll();
+ }
+ }
+
+ /**
+ * Clears connected flag; applies pending handshake closure with {@code true} (keeping the reserved flag) or else clears the reserved flag and notifies waiters; fails queued futures if node has left.
+ */
+ public void release() {
+ GridNioFuture<?>[] futs = null;
+
+ synchronized (this) {
+ connected = false;
+
+ if (handshakeReq != null) {
+ IgniteInClosure<Boolean> c = handshakeReq.get2();
+
+ assert c != null;
+
+ handshakeReq = null;
+
+ c.apply(true);
+ }
+ else {
+ reserved = false;
+
+ notifyAll();
+ }
+
+ if (nodeLeft && !msgFuts.isEmpty()) {
+ futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]);
+
+ msgFuts.clear();
+ }
+ }
+
+ if (futs != null)
+ completeOnNodeLeft(futs);
+ }
+
+ /**
+ * @param id Handshake ID.
+ * @param c Closure to run on reserve; applied with {@code false} immediately if descriptor is connected or a pending request has a greater ID.
+ * @return {@code True} if reserved by this call, {@code false} if descriptor is connected or already reserved.
+ */
+ public boolean tryReserve(long id, IgniteInClosure<Boolean> c) {
+ synchronized (this) {
+ if (connected) {
+ c.apply(false);
+
+ return false;
+ }
+
+ if (reserved) {
+ if (handshakeReq != null) {
+ assert handshakeReq.get1() != null;
+
+ long id0 = handshakeReq.get1();
+
+ assert id0 != id : id0;
+
+ if (id > id0) {
+ IgniteInClosure<Boolean> c0 = handshakeReq.get2();
+
+ assert c0 != null;
+
+ c0.apply(false);
+
+ handshakeReq = new IgniteBiTuple<>(id, c);
+ }
+ else
+ c.apply(false);
+ }
+ else
+ handshakeReq = new IgniteBiTuple<>(id, c);
+
+ return false;
+ }
+ else {
+ reserved = true;
+
+ return true;
+ }
+ }
+ }
+
+ /**
+ * @param futs Futures to complete with an {@link IOException} reporting that the node has left.
+ */
+ private void completeOnNodeLeft(GridNioFuture<?>[] futs) {
+ for (GridNioFuture<?> msg : futs)
+ ((GridNioFutureImpl)msg).onDone(new IOException("Failed to send message, node has left: " + node.id()));
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNioRecoveryDescriptor.class, this);
+ }
+}