You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/15 07:17:44 UTC
ignite git commit: ignite-3220 Implemented separate in/out
connections in communication.
Repository: ignite
Updated Branches:
refs/heads/ignite-comm-opts2 [created] c70fe0793
ignite-3220 Implemented separate in/out connections in communication.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c70fe079
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c70fe079
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c70fe079
Branch: refs/heads/ignite-comm-opts2
Commit: c70fe0793b2b5a12ceef8fcaed11e2ba2ce76fa3
Parents: f30b79c
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 15 10:17:36 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 15 10:17:36 2016 +0300
----------------------------------------------------------------------
.../rest/protocols/tcp/MockNioSession.java | 14 +-
.../util/nio/GridNioRecoveryDescriptor.java | 1 +
.../ignite/internal/util/nio/GridNioServer.java | 44 ++-
.../internal/util/nio/GridNioSession.java | 14 +-
.../internal/util/nio/GridNioSessionImpl.java | 16 +-
.../util/nio/GridSelectorNioSessionImpl.java | 46 ++-
.../communication/tcp/TcpCommunicationSpi.java | 325 +++++++++++++++----
.../nio/impl/GridNioFilterChainSelfTest.java | 14 +-
...mmunicationSpiConcurrentConnectSelfTest.java | 4 +-
...cpCommunicationSpiMultithreadedSelfTest.java | 2 +-
...dTcpCommunicationSpiRecoveryAckSelfTest.java | 2 +-
...CommunicationRecoveryAckClosureSelfTest.java | 2 +-
12 files changed, 379 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
index c82c73e..e848653 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
@@ -131,12 +131,22 @@ public class MockNioSession extends GridMetadataAwareAdapter implements GridNioS
}
/** {@inheritDoc} */
- @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+ @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
// No-op.
}
/** {@inheritDoc} */
- @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() {
+ @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public GridNioRecoveryDescriptor outRecoveryDescriptor() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() {
return null;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 35480ac..29903d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -31,6 +31,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Recovery information for single node.
*/
+@Deprecated // To be split into separate classes for in/out data once backward compatibility no longer needs to be maintained.
public class GridNioRecoveryDescriptor {
/** Number of acknowledged messages. */
private long acked;
http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 24b8fad..ccd0ae4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -37,6 +37,7 @@ import java.nio.channels.WritableByteChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
@@ -504,7 +505,7 @@ public class GridNioServer<T> {
public void resend(GridNioSession ses) {
assert ses instanceof GridSelectorNioSessionImpl;
- GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor();
+ GridNioRecoveryDescriptor recoveryDesc = ses.outRecoveryDescriptor();
if (recoveryDesc != null && !recoveryDesc.messagesFutures().isEmpty()) {
Deque<GridNioFuture<?>> futs = recoveryDesc.messagesFutures();
@@ -530,6 +531,13 @@ public class GridNioServer<T> {
}
/**
+ * @return Sessions.
+ */
+ public Collection<? extends GridNioSession> sessions() {
+ return sessions;
+ }
+
+ /**
* @param ses Session.
* @param op Operation.
* @return Future for operation.
@@ -1463,16 +1471,25 @@ public class GridNioServer<T> {
.append("rmtAddr=").append(ses.remoteAddress())
.append(", locAddr=").append(ses.localAddress());
- GridNioRecoveryDescriptor desc = ses.recoveryDescriptor();
+ GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
- if (desc != null) {
- sb.append(", msgsSent=").append(desc.sent())
- .append(", msgsAckedByRmt=").append(desc.acked())
- .append(", msgsRcvd=").append(desc.received())
- .append(", descIdHash=").append(System.identityHashCode(desc));
+ if (outDesc != null) {
+ sb.append(", msgsSent=").append(outDesc.sent())
+ .append(", msgsAckedByRmt=").append(outDesc.acked())
+ .append(", descIdHash=").append(System.identityHashCode(outDesc));
}
else
- sb.append(", recoveryDesc=null");
+ sb.append(", outRecoveryDesc=null");
+
+ GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor();
+
+ if (inDesc != null) {
+ sb.append(", msgsRcvd=").append(inDesc.received())
+ .append(", lastAcked=").append(inDesc.lastAcknowledged())
+ .append(", descIdHash=").append(System.identityHashCode(inDesc));
+ }
+ else
+ sb.append(", inRecoveryDesc=null");
sb.append(", bytesRcvd=").append(ses.bytesReceived())
.append(", bytesSent=").append(ses.bytesSent())
@@ -1826,9 +1843,10 @@ public class GridNioServer<T> {
// Since ses is in closed state, no write requests will be added.
NioOperationFuture<?> fut = ses.removeMeta(NIO_OPERATION.ordinal());
- GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor();
+ GridNioRecoveryDescriptor outRecovery = ses.outRecoveryDescriptor();
+ GridNioRecoveryDescriptor inRecovery = ses.inRecoveryDescriptor();
- if (recovery != null) {
+ if (outRecovery != null || inRecovery != null) {
try {
// Poll will update recovery data.
while ((fut = (NioOperationFuture<?>)ses.pollFuture()) != null) {
@@ -1837,7 +1855,11 @@ public class GridNioServer<T> {
}
}
finally {
- recovery.release();
+ if (outRecovery != null)
+ outRecovery.release();
+
+ if (inRecovery != null && inRecovery != outRecovery)
+ inRecovery.release();
}
}
else {
http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
index e4a7225..1e427d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
@@ -158,10 +158,20 @@ public interface GridNioSession {
/**
* @param recoveryDesc Recovery descriptor.
*/
- public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc);
+ public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc);
+
+ /**
+ * @param recoveryDesc Recovery descriptor.
+ */
+ public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc);
+
+ /**
+ * @return Recovery descriptor if recovery is supported, {@code null} otherwise.
+ */
+ @Nullable public GridNioRecoveryDescriptor outRecoveryDescriptor();
/**
* @return Recovery descriptor if recovery is supported, {@code null otherwise.}
*/
- @Nullable public GridNioRecoveryDescriptor recoveryDescriptor();
+ @Nullable public GridNioRecoveryDescriptor inRecoveryDescriptor();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
index 0bcfe64..53a624d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
@@ -296,12 +296,22 @@ public class GridNioSessionImpl implements GridNioSession {
}
/** {@inheritDoc} */
- @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+ @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
- @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() {
+ @Nullable @Override public GridNioRecoveryDescriptor outRecoveryDescriptor() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() {
return null;
}
@@ -309,4 +319,4 @@ public class GridNioSessionImpl implements GridNioSession {
@Override public String toString() {
return S.toString(GridNioSessionImpl.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 0ba6af2..a680a33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -59,8 +59,11 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
/** Read buffer. */
private ByteBuffer readBuf;
- /** Recovery data. */
- private GridNioRecoveryDescriptor recovery;
+ /** Incoming recovery data. */
+ private GridNioRecoveryDescriptor inRecovery;
+
+ /** Outgoing recovery data. */
+ private GridNioRecoveryDescriptor outRecovery;
/** Logger. */
private final IgniteLogger log;
@@ -124,7 +127,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
* @param key Selection key.
*/
void key(SelectionKey key) {
- assert this.key == null;
+ assert key != null;
this.key = key;
}
@@ -225,17 +228,17 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
if (sem != null && !last.messageThread())
sem.release();
- if (recovery != null) {
- if (!recovery.add(last)) {
+ if (outRecovery != null) {
+ if (!outRecovery.add(last)) {
LT.warn(log, null, "Unacknowledged messages queue size overflow, will attempt to reconnect " +
"[remoteAddr=" + remoteAddress() +
- ", queueLimit=" + recovery.queueLimit() + ']');
+ ", queueLimit=" + outRecovery.queueLimit() + ']');
if (log.isDebugEnabled())
log.debug("Unacknowledged messages queue size overflow, will attempt to reconnect " +
"[remoteAddr=" + remoteAddress() +
- ", queueSize=" + recovery.messagesFutures().size() +
- ", queueLimit=" + recovery.queueLimit() + ']');
+ ", queueSize=" + outRecovery.messagesFutures().size() +
+ ", queueLimit=" + outRecovery.queueLimit() + ']');
close();
}
@@ -272,24 +275,35 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
}
/** {@inheritDoc} */
- @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+ @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+ assert recoveryDesc != null;
+
+ outRecovery = recoveryDesc;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public GridNioRecoveryDescriptor outRecoveryDescriptor() {
+ return outRecovery;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
assert recoveryDesc != null;
- recovery = recoveryDesc;
+ inRecovery = recoveryDesc;
}
/** {@inheritDoc} */
- @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() {
- return recovery;
+ @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() {
+ return inRecovery;
}
/** {@inheritDoc} */
@Override public <T> T addMeta(int key, @Nullable T val) {
- if (val instanceof GridNioRecoveryDescriptor) {
- recovery = (GridNioRecoveryDescriptor)val;
+ if (!accepted() && val instanceof GridNioRecoveryDescriptor) {
+ outRecovery = (GridNioRecoveryDescriptor)val;
- if (!accepted())
- recovery.connected();
+ outRecovery.connected();
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 74ecc45..63afb61 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -102,6 +102,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -236,6 +237,9 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@IgniteSpiConsistencyChecked(optional = false)
public class TcpCommunicationSpi extends IgniteSpiAdapter
implements CommunicationSpi<Message>, TcpCommunicationSpiMBean {
+ /** */
+ private static final IgniteProductVersion TWO_CONN_SINCE_VER = IgniteProductVersion.fromString("1.7.2");
+
/** IPC error message. */
public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " +
"(switching to TCP, may be slower).";
@@ -365,7 +369,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (!stopping) {
boolean reconnect = false;
- GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor();
+ GridNioRecoveryDescriptor recoveryData = ses.outRecoveryDescriptor();
if (recoveryData != null) {
if (recoveryData.nodeAlive(getSpiContext().node(id))) {
@@ -432,52 +436,33 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (ses.remoteAddress() == null)
return;
- GridCommunicationClient oldClient = clients.get(sndId);
-
- boolean hasShmemClient = false;
+ assert msg instanceof HandshakeMessage : msg;
- if (oldClient != null) {
- if (oldClient instanceof GridTcpNioCommunicationClient) {
- if (log.isDebugEnabled())
- log.debug("Received incoming connection when already connected " +
- "to this node, rejecting [locNode=" + locNode.id() +
- ", rmtNode=" + sndId + ']');
+ HandshakeMessage msg0 = (HandshakeMessage)msg;
- ses.send(new RecoveryLastReceivedMessage(-1));
+ if (useTwoConnections(rmtNode)) {
+ final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode);
- return;
- }
- else {
- assert oldClient instanceof GridShmemCommunicationClient;
+ boolean reserve = recoveryDesc.tryReserve(msg0.connectCount(),
+ new ConnectClosureNew(ses, recoveryDesc, rmtNode));
- hasShmemClient = true;
- }
+ if (reserve)
+ connectedNew(recoveryDesc, ses, true);
}
+ else {
+ GridCommunicationClient oldClient = clients.get(sndId);
- GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
-
- GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId, fut);
-
- assert msg instanceof HandshakeMessage : msg;
-
- HandshakeMessage msg0 = (HandshakeMessage)msg;
-
- final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode);
-
- if (oldFut == null) {
- oldClient = clients.get(sndId);
+ boolean hasShmemClient = false;
if (oldClient != null) {
if (oldClient instanceof GridTcpNioCommunicationClient) {
if (log.isDebugEnabled())
log.debug("Received incoming connection when already connected " +
- "to this node, rejecting [locNode=" + locNode.id() +
- ", rmtNode=" + sndId + ']');
+ "to this node, rejecting [locNode=" + locNode.id() +
+ ", rmtNode=" + sndId + ']');
ses.send(new RecoveryLastReceivedMessage(-1));
- fut.onDone(oldClient);
-
return;
}
else {
@@ -487,43 +472,73 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
}
- boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
+ GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
+
+ GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId, fut);
+
+ final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode);
+
+ if (oldFut == null) {
+ oldClient = clients.get(sndId);
+
+ if (oldClient != null) {
+ if (oldClient instanceof GridTcpNioCommunicationClient) {
+ if (log.isDebugEnabled())
+ log.debug("Received incoming connection when already connected " +
+ "to this node, rejecting [locNode=" + locNode.id() +
+ ", rmtNode=" + sndId + ']');
+
+ ses.send(new RecoveryLastReceivedMessage(-1));
+
+ fut.onDone(oldClient);
+
+ return;
+ }
+ else {
+ assert oldClient instanceof GridShmemCommunicationClient;
+
+ hasShmemClient = true;
+ }
+ }
+
+ boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
- if (log.isDebugEnabled())
- log.debug("Received incoming connection from remote node " +
+ if (log.isDebugEnabled())
+ log.debug("Received incoming connection from remote node " +
"[rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ']');
- if (reserved) {
- try {
- GridTcpNioCommunicationClient client =
+ if (reserved) {
+ try {
+ GridTcpNioCommunicationClient client =
connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
- fut.onDone(client);
- }
- finally {
- clientFuts.remove(rmtNode.id(), fut);
+ fut.onDone(client);
+ }
+ finally {
+ clientFuts.remove(rmtNode.id(), fut);
+ }
}
}
- }
- else {
- if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
- if (log.isDebugEnabled()) {
- log.debug("Received incoming connection from remote node while " +
+ else {
+ if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Received incoming connection from remote node while " +
"connecting to this node, rejecting [locNode=" + locNode.id() +
", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() +
", rmtNodeOrder=" + rmtNode.order() + ']');
- }
+ }
- ses.send(new RecoveryLastReceivedMessage(-1));
- }
- else {
- // The code below causes a race condition between shmem and TCP (see IGNITE-1294)
- boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
+ ses.send(new RecoveryLastReceivedMessage(-1));
+ }
+ else {
+ // The code below causes a race condition between shmem and TCP (see IGNITE-1294)
+ boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
- if (reserved)
- connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
+ if (reserved)
+ connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
+ }
}
}
}
@@ -553,10 +568,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
else {
rcvdMsgsCnt.increment();
- GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor();
+ if (msg instanceof RecoveryLastReceivedMessage) {
+ GridNioRecoveryDescriptor recovery = ses.outRecoveryDescriptor();
- if (recovery != null) {
- if (msg instanceof RecoveryLastReceivedMessage) {
+ if (recovery != null) {
RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg;
if (log.isDebugEnabled())
@@ -567,7 +582,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
return;
}
- else {
+ }
+ else {
+ GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor();
+
+ if (recovery != null) {
long rcvCnt = recovery.onReceived();
if (rcvCnt % ackSndThreshold == 0) {
@@ -623,7 +642,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
boolean createClient) {
recovery.onHandshake(rcvCnt);
- ses.recoveryDescriptor(recovery);
+ ses.inRecoveryDescriptor(recovery);
nioSrvr.resend(ses);
@@ -647,6 +666,79 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
/**
+ * @param recovery Recovery descriptor.
+ * @param ses Session.
+ * @param sndRes If {@code true} sends response for recovery handshake.
+ */
+ private void connectedNew(
+ GridNioRecoveryDescriptor recovery,
+ GridNioSession ses,
+ boolean sndRes) {
+ ses.inRecoveryDescriptor(recovery);
+
+ if (sndRes)
+ nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
+
+ recovery.connected();
+ }
+
+ /**
+ *
+ */
+ class ConnectClosureNew implements IgniteInClosure<Boolean> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final GridNioSession ses;
+
+ /** */
+ private final GridNioRecoveryDescriptor recoveryDesc;
+
+ /** */
+ private final ClusterNode rmtNode;
+
+ /**
+ * @param ses Incoming session.
+ * @param recoveryDesc Recovery descriptor.
+ * @param rmtNode Remote node.
+ */
+ ConnectClosureNew(GridNioSession ses,
+ GridNioRecoveryDescriptor recoveryDesc,
+ ClusterNode rmtNode) {
+ this.ses = ses;
+ this.recoveryDesc = recoveryDesc;
+ this.rmtNode = rmtNode;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void apply(Boolean success) {
+ if (success) {
+ IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> msgFut) {
+ try {
+ msgFut.get();
+
+ connectedNew(recoveryDesc, ses, false);
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send recovery handshake " +
+ "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');
+
+ recoveryDesc.release();
+ }
+ }
+ };
+
+ nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr);
+ }
+ else
+ nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1));
+ }
+ }
+
+ /**
*
*/
@SuppressWarnings("PackageVisibleInnerClass")
@@ -867,6 +959,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** */
private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> recoveryDescs = GridConcurrentFactory.newMap();
+ /** */
+ private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> outRecDescs = GridConcurrentFactory.newMap();
+
+ /** */
+ private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> inRecDescs = GridConcurrentFactory.newMap();
+
/** Discovery listener. */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
@@ -1407,6 +1505,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
.append(']').append(U.nl());
}
+ for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : outRecDescs.entrySet()) {
+ GridNioRecoveryDescriptor desc = entry.getValue();
+
+ sb.append(" [key=").append(entry.getKey())
+ .append(", msgsSent=").append(desc.sent())
+ .append(", msgsAckedByRmt=").append(desc.acked())
+ .append(", reserveCnt=").append(desc.reserveCount())
+ .append(", descIdHash=").append(System.identityHashCode(desc))
+ .append(']').append(U.nl());
+ }
+
+ for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : inRecDescs.entrySet()) {
+ GridNioRecoveryDescriptor desc = entry.getValue();
+
+ sb.append(" [key=").append(entry.getKey())
+ .append(", msgsRcvd=").append(desc.received())
+ .append(", lastAcked=").append(desc.lastAcknowledged())
+ .append(", reserveCnt=").append(desc.reserveCount())
+ .append(", descIdHash=").append(System.identityHashCode(desc))
+ .append(']').append(U.nl());
+ }
+
sb.append("Communication SPI clients: ").append(U.nl());
for (Map.Entry<UUID, GridCommunicationClient> entry : clients.entrySet()) {
@@ -1881,6 +2001,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
clientFut.onDone(err);
recoveryDescs.clear();
+ inRecDescs.clear();
+ outRecDescs.clear();
}
/** {@inheritDoc} */
@@ -2364,7 +2486,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
"(node left topology): " + node);
}
- GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(node);
+ GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node);
if (!recoveryDesc.reserve()) {
U.closeQuiet(ch);
@@ -2682,6 +2804,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
else
ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
}
+
if (recovery != null) {
if (log.isDebugEnabled())
log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
@@ -2807,9 +2930,41 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/**
* @param node Node.
+ * @return Recovery descriptor for outgoing connection.
+ */
+ private GridNioRecoveryDescriptor outRecoveryDescriptor(ClusterNode node) {
+ if (useTwoConnections(node))
+ return recoveryDescriptor(outRecDescs, node);
+ else
+ return recoveryDescriptor(recoveryDescs, node);
+ }
+
+ /**
+ * @param node Node.
+ * @return Recovery descriptor for incoming connection.
+ */
+ private GridNioRecoveryDescriptor inRecoveryDescriptor(ClusterNode node) {
+ if (useTwoConnections(node))
+ return recoveryDescriptor(inRecDescs, node);
+ else
+ return recoveryDescriptor(recoveryDescs, node);
+ }
+
+ /**
+ * @param node Node.
+ * @return {@code True} if given node supports two connections per-node for communication.
+ */
+ private boolean useTwoConnections(ClusterNode node) {
+ return node.version().compareToIgnoreTimestamp(TWO_CONN_SINCE_VER) >= 0;
+ }
+
+ /**
+ * @param node Node.
* @return Recovery receive data for given node.
*/
- private GridNioRecoveryDescriptor recoveryDescriptor(ClusterNode node) {
+ private GridNioRecoveryDescriptor recoveryDescriptor(
+ ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> recoveryDescs,
+ ClusterNode node) {
ClientKey id = new ClientKey(node.id(), node.order());
GridNioRecoveryDescriptor recovery = recoveryDescs.get(id);
@@ -3128,7 +3283,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
GridNioRecoveryDescriptor recovery = null;
- if (client instanceof GridTcpNioCommunicationClient) {
+ if (!useTwoConnections(node) && client instanceof GridTcpNioCommunicationClient) {
recovery = recoveryDescs.get(new ClientKey(node.id(), node.order()));
if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
@@ -3149,6 +3304,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
long idleTime = client.getIdleTime();
if (idleTime >= idleConnTimeout) {
+ if (recovery == null && useTwoConnections(node))
+ recovery = outRecDescs.get(new ClientKey(node.id(), node.order()));
+
if (recovery != null &&
recovery.nodeAlive(getSpiContext().node(nodeId)) &&
!recovery.messagesFutures().isEmpty()) {
@@ -3166,12 +3324,51 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
clients.remove(nodeId, client);
}
}
+
+ for (GridNioSession ses : nioSrvr.sessions()) {
+ GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor();
+
+ if (recovery != null && useTwoConnections(recovery.node())) {
+ assert ses.accepted() : ses;
+
+ sendAckOnTimeout(recovery, ses);
+ }
+ }
+ }
+
+ /**
+ * @param recovery Recovery descriptor.
+ * @param ses Session.
+ */
+ private void sendAckOnTimeout(GridNioRecoveryDescriptor recovery, GridNioSession ses) {
+ if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
+ RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
+
+ if (log.isDebugEnabled()) {
+ log.debug("Send recovery acknowledgement on timeout [rmtNode=" + recovery.node().id() +
+ ", rcvCnt=" + msg.received() +
+ ", lastAcked=" + recovery.lastAcknowledged() + ']');
+ }
+
+ nioSrvr.sendSystem(ses, msg);
+
+ recovery.lastAcknowledged(msg.received());
+ }
}
/**
*
*/
private void cleanupRecovery() {
+ cleanupRecovery(recoveryDescs);
+ cleanupRecovery(inRecDescs);
+ cleanupRecovery(outRecDescs);
+ }
+
+ /**
+ *
+ */
+ private void cleanupRecovery(ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> recoveryDescs) {
Set<ClientKey> left = null;
for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
index 201fd27..58b91e4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
@@ -369,12 +369,22 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+ @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
// No-op.
}
/** {@inheritDoc} */
- @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() {
+ @Nullable @Override public GridNioRecoveryDescriptor outRecoveryDescriptor() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() {
return null;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index 97eb34c..c7f7ad4 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -280,13 +280,13 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
@Override public boolean apply() {
Collection sessions = U.field(srv, "sessions");
- return sessions.size() == 1;
+ return sessions.size() == 2;
}
}, 5000);
Collection sessions = U.field(srv, "sessions");
- assertEquals(1, sessions.size());
+ assertEquals(2, sessions.size());
}
assertEquals(expMsgs, lsnr.cntr.get());
http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index 7bbf531..f210bec 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -370,7 +370,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
for (GridNioSession ses : sessions) {
- final GridNioRecoveryDescriptor snd = ses.recoveryDescriptor();
+ final GridNioRecoveryDescriptor snd = ses.outRecoveryDescriptor();
if (snd != null) {
GridTestUtils.waitForCondition(new GridAbsPredicate() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index 34872c6..fb2dfd7 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -173,7 +173,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
boolean found = false;
for (GridNioSession ses : sessions) {
- final GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor();
+ final GridNioRecoveryDescriptor recoveryDesc = ses.outRecoveryDescriptor();
if (recoveryDesc != null) {
found = true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
index 25e3611..e153fe2 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
@@ -187,7 +187,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
boolean found = false;
for (GridNioSession ses : sessions) {
- final GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor();
+ final GridNioRecoveryDescriptor recoveryDesc = ses.outRecoveryDescriptor();
if (recoveryDesc != null) {
found = true;