You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/22 14:42:42 UTC
[04/12] ignite git commit: conn
conn
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3b0ffee0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3b0ffee0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3b0ffee0
Branch: refs/heads/ignite-comm-balance
Commit: 3b0ffee055ed843616282f013daa9d0b982e13bf
Parents: c604e8c
Author: sboikov <sb...@gridgain.com>
Authored: Wed Sep 21 12:54:53 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Sep 21 12:54:53 2016 +0300
----------------------------------------------------------------------
.../util/nio/GridSelectorNioSessionImpl.java | 2 +-
.../communication/tcp/TcpCommunicationSpi.java | 47 +++++++++++++++-----
.../IgniteCacheMessageWriteTimeoutTest.java | 13 ++++--
3 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b0ffee0/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index a680a33..88721ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -303,7 +303,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
if (!accepted() && val instanceof GridNioRecoveryDescriptor) {
outRecovery = (GridNioRecoveryDescriptor)val;
- outRecovery.connected();
+ outRecovery.onConnected();
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b0ffee0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index c9d9bf7..c131cf2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -424,13 +424,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (msg instanceof NodeIdMessage) {
sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0);
- connKey = new ConnectionKey(sndId, 0);
+ connKey = new ConnectionKey(sndId, 0, -1);
}
else {
assert msg instanceof HandshakeMessage : msg;
+ HandshakeMessage msg0 = (HandshakeMessage)msg;
+
sndId = ((HandshakeMessage)msg).nodeId();
- connKey = new ConnectionKey(sndId, ((HandshakeMessage)msg).connectionIndex());
+ connKey = new ConnectionKey(sndId, msg0.connectionIndex(), msg0.connectCount());
}
if (log.isDebugEnabled())
@@ -470,8 +472,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (reserve)
connectedNew(recoveryDesc, ses, true);
else {
- if (c.failed)
- ses.close();
+ if (c.failed) {
+ ses.send(new RecoveryLastReceivedMessage(-1));
+
+ for (GridNioSession ses0 : nioSrvr.sessions()) {
+ ConnectionKey key0 = ses0.meta(CONN_IDX_META);
+
+ if (ses0.accepted() && key0 != null &&
+ key0.nodeId().equals(connKey.nodeId()) &&
+ key0.connectionIndex() == connKey.connectionIndex() &&
+ key0.connectCount() < connKey.connectCount())
+ ses0.close();
+ }
+ }
}
}
else {
@@ -2369,7 +2382,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
// Do not allow concurrent connects.
GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture();
- ConnectionKey connKey = new ConnectionKey(nodeId, connIdx);
+ ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1);
GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut);
@@ -2705,7 +2718,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
"(node left topology): " + node);
}
- ConnectionKey connKey = new ConnectionKey(node.id(), connIdx);
+ ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1);
GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey);
@@ -3097,8 +3110,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
rcvCnt = buf.getLong(1);
}
- if (log.isDebugEnabled())
- log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
+ // if (log.isDebugEnabled())
+ log.info("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
if (rcvCnt == -1) {
if (log.isDebugEnabled())
@@ -3487,7 +3500,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
GridNioRecoveryDescriptor recovery = null;
if (!useMultipleConnections(node) && client instanceof GridTcpNioCommunicationClient) {
- recovery = recoveryDescs.get(new ConnectionKey(node.id(), client.connectionIndex()));
+ recovery = recoveryDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1));
if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
@@ -3508,7 +3521,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (idleTime >= idleConnTimeout) {
if (recovery == null && useMultipleConnections(node))
- recovery = outRecDescs.get(new ConnectionKey(node.id(), client.connectionIndex()));
+ recovery = outRecDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1));
if (recovery != null &&
recovery.nodeAlive(getSpiContext().node(nodeId)) &&
@@ -4273,13 +4286,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** */
private final int idx;
+ /** */
+ private final long connCnt;
+
/**
* @param nodeId Node ID.
* @param idx Connection index.
+ * @param connCnt Connection counter (set only for incoming connections).
*/
- ConnectionKey(UUID nodeId, int idx) {
+ ConnectionKey(UUID nodeId, int idx, long connCnt) {
this.nodeId = nodeId;
this.idx = idx;
+ this.connCnt = connCnt;
+ }
+
+ /**
+ * @return Connection counter.
+ */
+ long connectCount() {
+ return connCnt;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b0ffee0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
index 5b51af8..0dd4079 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
@@ -66,15 +66,20 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
super.afterTest();
}
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 10 * 60_000;
+ }
+
/**
* @throws Exception If failed.
*/
public void testMessageQueueLimit() throws Exception {
- startGridsMultiThreaded(3);
-
- for (int i = 0; i < 15; i++) {
+ for (int i = 0; i < 3; i++) {
log.info("Iteration: " + i);
+ startGridsMultiThreaded(3);
+
IgniteInternalFuture<?> fut1 = startJobThreads(50);
U.sleep(100);
@@ -83,6 +88,8 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
fut1.get();
fut2.get();
+
+ stopAllGrids();
}
}