You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/20 14:45:16 UTC
[18/22] ignite git commit: conn
conn
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4030ef88
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4030ef88
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4030ef88
Branch: refs/heads/ignite-comm-balance
Commit: 4030ef886043360e846926f2a72d5a8a81393e21
Parents: 6e72f51
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 20 11:49:10 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 20 14:37:06 2016 +0300
----------------------------------------------------------------------
.../nio/GridAbstractCommunicationClient.java | 1 +
.../util/nio/GridShmemCommunicationClient.java | 1 +
.../util/nio/GridTcpNioCommunicationClient.java | 2 +-
.../communication/tcp/TcpCommunicationSpi.java | 244 ++++++++++++++-----
...eAtomicMessageRecovery10ConnectionsTest.java | 28 +++
.../IgniteCacheMessageRecoveryAbstractTest.java | 24 +-
.../spi/GridTcpSpiForwardingSelfTest.java | 5 +-
...mmunicationSpiConcurrentConnectSelfTest.java | 17 +-
.../GridTcpCommunicationSpiConfigSelfTest.java | 3 +
...cpCommunicationSpiMultithreadedSelfTest.java | 12 +-
...dTcpCommunicationSpiRecoveryAckSelfTest.java | 1 +
...GridTcpCommunicationSpiRecoverySelfTest.java | 1 +
...CommunicationRecoveryAckClosureSelfTest.java | 1 +
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +
14 files changed, 268 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
index 37bc170..f2ab932 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
@@ -39,6 +39,7 @@ public abstract class GridAbstractCommunicationClient implements GridCommunicati
private final int connIdx;
/**
+ * @param connIdx Connection index.
* @param metricsLsnr Metrics listener.
*/
protected GridAbstractCommunicationClient(int connIdx, @Nullable GridNioMetricsListener metricsLsnr) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
index 74d58b2..d941bae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -48,6 +48,7 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
private final MessageFormatter formatter;
/**
+ * @param connIdx Connection index.
* @param metricsLsnr Metrics listener.
* @param port Shared memory IPC server port.
* @param connTimeout Connection timeout.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index 90f17b9..fcb40c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -45,8 +45,8 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
private final IgniteLogger log;
/**
- * @param ses Session.
* @param connIdx Connection index.
+ * @param ses Session.
* @param log Logger.
*/
public GridTcpNioCommunicationClient(
http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 7d91120..2d1a2b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -47,6 +47,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import org.apache.ignite.Ignite;
@@ -54,6 +55,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.AddressResolver;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -291,8 +293,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
*/
public static final int DFLT_SELECTORS_CNT = Math.min(4, Runtime.getRuntime().availableProcessors());
- /** Connection ID meta for session. */
- private static final int CONN_ID_META = GridNioSessionMetaKey.nextUniqueKey();
+ /** Connection index meta for session. */
+ private static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey();
/** Message tracker meta for session. */
private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
@@ -312,8 +314,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Default socket write timeout. */
public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000;
- /** */
- public static final int DFLT_CONN_PER_NODE = 2;
+ /** Default connections per node. */
+ public static final int DFLT_CONN_PER_NODE = 1;
/** No-op runnable. */
private static final IgniteRunnable NOOP = new IgniteRunnable() {
@@ -362,7 +364,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
@Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
- ConnectionKey connId = ses.meta(CONN_ID_META);
+ ConnectionKey connId = ses.meta(CONN_IDX_META);
if (connId != null) {
UUID id = connId.nodeId();
@@ -442,7 +444,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
return;
}
- final ConnectionKey old = ses.addMeta(CONN_ID_META, connKey);
+ final ConnectionKey old = ses.addMeta(CONN_IDX_META, connKey);
assert old == null;
@@ -526,7 +528,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
- new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
+ new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, msg0, !hasShmemClient, fut));
if (log.isDebugEnabled())
log.debug("Received incoming connection from remote node " +
@@ -558,7 +560,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
else {
// The code below causes a race condition between shmem and TCP (see IGNITE-1294)
boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
- new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
+ new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, msg0, !hasShmemClient, fut));
if (reserved)
connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
@@ -568,7 +570,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
@Override public void onMessage(GridNioSession ses, Message msg) {
- ConnectionKey connKey = ses.meta(CONN_ID_META);
+ ConnectionKey connKey = ses.meta(CONN_IDX_META);
if (connKey == null) {
assert ses.accepted() : ses;
@@ -684,7 +686,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (createClient) {
client = new GridTcpNioCommunicationClient(0, ses, log);
- addNodeClient(node.id(), 0, client);
+ addNodeClient(node, 0, client);
}
return client;
@@ -794,10 +796,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** */
private final boolean createClient;
+ /** Connection key. */
+ private final ConnectionKey connKey;
+
/**
* @param ses Incoming session.
* @param recoveryDesc Recovery descriptor.
* @param rmtNode Remote node.
+ * @param connKey Connection key.
* @param msg Handshake message.
* @param createClient If {@code true} creates NIO communication client..
* @param fut Connect future.
@@ -805,12 +811,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
ConnectClosure(GridNioSession ses,
GridNioRecoveryDescriptor recoveryDesc,
ClusterNode rmtNode,
+ ConnectionKey connKey,
HandshakeMessage msg,
boolean createClient,
GridFutureAdapter<GridCommunicationClient> fut) {
this.ses = ses;
this.recoveryDesc = recoveryDesc;
this.rmtNode = rmtNode;
+ this.connKey = connKey;
this.msg = msg;
this.createClient = createClient;
this.fut = fut;
@@ -839,7 +847,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
fut.onDone();
}
finally {
- clientFuts.remove(rmtNode.id(), fut);
+ clientFuts.remove(connKey, fut);
}
}
};
@@ -851,7 +859,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
fut.onDone();
}
finally {
- clientFuts.remove(rmtNode.id(), fut);
+ clientFuts.remove(connKey, fut);
}
}
}
@@ -1105,10 +1113,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
return locPortRange;
}
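+ /**
+ * Sets the number of connections used per node.
+ *
+ * @param maxConnectionsPerNode Connections per node, must be greater than 0 and not greater than 1024.
+ */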
public void setConnectionsPerNode(int maxConnectionsPerNode) {
this.connectionsPerNode = maxConnectionsPerNode;
}
+ /**
+ * Gets the configured number of connections per node.
+ *
+ * @return Number of connections per node.
+ */
public int getConnectionsPerNode() {
return connectionsPerNode;
}
@@ -1553,6 +1571,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
.append(", msgsSent=").append(desc.sent())
.append(", msgsAckedByRmt=").append(desc.acked())
.append(", reserveCnt=").append(desc.reserveCount())
+ .append(", connected=").append(desc.connected())
+ .append(", reserved=").append(desc.reserved())
.append(", descIdHash=").append(System.identityHashCode(desc))
.append(']').append(U.nl());
}
@@ -1578,9 +1598,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
GridCommunicationClient[] clients0 = entry.getValue();
for (GridCommunicationClient client : clients0) {
- sb.append(" [node=").append(nodeId)
- .append(", client=").append(client)
- .append(']').append(U.nl());
+ if (client != null) {
+ sb.append(" [node=").append(nodeId)
+ .append(", client=").append(client)
+ .append(']').append(U.nl());
+ }
}
}
@@ -1606,6 +1628,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1");
assertParameter(selectorsCnt > 0, "selectorsCnt > 0");
+ assertParameter(connectionsPerNode > 0, "connectionsPerNode > 0");
+ assertParameter(connectionsPerNode <= 1024, "connectionsPerNode <= 1024");
if (!failureDetectionTimeoutEnabled()) {
assertParameter(reconCnt > 0, "reconnectCnt > 0");
@@ -1691,6 +1715,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
log.debug(configInfo("sockRcvBuf", sockRcvBuf));
log.debug(configInfo("shmemPort", shmemPort));
log.debug(configInfo("msgQueueLimit", msgQueueLimit));
+ log.debug(configInfo("connectionsPerNode", connectionsPerNode));
if (failureDetectionTimeoutEnabled()) {
log.debug(configInfo("connTimeout", connTimeout));
@@ -1809,7 +1834,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assert formatter != null;
- ConnectionKey key = ses.meta(CONN_ID_META);
+ ConnectionKey key = ses.meta(CONN_IDX_META);
return key != null ? formatter.reader(key.nodeId(), msgFactory) : null;
}
@@ -1824,7 +1849,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assert formatter != null;
- ConnectionKey key = ses.meta(CONN_ID_META);
+ ConnectionKey key = ses.meta(CONN_IDX_META);
return key != null ? formatter.writer(key.nodeId()) : null;
}
@@ -2003,8 +2028,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
// Force closing on stop (safety).
for (GridCommunicationClient[] clients0 : clients.values()) {
- for (GridCommunicationClient client : clients0)
- client.forceClose();
+ for (GridCommunicationClient client : clients0) {
+ if (client != null)
+ client.forceClose();
+ }
}
// Clear resources.
@@ -2031,8 +2058,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
// Force closing.
for (GridCommunicationClient[] clients0 : clients.values()) {
- for (GridCommunicationClient client : clients0)
- client.forceClose();
+ for (GridCommunicationClient client : clients0) {
+ if (client != null)
+ client.forceClose();
+ }
}
getSpiContext().deregisterPorts();
@@ -2045,8 +2074,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
connectGate.disconnected(reconnectFut);
for (GridCommunicationClient[] clients0 : clients.values()) {
- for (GridCommunicationClient client : clients0)
- client.forceClose();
+ for (GridCommunicationClient client : clients0) {
+ if (client != null)
+ client.forceClose();
+ }
}
IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
@@ -2075,11 +2106,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (clients0 != null) {
for (GridCommunicationClient client : clients0) {
- if (log.isDebugEnabled())
- log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId +
- ", client=" + client + ']');
+ if (client != null) {
+ if (log.isDebugEnabled())
+ log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId +
+ ", client=" + client + ']');
- client.forceClose();
+ client.forceClose();
+ }
}
}
}
@@ -2128,12 +2161,49 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
sendMessage0(node, msg, ackC);
}
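+ /** Connection index selection mode ("CONN_IDX_MODE" system property, default 0): 0 - thread ID modulo, 1 - per-thread round-robin, 2 - random. */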
+ private final int idxMode = IgniteSystemProperties.getInteger("CONN_IDX_MODE", 0);
+
+ /** Connection index cached for the current thread (used when idxMode is 1). */
+ private final ThreadLocal<Integer> threadConnIdx = new ThreadLocal<>();
+
+ /** Round-robin counter used to hand out connection indexes to threads (used when idxMode is 1). */
+ private final AtomicInteger connIdx = new AtomicInteger();
+
/**
* TODO
* @return
*/
private int connectionIndex() {
- return ThreadLocalRandom.current().nextInt(connectionsPerNode);
+ switch (idxMode) {
+ case 0: {
+ return (int)(Thread.currentThread().getId() % connectionsPerNode);
+ }
+
+ case 1: {
+ Integer threadIdx = threadConnIdx.get();
+
+ if (threadIdx != null)
+ return threadIdx;
+
+ for (;;) {
+ int idx = connIdx.get();
+ int nextIdx = idx == connectionsPerNode - 1 ? 0 : idx + 1;
+
+ if (connIdx.compareAndSet(idx, nextIdx)) {
+ threadConnIdx.set(idx);
+
+ return idx;
+ }
+ }
+ }
+
+ case 2:
+ return ThreadLocalRandom.current().nextInt(connectionsPerNode);
+
+ default:
+ throw new IgniteException("Invalid connection index mode: " + idxMode);
+ }
}
/**
@@ -2215,48 +2285,48 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
for (;;) {
GridCommunicationClient[] curClients = clients.get(nodeId);
- if (curClients == null)
+ if (curClients == null || curClients[rmvClient.connectionIndex()] != rmvClient)
return false;
- if (curClients[rmvClient.connectionIndex()] == rmvClient) {
- GridCommunicationClient[] newClients = Arrays.copyOf(curClients, curClients.length);
+ GridCommunicationClient[] newClients = Arrays.copyOf(curClients, curClients.length);
- newClients[rmvClient.connectionIndex()] = null;
+ newClients[rmvClient.connectionIndex()] = null;
- if (clients.replace(nodeId, curClients, newClients))
- return true;
- }
- else
- return false;
+ if (clients.replace(nodeId, curClients, newClients))
+ return true;
}
}
/**
- * @param nodeId Node ID.
+ * @param node Node.
* @param connIdx Connection index.
* @param addClient Client to add.
*/
- private void addNodeClient(UUID nodeId, int connIdx, GridCommunicationClient addClient) {
+ private void addNodeClient(ClusterNode node, int connIdx, GridCommunicationClient addClient) {
+ assert connectionsPerNode > 0 : connectionsPerNode;
+
for (;;) {
- GridCommunicationClient[] curClients = clients.get(nodeId);
+ GridCommunicationClient[] curClients = clients.get(node.id());
- assert curClients == null || curClients[connIdx] == null : "Client already created " +
- "[node=" + nodeId + ", client=" + addClient + ", oldClient=" + curClients[connIdx] + ']';
+ assert curClients == null || curClients[connIdx] == null : "Client already created [node=" + node.id() +
+ ", connIdx=" + connIdx +
+ ", client=" + addClient +
+ ", oldClient=" + curClients[connIdx] + ']';
GridCommunicationClient[] newClients;
if (curClients == null) {
- newClients = new GridCommunicationClient[connectionsPerNode];
+ newClients = new GridCommunicationClient[useMultipleConnections(node) ? connectionsPerNode : 1];
newClients[connIdx] = addClient;
- if (clients.putIfAbsent(nodeId, newClients) == null)
+ if (clients.putIfAbsent(node.id(), newClients) == null)
break;
}
else {
newClients = Arrays.copyOf(curClients, curClients.length);
newClients[connIdx] = addClient;
- if (clients.replace(nodeId, curClients, newClients))
+ if (clients.replace(node.id(), curClients, newClients))
break;
}
}
@@ -2272,6 +2342,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
*/
private GridCommunicationClient reserveClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
assert node != null;
+ assert connIdx >= 0 && connIdx < connectionsPerNode : connIdx;
UUID nodeId = node.id();
@@ -2301,7 +2372,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
client0 = createNioClient(node, connIdx);
if (client0 != null) {
- addNodeClient(nodeId, connIdx, client0);
+ addNodeClient(node, connIdx, client0);
if (client0 instanceof GridTcpNioCommunicationClient) {
GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0);
@@ -2328,7 +2399,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
throw (Error)e;
}
finally {
- clientFuts.remove(nodeId, fut);
+ clientFuts.remove(connKey, fut);
}
}
else
@@ -2525,7 +2596,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
*/
private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) {
if (slowClientQueueLimit > 0 && msgQueueSize > slowClientQueueLimit) {
- ConnectionKey id = ses.meta(CONN_ID_META);
+ ConnectionKey id = ses.meta(CONN_IDX_META);
if (id != null) {
ClusterNode node = getSpiContext().node(id.nodeId);
@@ -2666,7 +2737,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
try {
Map<Integer, Object> meta = new HashMap<>();
- meta.put(CONN_ID_META, connKey);
+ meta.put(CONN_IDX_META, connKey);
if (isSslEnabled()) {
assert sslEngine != null;
@@ -3085,8 +3156,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
U.join(commWorker, log);
for (GridCommunicationClient[] clients0 : clients.values()) {
- for (GridCommunicationClient client : clients0)
- client.forceClose();
+ for (GridCommunicationClient client : clients0) {
+ if (client != null)
+ client.forceClose();
+ }
}
}
@@ -3281,7 +3354,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assert formatter != null;
- ConnectionKey connKey = ses.meta(CONN_ID_META);
+ ConnectionKey connKey = ses.meta(CONN_IDX_META);
return connKey != null ? formatter.writer(connKey.nodeId()) : null;
}
@@ -3297,7 +3370,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assert formatter != null;
- ConnectionKey connKey = ses.meta(CONN_ID_META);
+ ConnectionKey connKey = ses.meta(CONN_IDX_META);
return connKey != null ? formatter.reader(connKey.nodeId(), msgFactory) : null;
}
@@ -3384,6 +3457,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
UUID nodeId = e.getKey();
for (GridCommunicationClient client : e.getValue()) {
+ if (client == null)
+ continue;
+
ClusterNode node = getSpiContext().node(nodeId);
if (node == null) {
@@ -3483,7 +3559,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
/**
- *
+ * @param recoveryDescs Recovery descriptors to cleanup.
*/
private void cleanupRecovery(ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs) {
Set<ConnectionKey> left = null;
@@ -3492,9 +3568,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (left != null && left.contains(e.getKey()))
continue;
- GridNioRecoveryDescriptor recoverySnd = e.getValue();
+ GridNioRecoveryDescriptor recoveryDesc = e.getValue();
- if (!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) {
+ if (!recoveryDesc.nodeAlive(getSpiContext().node(e.getKey().nodeId()))) {
if (left == null)
left = new HashSet<>();
@@ -3506,10 +3582,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assert !left.isEmpty();
for (ConnectionKey id : left) {
- GridNioRecoveryDescriptor recoverySnd = recoveryDescs.get(id);
+ GridNioRecoveryDescriptor recoveryDesc = recoveryDescs.get(id);
- if (recoverySnd != null && recoverySnd.onNodeLeft())
- recoveryDescs.remove(id);
+ if (recoveryDesc != null && recoveryDesc.onNodeLeft())
+ recoveryDescs.remove(id, recoveryDesc);
}
}
}
@@ -3795,7 +3871,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (buf.remaining() < 33)
return false;
- buf.put(HANDSHAKE_MSG_TYPE);
+ buf.put(directType());
byte[] bytes = U.uuidToBytes(nodeId);
@@ -3867,7 +3943,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @param rcvCnt Number of received messages.
* @param connIdx Connection index.
*/
- public HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int connIdx) {
+ HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int connIdx) {
super(nodeId, connectCnt, rcvCnt);
this.connIdx = connIdx;
}
@@ -3883,6 +3959,32 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
/** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (buf.remaining() < 4)
+ return false;
+
+ buf.putInt(connIdx);
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ if (buf.remaining() < 4)
+ return false;
+
+ connIdx = buf.getInt();
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(HandshakeMessage2.class, this);
}
@@ -4184,6 +4286,26 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
/** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ ConnectionKey key = (ConnectionKey) o;
+
+ return idx == key.idx && nodeId.equals(key.nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = nodeId.hashCode();
+ res = 31 * res + idx;
+ return res;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(ConnectionKey.class, this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java
new file mode 100644
index 0000000..30fc9ef
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
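+/**
+ * Variant of {@link IgniteCacheAtomicMessageRecoveryTest} that overrides {@code connectionsPerNode()} to return 10.
+ */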
+public class IgniteCacheAtomicMessageRecovery10ConnectionsTest extends IgniteCacheAtomicMessageRecoveryTest {
+ /** {@inheritDoc} */
+ @Override protected int connectionsPerNode() {
+ return 10;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
index 0460a8f..1bfd727 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
@@ -58,6 +58,7 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
commSpi.setSocketWriteTimeout(1000);
commSpi.setSharedMemoryPort(-1);
+ commSpi.setConnectionsPerNode(connectionsPerNode());
cfg.setCommunicationSpi(commSpi);
@@ -76,6 +77,13 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
}
/**
+ * @return Value for {@link TcpCommunicationSpi#setConnectionsPerNode(int)}.
+ */
+ protected int connectionsPerNode() {
+ return TcpCommunicationSpi.DFLT_CONN_PER_NODE;
+ }
+
+ /**
* @return Cache atomicity mode.
*/
protected abstract CacheAtomicityMode atomicityMode();
@@ -174,18 +182,22 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
static boolean closeSessions(Ignite ignite) throws Exception {
TcpCommunicationSpi commSpi = (TcpCommunicationSpi)ignite.configuration().getCommunicationSpi();
- Map<UUID, GridCommunicationClient> clients = U.field(commSpi, "clients");
+ Map<UUID, GridCommunicationClient[]> clients = U.field(commSpi, "clients");
boolean closed = false;
- for (GridCommunicationClient client : clients.values()) {
- GridTcpNioCommunicationClient client0 = (GridTcpNioCommunicationClient)client;
+ for (GridCommunicationClient[] clients0 : clients.values()) {
+ for (GridCommunicationClient client : clients0) {
+ if (client != null) {
+ GridTcpNioCommunicationClient client0 = (GridTcpNioCommunicationClient)client;
- GridNioSession ses = client0.session();
+ GridNioSession ses = client0.session();
- ses.close();
+ ses.close();
- closed = true;
+ closed = true;
+ }
+ }
}
return closed;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
index 652e47f..deda313 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
@@ -111,14 +111,15 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
cfg.setConnectorConfiguration(null);
TcpCommunicationSpi commSpi = new TcpCommunicationSpi() {
- @Override protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException {
+ @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx)
+ throws IgniteCheckedException {
Map<String, Object> attrs = new HashMap<>(node.attributes());
attrs.remove(createSpiAttributeName(ATTR_PORT));
((TcpDiscoveryNode)node).setAttributes(attrs);
- return super.createTcpClient(node);
+ return super.createTcpClient(node, connIdx);
}
};
http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index c7f7ad4..bd66319 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -78,6 +78,9 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
/** */
private static int port = 60_000;
+ /** Connections per node set on the SPI under test. */
+ private int connectionsPerNode = 1;
+
/**
*
*/
@@ -158,6 +161,17 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
/**
* @throws Exception If failed.
*/
+ public void testMultithreaded_10Connections() throws Exception {
+ connectionsPerNode = 10;
+
+ int threads = Runtime.getRuntime().availableProcessors() * 5;
+
+ concurrentConnect(threads, 10, 10, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testWithLoad() throws Exception {
int threads = Runtime.getRuntime().availableProcessors() * 5;
@@ -286,7 +300,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
Collection sessions = U.field(srv, "sessions");
- assertEquals(2, sessions.size());
+ assertEquals(2 * connectionsPerNode, sessions.size());
}
assertEquals(expMsgs, lsnr.cntr.get());
@@ -315,6 +329,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
spi.setIdleConnectionTimeout(60_000);
spi.setConnectTimeout(10_000);
spi.setSharedMemoryPort(-1);
+ spi.setConnectionsPerNode(connectionsPerNode);
return spi;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
index b0353a6..c84ee32 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
@@ -47,6 +47,9 @@ public class GridTcpCommunicationSpiConfigSelfTest extends GridSpiAbstractConfig
checkNegativeSpiProperty(new TcpCommunicationSpi(), "ackSendThreshold", 0);
checkNegativeSpiProperty(new TcpCommunicationSpi(), "ackSendThreshold", -1);
checkNegativeSpiProperty(new TcpCommunicationSpi(), "unacknowledgedMessagesBufferSize", -1);
+ checkNegativeSpiProperty(new TcpCommunicationSpi(), "connectionsPerNode", 0);
+ checkNegativeSpiProperty(new TcpCommunicationSpi(), "connectionsPerNode", -1);
+ checkNegativeSpiProperty(new TcpCommunicationSpi(), "connectionsPerNode", Integer.MAX_VALUE);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index f210bec..74be68d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -109,7 +109,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
/**
* @param useShmem Use shared mem.
*/
- protected GridTcpCommunicationSpiMultithreadedSelfTest(boolean useShmem) {
+ GridTcpCommunicationSpiMultithreadedSelfTest(boolean useShmem) {
super(false);
this.useShmem = useShmem;
@@ -549,11 +549,17 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
}
for (CommunicationSpi spi : spis.values()) {
- final ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
+ final ConcurrentMap<UUID, GridCommunicationClient[]> clients = U.field(spi, "clients");
assert GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
- return clients.isEmpty();
+ for (GridCommunicationClient[] clients0 : clients.values()) {
+ for (GridCommunicationClient client : clients0) {
+ if (client != null)
+ return false;
+ }
+ }
+ return true;
}
}, getTestTimeout()) : "Clients: " + clients;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index fb2dfd7..e0478da 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -361,6 +361,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
spi.setAckSendThreshold(ackCnt);
spi.setMessageQueueLimit(queueLimit);
spi.setSharedMemoryPort(-1);
+ spi.setConnectionsPerNode(1);
return spi;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index bec6e00..dfa8a54 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -644,6 +644,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
spi.setSocketWriteTimeout(1000);
spi.setSocketSendBuffer(512);
spi.setSocketReceiveBuffer(512);
+ spi.setConnectionsPerNode(1);
return spi;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
index e153fe2..9928d93 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
@@ -397,6 +397,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
spi.setAckSendThreshold(ackCnt);
spi.setMessageQueueLimit(queueLimit);
spi.setSharedMemoryPort(-1);
+ spi.setConnectionsPerNode(1);
return spi;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index dff9fc7..2fd84ab 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -124,6 +124,7 @@ import org.apache.ignite.internal.processors.cache.context.IgniteCacheTxExecutio
import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicNearUpdateTopologyChangeTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheTxNearUpdateTopologyChangeTest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheEntrySetIterationPreloadingSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecovery10ConnectionsTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecoveryTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheConnectionRecoveryTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageRecoveryIdleConnectionTest;
@@ -282,6 +283,7 @@ public class IgniteCacheTestSuite extends TestSuite {
suite.addTestSuite(GridCacheEntrySetIterationPreloadingSelfTest.class);
suite.addTestSuite(GridCacheMixedPartitionExchangeSelfTest.class);
suite.addTestSuite(IgniteCacheAtomicMessageRecoveryTest.class);
+ suite.addTestSuite(IgniteCacheAtomicMessageRecovery10ConnectionsTest.class);
suite.addTestSuite(IgniteCacheTxMessageRecoveryTest.class);
suite.addTestSuite(IgniteCacheMessageWriteTimeoutTest.class);
suite.addTestSuite(IgniteCacheMessageRecoveryIdleConnectionTest.class);