You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/22 14:42:39 UTC
[01/12] ignite git commit: conn
Repository: ignite
Updated Branches:
refs/heads/ignite-comm-balance 26eaaba8b -> 6a4d81965
conn
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/03e54ac2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/03e54ac2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/03e54ac2
Branch: refs/heads/ignite-comm-balance
Commit: 03e54ac24ed666fac9cbfd84dfbf6dc3d89a3a28
Parents: 26eaaba
Author: sboikov <sb...@gridgain.com>
Authored: Wed Sep 21 10:12:46 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Sep 21 10:12:46 2016 +0300
----------------------------------------------------------------------
.../org/apache/ignite/internal/util/nio/GridNioServer.java | 8 +-------
1 file changed, 1 insertion(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/03e54ac2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index b7b02f5..5da557b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -1320,10 +1320,6 @@ public class GridNioServer<T> {
}
}
- public interface NioWorker {
-
- }
-
/**
* Thread performing only read operations from the channel.
*/
@@ -1448,7 +1444,7 @@ public class GridNioServer<T> {
*
* @param req Change request.
*/
- @Override public void offer(GridNioFuture req) {
+ @Override public void offer(GridNioFuture req) {
changeReqs.offer((NioOperationFuture)req);
selector.wakeup();
@@ -2115,8 +2111,6 @@ public class GridNioServer<T> {
* @throws IgniteCheckedException If failed.
*/
private void accept() throws IgniteCheckedException {
- long lastBalance = U.currentTimeMillis();
-
try {
while (!closed && selector.isOpen() && !Thread.currentThread().isInterrupted()) {
// Wake up every 2 seconds to check if closed.
[08/12] ignite git commit: conn
Posted by sb...@apache.org.
conn
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/81832e1d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/81832e1d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/81832e1d
Branch: refs/heads/ignite-comm-balance
Commit: 81832e1dc9576fee9c8f74c451f4893cd5633d99
Parents: ffd654a
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 22 14:02:39 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 22 14:23:04 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/util/nio/GridNioServer.java | 131 ++++++++++++-------
.../ignite/internal/util/nio/GridNioWorker.java | 19 ++-
.../util/nio/GridSelectorNioSessionImpl.java | 62 ++++++++-
.../IgniteCommunicationBalanceTest.java | 2 +-
4 files changed, 161 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81832e1d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 5da557b..2d5cc64 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -435,10 +435,6 @@ public class GridNioServer<T> {
NioOperationFuture<Boolean> fut = new NioOperationFuture<>(impl, NioOperation.CLOSE);
impl.offerStateChange(fut);
-// int idx = impl.selectorIndex(); // TODO
-//
-// if (idx != -1)
-// clientWorkers.get(idx).offer(fut);
return fut;
}
@@ -498,14 +494,9 @@ public class GridNioServer<T> {
if (ses.removeFuture(fut))
fut.connectionClosed();
}
- else if (msgCnt == 1) {
+ else if (msgCnt == 1)
// Change from 0 to 1 means that worker thread should be waken up.
-// int idx = ses.selectorIndex();
-//
-// if (idx != -1) // TODO revisit
-// clientWorkers.get(idx).offer(fut);
ses.offerStateChange(fut);
- }
if (msgQueueLsnr != null)
msgQueueLsnr.apply(ses, msgCnt);
@@ -578,7 +569,6 @@ public class GridNioServer<T> {
ses0.resend(futs);
// Wake up worker.
- //clientWorkers.get(ses0.selectorIndex()).offer(((NioOperationFuture)fut0));
ses0.offerStateChange(fut0);
}
}
@@ -598,6 +588,10 @@ public class GridNioServer<T> {
}
public void moveSession(GridNioSession ses, int from, int to) {
+ assert from >= 0 && from < clientWorkers.size() : from;
+ assert to >= 0 && to < clientWorkers.size() : to;
+ assert from != to;
+
clientWorkers.get(from).offer(new SessionMoveFuture((GridSelectorNioSessionImpl)ses, to));
}
@@ -1336,13 +1330,15 @@ public class GridNioServer<T> {
/** Worker index. */
private final int idx;
+ /** Sessions assigned to this worker. */
+ private final GridConcurrentHashSet<GridSelectorNioSessionImpl> workerSessions =
+ new GridConcurrentHashSet<>();
+
private volatile long bytesRcvd;
private volatile long bytesSent;
private volatile long bytesRcvd0;
private volatile long bytesSent0;
- private final GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions0 = new GridConcurrentHashSet<>();
-
/**
* @param idx Index of this worker in server's array.
* @param gridName Grid name.
@@ -1360,7 +1356,7 @@ public class GridNioServer<T> {
}
public Collection<? extends GridNioSession> sessions() {
- return sessions0;
+ return workerSessions;
}
/** {@inheritDoc} */
@@ -1409,15 +1405,15 @@ public class GridNioServer<T> {
try {
SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
- Class<?> selectorImplClass =
+ Class<?> selectorImplCls =
Class.forName("sun.nio.ch.SelectorImpl", false, U.gridClassLoader());
// Ensure the current selector implementation is what we can instrument.
- if (!selectorImplClass.isAssignableFrom(selector.getClass()))
+ if (!selectorImplCls.isAssignableFrom(selector.getClass()))
return;
- Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
- Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
+ Field selectedKeysField = selectorImplCls.getDeclaredField("selectedKeys");
+ Field publicSelectedKeysField = selectorImplCls.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);
@@ -1439,17 +1435,41 @@ public class GridNioServer<T> {
}
}
- /**
- * Adds socket channel to the registration queue and wakes up reading thread.
- *
- * @param req Change request.
- */
+ /** {@inheritDoc} */
@Override public void offer(GridNioFuture req) {
changeReqs.offer((NioOperationFuture)req);
selector.wakeup();
}
+ /** {@inheritDoc} */
+ @Override public void offer(Collection<GridNioFuture> reqs) {
+ for (GridNioFuture req : reqs)
+ changeReqs.offer((NioOperationFuture)req);
+
+ selector.wakeup();
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<GridNioFuture> clearSessionRequests(GridNioSession ses) {
+ List<GridNioFuture> sesReqs = null;
+
+ for (GridNioServer.NioOperationFuture changeReq : changeReqs) {
+ if (changeReq.session() == ses) {
+ boolean rmv = changeReqs.remove(changeReq);
+
+ assert rmv : changeReq;
+
+ if (sesReqs == null)
+ sesReqs = new ArrayList<>();
+
+ sesReqs.add(changeReq);
+ }
+ }
+
+ return sesReqs;
+ }
+
/**
* Processes read and write events and registration requests.
*
@@ -1477,25 +1497,29 @@ public class GridNioServer<T> {
GridSelectorNioSessionImpl ses = f.session();
if (idx == f.toIdx) {
- ses.worker = this;
+ assert f.movedSocketChannel() != null : f;
+
+ boolean add = workerSessions.add(ses);
- sessions0.add(ses);
+ assert add;
- SelectionKey key = f.socketChannel().register(selector,
- SelectionKey.OP_READ | SelectionKey.OP_WRITE, ses); // TODO what if reads were paused?
+ ses.finishMoveSession(this);
+
+ SelectionKey key = f.movedSocketChannel().register(selector,
+ SelectionKey.OP_READ | SelectionKey.OP_WRITE,
+ ses);
ses.key(key);
}
else {
- if (sessions0.remove(ses)) {
- assert ses.worker == this; // TODO replace with IF and ignore?
+ assert f.movedSocketChannel() == null : f;
- // Cleanup.
- ses.worker = null;
+ if (workerSessions.remove(ses)) {
+ ses.startMoveSession(this);
SelectionKey key = ses.key();
- f.socketChannel((SocketChannel)key.channel());
+ f.movedSocketChannel((SocketChannel)key.channel());
key.cancel();
@@ -1879,7 +1903,7 @@ public class GridNioServer<T> {
resend(ses);
sessions.add(ses);
- sessions0.add(ses);
+ workerSessions.add(ses);
try {
filterChain.onSessionOpened(ses);
@@ -1922,7 +1946,7 @@ public class GridNioServer<T> {
}
sessions.remove(ses);
- sessions0.remove(ses);
+ workerSessions.remove(ses);
SelectionKey key = ses.key();
@@ -2038,7 +2062,7 @@ public class GridNioServer<T> {
bytesSent0 = 0;
bytesRcvd0 = 0;
- for (GridSelectorNioSessionImpl ses : sessions0)
+ for (GridSelectorNioSessionImpl ses : workerSessions)
ses.reset0();
}
}
@@ -2248,7 +2272,7 @@ public class GridNioServer<T> {
/** Socket channel in register request. */
@GridToStringExclude
- protected SocketChannel sockCh; // TODO to be fixed with proper hierarchy
+ private SocketChannel sockCh;
/** Session to perform operation on. */
@GridToStringExclude
@@ -2450,11 +2474,15 @@ public class GridNioServer<T> {
/** */
private final int toIdx;
+ /** */
+ @GridToStringExclude
+ private SocketChannel movedSockCh;
+
/**
- * @param ses
- * @param toIdx
+ * @param ses Session.
+ * @param toIdx Target worker index.
*/
- public SessionMoveFuture(
+ SessionMoveFuture(
GridSelectorNioSessionImpl ses,
int toIdx
) {
@@ -2463,12 +2491,25 @@ public class GridNioServer<T> {
this.toIdx = toIdx;
}
+ /**
+ * @return Target worker index.
+ */
int toIndex() {
return toIdx;
}
- void socketChannel(SocketChannel sockCh) {
- this.sockCh = sockCh;
+ /**
+ * @return Moved session socket channel.
+ */
+ SocketChannel movedSocketChannel() {
+ return movedSockCh;
+ }
+
+ /**
+ * @param movedSockCh Moved session socket channel.
+ */
+ void movedSocketChannel(SocketChannel movedSockCh) {
+ this.movedSockCh = movedSockCh;
}
/** {@inheritDoc} */
@@ -2938,7 +2979,7 @@ public class GridNioServer<T> {
long bytesRcvd0 = worker.bytesRcvd0;
if ((maxRcvd0 == -1 || bytesRcvd0 > maxRcvd0) && bytesRcvd0 > 0 &&
- worker.sessions0.size() > 1) {
+ worker.workerSessions.size() > 1) {
maxRcvd0 = bytesRcvd0;
maxRcvdIdx = i;
@@ -2955,7 +2996,7 @@ public class GridNioServer<T> {
long bytesSent0 = worker.bytesSent0;
if ((maxSent0 == -1 || bytesSent0 > maxSent0) && bytesSent0 > 0 &&
- worker.sessions0.size() > 1) {
+ worker.workerSessions.size() > 1) {
maxSent0 = bytesSent0;
maxSentIdx = i;
@@ -2989,7 +3030,7 @@ public class GridNioServer<T> {
double threshold = sentDiff * 0.9;
GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions =
- clientWorkers.get(maxSentIdx).sessions0;
+ clientWorkers.get(maxSentIdx).workerSessions;
for (GridSelectorNioSessionImpl ses0 : sessions) {
long bytesSent0 = ses0.bytesSent0();
@@ -3031,7 +3072,7 @@ public class GridNioServer<T> {
double threshold = rcvdDiff * 0.9;
GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions =
- clientWorkers.get(maxRcvdIdx).sessions0;
+ clientWorkers.get(maxRcvdIdx).workerSessions;
for (GridSelectorNioSessionImpl ses0 : sessions) {
long bytesRcvd0 = ses0.bytesReceived0();
http://git-wip-us.apache.org/repos/asf/ignite/blob/81832e1d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
index d088d8c..7f8033a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
@@ -17,12 +17,27 @@
package org.apache.ignite.internal.util.nio;
+import java.util.Collection;
+import java.util.List;
+import org.jetbrains.annotations.Nullable;
+
/**
*
*/
public interface GridNioWorker {
/**
- * @param fut Future.
+ * @param req Change request.
+ */
+ void offer(GridNioFuture req);
+
+ /**
+ * @param reqs Change requests.
+ */
+ void offer(Collection<GridNioFuture> reqs);
+
+ /**
+ * @param ses Session.
+ * @return Session state change requests.
*/
- void offer(GridNioFuture fut);
+ @Nullable List<GridNioFuture> clearSessionRequests(GridNioSession ses);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/81832e1d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index e515696..0c2033b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.util.nio;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteLogger;
@@ -43,8 +45,8 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
@GridToStringExclude
private SelectionKey key;
- /** */
- public GridNioWorker worker;
+ /** Current worker thread. */
+ private GridNioWorker worker;
/** Size counter. */
private final AtomicInteger queueSize = new AtomicInteger();
@@ -68,6 +70,9 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
/** Logger. */
private final IgniteLogger log;
+ /** */
+ private List<GridNioFuture> pendingStateChanges;
+
/**
* Creates session instance.
*
@@ -153,13 +158,60 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
return key;
}
+ /**
+ * @param fut
+ */
void offerStateChange(GridNioFuture fut) {
- GridNioWorker worker0 = worker;
+ synchronized (this) {
+ GridNioWorker worker0 = worker;
+
+ if (worker0 == null) {
+ if (pendingStateChanges == null)
+ pendingStateChanges = new ArrayList<>();
+
+ pendingStateChanges.add(fut);
+ }
+ else
+ worker0.offer(fut);
+ }
+ }
+
+ /**
+ * @param moveFrom
+ */
+ void startMoveSession(GridNioWorker moveFrom) {
+ synchronized (this) {
+ assert this.worker == moveFrom;
- if (worker0 != null)
- worker0.offer(fut);
+ List<GridNioFuture> sesReqs = moveFrom.clearSessionRequests(this);
+
+ worker = null;
+
+ if (sesReqs != null) {
+ if (pendingStateChanges == null)
+ pendingStateChanges = new ArrayList<>();
+
+ pendingStateChanges.addAll(sesReqs);
+ }
+ }
}
+ /**
+ * @param moveTo
+ */
+ void finishMoveSession(GridNioWorker moveTo) {
+ synchronized (this) {
+ assert worker == null;
+
+ worker = moveTo;
+
+ if (pendingStateChanges != null) {
+ moveTo.offer(pendingStateChanges);
+
+ pendingStateChanges = null;
+ }
+ }
+ }
/**
* Adds write future at the front of the queue without acquiring back pressure semaphore.
http://git-wip-us.apache.org/repos/asf/ignite/blob/81832e1d/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
index d523aab..839bd77 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
@@ -147,7 +147,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
/**
*
*/
- static class DummyRunnable implements IgniteRunnable {
+ private static class DummyRunnable implements IgniteRunnable {
/** {@inheritDoc} */
@Override public void run() {
// No-op.
[06/12] ignite git commit: conn
Posted by sb...@apache.org.
conn
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/79ca4aab
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/79ca4aab
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/79ca4aab
Branch: refs/heads/ignite-comm-balance
Commit: 79ca4aabab4e9d5f69271fbcd6885299c37cb9bf
Parents: 04a4d45
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 22 11:51:38 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 22 11:51:38 2016 +0300
----------------------------------------------------------------------
...GridTcpCommunicationSpiRecoverySelfTest.java | 21 +++++++++++++++++---
1 file changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/79ca4aab/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index fbbc5de..3d33fff 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -327,7 +327,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
- ses1.resumeReads().get();
+ try {
+ ses1.resumeReads().get();
+ }
+ catch (IgniteCheckedException ignore) {
+ // Can fail is ses1 was closed.
+ }
for (int j = 0; j < 100; j++) {
spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0));
@@ -437,7 +442,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
- ses1.resumeReads().get();
+ try {
+ ses1.resumeReads().get();
+ }
+ catch (IgniteCheckedException ignore) {
+ // Can fail is ses1 was closed.
+ }
// Wait when session is closed, then try to open new connection from node1.
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@@ -554,7 +564,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
- ses1.resumeReads().get();
+ try {
+ ses1.resumeReads().get();
+ }
+ catch (IgniteCheckedException ignore) {
+ // Can fail is ses1 was closed.
+ }
sndFut.get();
[09/12] ignite git commit: conn
Posted by sb...@apache.org.
conn
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/831aa16d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/831aa16d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/831aa16d
Branch: refs/heads/ignite-comm-balance
Commit: 831aa16dcdad7501ef9c58f2b3e7a4e191e5452b
Parents: 79ca4aab
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 22 14:35:53 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 22 14:35:53 2016 +0300
----------------------------------------------------------------------
.../ignite/spi/GridTcpSpiForwardingSelfTest.java | 13 +++++++------
...mmunicationSpiRecoveryFailureDetectionSelfTest.java | 1 +
2 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/831aa16d/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
index deda313..5ca8f26 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
@@ -69,7 +69,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
private static final int commExtPort2 = 20100;
/** */
- private AddressResolver resolver;
+ private AddressResolver rslvr;
/** */
private boolean ipFinderUseLocPorts;
@@ -127,12 +127,13 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
commSpi.setLocalPort(commLocPort);
commSpi.setLocalPortRange(1);
commSpi.setSharedMemoryPort(-1);
+ commSpi.setConnectionsPerNode(1);
cfg.setCommunicationSpi(commSpi);
- assert resolver != null;
+ assert rslvr != null;
- cfg.setAddressResolver(resolver);
+ cfg.setAddressResolver(rslvr);
return cfg;
}
@@ -148,7 +149,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
map.put(new InetSocketAddress("127.0.0.1", locPort2), F.asList(new InetSocketAddress("127.0.0.1", extPort2)));
map.put(new InetSocketAddress("127.0.0.1", commLocPort2), F.asList(new InetSocketAddress("127.0.0.1", commExtPort2)));
- resolver = new AddressResolver() {
+ rslvr = new AddressResolver() {
@Override public Collection<InetSocketAddress> getExternalAddresses(InetSocketAddress addr) {
return map.get(addr);
}
@@ -168,7 +169,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
map.put("127.0.0.1:" + locPort2, "127.0.0.1:" + extPort2);
map.put("127.0.0.1:" + commLocPort2, "127.0.0.1:" + commExtPort2);
- resolver = new BasicAddressResolver(map);
+ rslvr = new BasicAddressResolver(map);
doTestForward();
}
@@ -181,7 +182,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
map.put("127.0.0.1", "127.0.0.1");
- resolver = new BasicAddressResolver(map);
+ rslvr = new BasicAddressResolver(map);
ipFinderUseLocPorts = true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/831aa16d/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java
index 95c9e40..b1aa119 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java
@@ -33,6 +33,7 @@ public class GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest extends Gri
spi.setAckSendThreshold(5);
spi.setSocketSendBuffer(512);
spi.setSocketReceiveBuffer(512);
+ spi.setConnectionsPerNode(1);
return spi;
}
[04/12] ignite git commit: conn
Posted by sb...@apache.org.
conn
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3b0ffee0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3b0ffee0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3b0ffee0
Branch: refs/heads/ignite-comm-balance
Commit: 3b0ffee055ed843616282f013daa9d0b982e13bf
Parents: c604e8c
Author: sboikov <sb...@gridgain.com>
Authored: Wed Sep 21 12:54:53 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Sep 21 12:54:53 2016 +0300
----------------------------------------------------------------------
.../util/nio/GridSelectorNioSessionImpl.java | 2 +-
.../communication/tcp/TcpCommunicationSpi.java | 47 +++++++++++++++-----
.../IgniteCacheMessageWriteTimeoutTest.java | 13 ++++--
3 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b0ffee0/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index a680a33..88721ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -303,7 +303,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
if (!accepted() && val instanceof GridNioRecoveryDescriptor) {
outRecovery = (GridNioRecoveryDescriptor)val;
- outRecovery.connected();
+ outRecovery.onConnected();
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b0ffee0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index c9d9bf7..c131cf2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -424,13 +424,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (msg instanceof NodeIdMessage) {
sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0);
- connKey = new ConnectionKey(sndId, 0);
+ connKey = new ConnectionKey(sndId, 0, -1);
}
else {
assert msg instanceof HandshakeMessage : msg;
+ HandshakeMessage msg0 = (HandshakeMessage)msg;
+
sndId = ((HandshakeMessage)msg).nodeId();
- connKey = new ConnectionKey(sndId, ((HandshakeMessage)msg).connectionIndex());
+ connKey = new ConnectionKey(sndId, msg0.connectionIndex(), msg0.connectCount());
}
if (log.isDebugEnabled())
@@ -470,8 +472,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (reserve)
connectedNew(recoveryDesc, ses, true);
else {
- if (c.failed)
- ses.close();
+ if (c.failed) {
+ ses.send(new RecoveryLastReceivedMessage(-1));
+
+ for (GridNioSession ses0 : nioSrvr.sessions()) {
+ ConnectionKey key0 = ses0.meta(CONN_IDX_META);
+
+ if (ses0.accepted() && key0 != null &&
+ key0.nodeId().equals(connKey.nodeId()) &&
+ key0.connectionIndex() == connKey.connectionIndex() &&
+ key0.connectCount() < connKey.connectCount())
+ ses0.close();
+ }
+ }
}
}
else {
@@ -2369,7 +2382,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
// Do not allow concurrent connects.
GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture();
- ConnectionKey connKey = new ConnectionKey(nodeId, connIdx);
+ ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1);
GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut);
@@ -2705,7 +2718,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
"(node left topology): " + node);
}
- ConnectionKey connKey = new ConnectionKey(node.id(), connIdx);
+ ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1);
GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey);
@@ -3097,8 +3110,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
rcvCnt = buf.getLong(1);
}
- if (log.isDebugEnabled())
- log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
+ // if (log.isDebugEnabled())
+ log.info("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
if (rcvCnt == -1) {
if (log.isDebugEnabled())
@@ -3487,7 +3500,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
GridNioRecoveryDescriptor recovery = null;
if (!useMultipleConnections(node) && client instanceof GridTcpNioCommunicationClient) {
- recovery = recoveryDescs.get(new ConnectionKey(node.id(), client.connectionIndex()));
+ recovery = recoveryDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1));
if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
@@ -3508,7 +3521,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (idleTime >= idleConnTimeout) {
if (recovery == null && useMultipleConnections(node))
- recovery = outRecDescs.get(new ConnectionKey(node.id(), client.connectionIndex()));
+ recovery = outRecDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1));
if (recovery != null &&
recovery.nodeAlive(getSpiContext().node(nodeId)) &&
@@ -4273,13 +4286,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** */
private final int idx;
+ /** */
+ private final long connCnt;
+
/**
* @param nodeId Node ID.
* @param idx Connection index.
+ * @param connCnt Connection counter (set only for incoming connections).
*/
- ConnectionKey(UUID nodeId, int idx) {
+ ConnectionKey(UUID nodeId, int idx, long connCnt) {
this.nodeId = nodeId;
this.idx = idx;
+ this.connCnt = connCnt;
+ }
+
+ /**
+ * @return Connection counter.
+ */
+ long connectCount() {
+ return connCnt;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/3b0ffee0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
index 5b51af8..0dd4079 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
@@ -66,15 +66,20 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
super.afterTest();
}
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 10 * 60_000;
+ }
+
/**
* @throws Exception If failed.
*/
public void testMessageQueueLimit() throws Exception {
- startGridsMultiThreaded(3);
-
- for (int i = 0; i < 15; i++) {
+ for (int i = 0; i < 3; i++) {
log.info("Iteration: " + i);
+ startGridsMultiThreaded(3);
+
IgniteInternalFuture<?> fut1 = startJobThreads(50);
U.sleep(100);
@@ -83,6 +88,8 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
fut1.get();
fut2.get();
+
+ stopAllGrids();
}
}
[03/12] ignite git commit: conn
Posted by sb...@apache.org.
conn
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c604e8cb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c604e8cb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c604e8cb
Branch: refs/heads/ignite-comm-balance
Commit: c604e8cb291bae294bf97d8eb13fc16b8cf8a12e
Parents: bb465cc
Author: sboikov <sb...@gridgain.com>
Authored: Wed Sep 21 11:04:40 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Sep 21 11:04:40 2016 +0300
----------------------------------------------------------------------
.../GridTcpCommunicationSpiAbstractTest.java | 28 ++++++++++++--
...mmunicationSpiConcurrentConnectSelfTest.java | 40 ++++++++++++++++++--
2 files changed, 61 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c604e8cb/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index 076724d..3c4fea0 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -90,16 +90,36 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
super.afterTest();
for (CommunicationSpi spi : spis.values()) {
- ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
+ ConcurrentMap<UUID, GridCommunicationClient[]> clients = U.field(spi, "clients");
+
+ for (int i = 0; i < 20; i++) {
+ GridCommunicationClient client0 = null;
+
+ for (GridCommunicationClient[] clients0 : clients.values()) {
+ for (GridCommunicationClient client : clients0) {
+ if (client != null) {
+ client0 = client;
+
+ break;
+ }
+ }
+
+ if (client0 != null)
+ break;
+ }
+
+ if (client0 == null)
+ return;
- for (int i = 0; i < 20 && !clients.isEmpty(); i++) {
info("Check failed for SPI [grid=" +
- GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "gridName") + ", spi=" + spi + ']');
+ GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "gridName") +
+ ", client=" + client0 +
+ ", spi=" + spi + ']');
U.sleep(1000);
}
- assert clients.isEmpty() : "Clients: " + clients;
+ fail("Failed to wait when clients are closed.");
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c604e8cb/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index bd66319..ed047fa 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -253,7 +254,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
final AtomicInteger idx = new AtomicInteger();
try {
- GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ final Callable<Void> c = new Callable<Void>() {
@Override public Void call() throws Exception {
int idx0 = idx.getAndIncrement();
@@ -279,7 +280,40 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
return null;
}
- }, threads, "test");
+ };
+
+ List<Thread> threadsList = new ArrayList<>();
+
+ final AtomicBoolean fail = new AtomicBoolean();
+
+ final AtomicLong tId = new AtomicLong();
+
+ for (int t = 0; t < threads; t++) {
+ Thread t0 = new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ c.call();
+ }
+ catch (Throwable e) {
+ log.error("Unexpected error: " + e, e);
+
+ fail.set(true);
+ }
+ }
+ }) {
+ @Override public long getId() {
+ // Override getId to use all connections.
+ return tId.getAndIncrement();
+ }
+ };
+
+ threadsList.add(t0);
+
+ t0.start();
+ }
+
+ for (Thread t0 : threadsList)
+ t0.join();
assertTrue(latch.await(10, TimeUnit.SECONDS));
@@ -294,7 +328,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
@Override public boolean apply() {
Collection sessions = U.field(srv, "sessions");
- return sessions.size() == 2;
+ return sessions.size() == 2 * connectionsPerNode;
}
}, 5000);
[10/12] ignite git commit: conn
Posted by sb...@apache.org.
conn
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8d217413
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8d217413
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8d217413
Branch: refs/heads/ignite-comm-balance
Commit: 8d2174135a5fb5fffe6d7e695228862241b4fcef
Parents: 831aa16
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 22 14:41:01 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 22 15:28:23 2016 +0300
----------------------------------------------------------------------
.../apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java | 2 ++
.../ignite/internal/IgniteSlowClientDetectionSelfTest.java | 1 +
.../communication/IgniteVariousConnectionNumberTest.java | 4 ++++
3 files changed, 7 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8d217413/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 639e23d..784b081 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -3947,6 +3947,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/**
*
*/
+ @SuppressWarnings("PublicInnerClass")
public static class HandshakeMessage2 extends HandshakeMessage {
/** */
private static final long serialVersionUID = 0L;
@@ -3969,6 +3970,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
*/
HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int connIdx) {
super(nodeId, connectCnt, rcvCnt);
+
this.connIdx = connIdx;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8d217413/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
index 760313b..5721887 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
@@ -75,6 +75,7 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
commSpi.setSlowClientQueueLimit(50);
commSpi.setSharedMemoryPort(-1);
commSpi.setIdleConnectionTimeout(300_000);
+ commSpi.setConnectionsPerNode(1);
cfg.setCommunicationSpi(commSpi);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8d217413/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java
index 360eb8d..00a25d1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java
@@ -115,6 +115,10 @@ public class IgniteVariousConnectionNumberTest extends GridCommonAbstractTest {
int idx = ThreadLocalRandom.current().nextInt(NODES);
+ Ignite node = ignite(idx);
+
+ client = node.configuration().isClientMode();
+
stopGrid(idx);
startGrid(idx);
[11/12] ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-comm-opts2' into ignite-comm-balance
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-comm-opts2' into ignite-comm-balance
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9c87e2c7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9c87e2c7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9c87e2c7
Branch: refs/heads/ignite-comm-balance
Commit: 9c87e2c7b4baf81625e89d58d38e90de67598993
Parents: 81832e1 8d21741
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 22 16:14:49 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 22 16:14:49 2016 +0300
----------------------------------------------------------------------
.../spi/communication/tcp/TcpCommunicationSpi.java | 2 ++
.../internal/IgniteSlowClientDetectionSelfTest.java | 1 +
.../IgniteVariousConnectionNumberTest.java | 4 ++++
.../ignite/spi/GridTcpSpiForwardingSelfTest.java | 13 +++++++------
...mmunicationSpiRecoveryFailureDetectionSelfTest.java | 1 +
5 files changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
[02/12] ignite git commit: conn
Posted by sb...@apache.org.
conn
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bb465cc9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bb465cc9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bb465cc9
Branch: refs/heads/ignite-comm-balance
Commit: bb465cc9c4a8960fa218c6a1ce2b806ea24a11b3
Parents: c1d2436
Author: sboikov <sb...@gridgain.com>
Authored: Wed Sep 21 10:18:45 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Sep 21 10:18:45 2016 +0300
----------------------------------------------------------------------
.../internal/managers/communication/GridIoMessageFactory.java | 2 +-
.../apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb465cc9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 908543c..711c03f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -755,7 +755,7 @@ public class GridIoMessageFactory implements MessageFactory {
break;
- // [-3..119] [124] - this
+ // [-3..119] [124-125] - this
// [120..123] - DR
// [-4..-22, -30..-35] - SQL
default:
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb465cc9/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index ee30420..c9d9bf7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -334,7 +334,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
public static final byte HANDSHAKE_MSG_TYPE = -3;
/** */
- public static final byte HANDSHAKE_MSG_TYPE2 = -4;
+ public static final byte HANDSHAKE_MSG_TYPE2 = 125;
/** */
private ConnectGateway connectGate;
[12/12] ignite git commit: conn
Posted by sb...@apache.org.
conn
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6a4d8196
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6a4d8196
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6a4d8196
Branch: refs/heads/ignite-comm-balance
Commit: 6a4d81965a78dd3f47bea3b33f823c62e994dd9a
Parents: 9c87e2c
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 22 17:37:05 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 22 17:42:27 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/util/nio/GridNioServer.java | 34 ++-
.../communication/tcp/TcpCommunicationSpi.java | 4 +-
.../IgniteCommunicationBalanceTest.java | 215 +++++++++++++++----
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +
4 files changed, 206 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6a4d8196/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 2d5cc64..7352b5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -47,6 +47,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -92,6 +93,9 @@ public class GridNioServer<T> {
/** */
public static final String IGNITE_NIO_SES_BALANCER_CLASS_NAME = "IGNITE_NIO_SES_BALANCER_CLASS_NAME";
+ /** */
+ public static final String IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD = "IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD";
+
/** Default session write timeout. */
public static final int DFLT_SES_WRITE_TIMEOUT = 5000;
@@ -215,10 +219,10 @@ public class GridNioServer<T> {
private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr;
/** */
- private volatile long writerMoveCnt;
+ private final AtomicLong readerMoveCnt = new AtomicLong();
/** */
- private volatile long readerMoveCnt;
+ private final AtomicLong writerMoveCnt = new AtomicLong();
/** */
private final Balancer balancer;
@@ -361,6 +365,14 @@ public class GridNioServer<T> {
this.balancer = balancer0;
}
+ public long readerMoveCount() {
+ return readerMoveCnt.get();
+ }
+
+ public long writerMoveCount() {
+ return writerMoveCnt.get();
+ }
+
/**
* @return Configured port.
*/
@@ -1505,6 +1517,11 @@ public class GridNioServer<T> {
ses.finishMoveSession(this);
+ if (idx % 2 == 0)
+ readerMoveCnt.incrementAndGet();
+ else
+ writerMoveCnt.incrementAndGet();
+
SelectionKey key = f.movedSocketChannel().register(selector,
SelectionKey.OP_READ | SelectionKey.OP_WRITE,
ses);
@@ -2948,10 +2965,13 @@ public class GridNioServer<T> {
/** */
private long lastBalance;
+ /** */
+ private final long balancePeriod = IgniteSystemProperties.getLong(IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD, 5000);
+
/**
* @param srv Server.
*/
- public SizeBasedBalancer(GridNioServer<?> srv) {
+ SizeBasedBalancer(GridNioServer<?> srv) {
this.srv = srv;
log = srv.log;
@@ -2961,13 +2981,13 @@ public class GridNioServer<T> {
@Override public void balance() {
long now = U.currentTimeMillis();
- if (lastBalance + 5000 < now) {
+ if (lastBalance + balancePeriod < now) {
lastBalance = now;
long maxRcvd0 = -1, minRcvd0 = -1, maxSent0 = -1, minSent0 = -1;
int maxRcvdIdx = -1, minRcvdIdx = -1, maxSentIdx = -1, minSentIdx = -1;
- boolean print = Thread.currentThread().getName().contains("IgniteCommunicationBalanceTest4");
+ boolean print = false;//Thread.currentThread().getName().contains("IgniteCommunicationBalanceTest4");
List<GridNioServer.AbstractNioClientWorker> clientWorkers = (List)srv.clientWorkers;
@@ -3051,7 +3071,7 @@ public class GridNioServer<T> {
log.info("Will move session to less loaded writer [diff=" + sentDiff + ", ses=" + ses +
", from=" + maxSentIdx + ", to=" + minSentIdx + ']');
- srv.writerMoveCnt++;
+ srv.writerMoveCnt.incrementAndGet();
clientWorkers.get(maxSentIdx).offer(new SessionMoveFuture(ses, minSentIdx));
}
@@ -3093,7 +3113,7 @@ public class GridNioServer<T> {
log.info("Will move session to less loaded reader [diff=" + rcvdDiff + ", ses=" + ses +
", from=" + maxSentIdx + ", to=" + minSentIdx + ']');
- srv.readerMoveCnt++;
+ srv.readerMoveCnt.incrementAndGet();
clientWorkers.get(maxRcvdIdx).offer(new SessionMoveFuture(ses, minRcvdIdx));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6a4d8196/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 784b081..fd9985e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -3110,8 +3110,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
rcvCnt = buf.getLong(1);
}
- // if (log.isDebugEnabled())
- log.info("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
+ if (log.isDebugEnabled())
+ log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
if (rcvCnt == -1) {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/6a4d8196/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
index 839bd77..86d43e7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
@@ -22,12 +22,19 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCompute;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -58,6 +65,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
TcpCommunicationSpi commSpi = ((TcpCommunicationSpi)cfg.getCommunicationSpi());
commSpi.setSharedMemoryPort(-1);
+ commSpi.setConnectionsPerNode(1);
if (selectors > 0)
commSpi.setSelectorsCount(selectors);
@@ -79,45 +87,157 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testBalance() throws Exception {
- selectors = 4;
+ public void testBalance1() throws Exception {
+ System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD, "500");
- startGrid(0);
+ try {
+ selectors = 4;
- client = true;
+ startGridsMultiThreaded(4);
- Ignite client = startGrid(4);
+ client = true;
- startGridsMultiThreaded(1, 3);
+ Ignite client = startGrid(4);
- for (int i = 0; i < 4; i++) {
- ClusterNode node = client.cluster().node(ignite(i).cluster().localNode().id());
+ for (int i = 0; i < 4; i++) {
+ ClusterNode node = client.cluster().node(ignite(i).cluster().localNode().id());
- client.compute(client.cluster().forNode(node)).run(new DummyRunnable());
+ client.compute(client.cluster().forNode(node)).run(new DummyRunnable(null));
+ }
+
+ waitNioBalanceStop(client, 30_000);
+
+ final GridNioServer srv = GridTestUtils.getFieldValue(client.configuration().getCommunicationSpi(), "nioSrvr");
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ long readMoveCnt1 = srv.readerMoveCount();
+ long writeMoveCnt1 = srv.writerMoveCount();
+
+ for (int iter = 0; iter < 10; iter++) {
+ log.info("Iteration: " + iter);
+
+ int nodeIdx = rnd.nextInt(4);
+
+ ClusterNode node = client.cluster().node(ignite(nodeIdx).cluster().localNode().id());
+
+ IgniteCompute compute = client.compute(client.cluster().forNode(node));
+
+ for (int i = 0; i < 10_000; i++)
+ compute.run(new DummyRunnable(null));
+
+ final long readMoveCnt = readMoveCnt1;
+ final long writeMoveCnt = writeMoveCnt1;
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return srv.readerMoveCount() > readMoveCnt && srv.writerMoveCount() > writeMoveCnt;
+ }
+ }, 10_000);
+
+ waitNioBalanceStop(client, 30_000);
+
+ long readMoveCnt2 = srv.readerMoveCount();
+ long writeMoveCnt2 = srv.writerMoveCount();
+
+ assertTrue(readMoveCnt2 > readMoveCnt1);
+ assertTrue(writeMoveCnt2 > writeMoveCnt1);
+
+ readMoveCnt1 = readMoveCnt2;
+ writeMoveCnt1 = writeMoveCnt2;
+ }
+
+ for (Ignite node : G.allGrids())
+ waitNioBalanceStop(node, 10_000);
+ }
+ finally {
+ System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD, "");
}
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testBalance2() throws Exception {
+ System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD, "500");
+
+ try {
+ startGridsMultiThreaded(5);
+
+ client = true;
+
+ startGridsMultiThreaded(5, 5);
+
+ for (int i = 0; i < 20; i++) {
+ log.info("Iteration: " + i);
+
+ final AtomicInteger idx = new AtomicInteger();
+
+ GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ Ignite node = ignite(idx.incrementAndGet() % 10);
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
-// ThreadLocalRandom rnd = ThreadLocalRandom.current();
-//
-// for (int iter = 0; iter < 10; iter++) {
-// log.info("Iteration: " + iter);
-//
-// int nodeIdx = rnd.nextInt(4);
-//
-// ClusterNode node = client.cluster().node(ignite(nodeIdx).cluster().localNode().id());
-//
-// for (int i = 0; i < 10_000; i++)
-// client.compute(client.cluster().forNode(node)).run(new DummyRunnable());
-//
-// U.sleep(5000);
-// }
-
- while (true) {
- ((IgniteKernal) client).dumpDebugInfo();
-
- Thread.sleep(5000);
+ int msgs = rnd.nextInt(1000);
+
+ for (int i = 0; i < msgs; i++) {
+ int sndTo = rnd.nextInt(10);
+
+ ClusterNode sntToNode = node.cluster().node(ignite(sndTo).cluster().localNode().id());
+
+ IgniteCompute compute = node.compute(node.cluster().forNode(sntToNode));
+
+ compute.run(new DummyRunnable(new byte[rnd.nextInt(1024)]));
+ }
+
+ return null;
+ }
+ }, 30, "test-thread");
+
+ for (Ignite node : G.allGrids())
+ waitNioBalanceStop(node, 10_000);
+ }
+ }
+ finally {
+ System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD, "");
}
+ }
+
+ /**
+ * @param node Node.
+ * @param timeout Timeout.
+ * @throws Exception If failed.
+ */
+ private void waitNioBalanceStop(Ignite node, long timeout) throws Exception {
+ TcpCommunicationSpi spi = (TcpCommunicationSpi)node.configuration().getCommunicationSpi();
+
+ final GridNioServer srv = GridTestUtils.getFieldValue(spi, "nioSrvr");
+
+ assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() {
+ @Override public boolean applyx() throws IgniteCheckedException {
+ long readerMovCnt1 = srv.readerMoveCount();
+ long writerMovCnt1 = srv.writerMoveCount();
+
+ U.sleep(2000);
- //Thread.sleep(Long.MAX_VALUE);
+ long readerMovCnt2 = srv.readerMoveCount();
+ long writerMovCnt2 = srv.writerMoveCount();
+
+ if (readerMovCnt1 != readerMovCnt2) {
+ log.info("Readers balance is in progress [cnt1=" + readerMovCnt1 + ", cnt2=" + readerMovCnt2 + ']');
+
+ return false;
+ }
+ if (writerMovCnt1 != writerMovCnt2) {
+ log.info("Writers balance is in progress [cnt1=" + writerMovCnt1 + ", cnt2=" + writerMovCnt2 + ']');
+
+ return false;
+ }
+
+ return true;
+ }
+ }, timeout));
}
/**
@@ -126,28 +246,43 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
public void testRandomBalance() throws Exception {
System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_CLASS_NAME, TestBalancer.class.getName());
- final int NODES = 10;
+ try {
+ final int NODES = 10;
- startGridsMultiThreaded(NODES);
+ startGridsMultiThreaded(NODES);
- final long stopTime = System.currentTimeMillis() + 60_000;
+ final long stopTime = System.currentTimeMillis() + 60_000;
- GridTestUtils.runMultiThreaded(new Callable<Object>() {
- @Override public Object call() throws Exception {
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ GridTestUtils.runMultiThreaded(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
- while (System.currentTimeMillis() < stopTime)
- ignite(rnd.nextInt(NODES)).compute().broadcast(new DummyRunnable());
+ while (System.currentTimeMillis() < stopTime)
+ ignite(rnd.nextInt(NODES)).compute().broadcast(new DummyRunnable(null));
- return null;
- }
- }, 20, "test-thread");
+ return null;
+ }
+ }, 20, "test-thread");
+ }
+ finally {
+ System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_CLASS_NAME, null);
+ }
}
/**
*
*/
private static class DummyRunnable implements IgniteRunnable {
+ /** */
+ private byte[] data;
+
+ /**
+ * @param data Data.
+ */
+ public DummyRunnable(byte[] data) {
+ this.data = data;
+ }
+
/** {@inheritDoc} */
@Override public void run() {
// No-op.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6a4d8196/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 3a0d1ee..5b24a13 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -37,6 +37,7 @@ import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreOptimizedMarshallerS
import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreTest;
import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreMultithreadedSelfTest;
import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreSelfTest;
+import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceTest;
import org.apache.ignite.internal.managers.communication.IgniteVariousConnectionNumberTest;
import org.apache.ignite.internal.processors.cache.CacheAffinityCallSelfTest;
import org.apache.ignite.internal.processors.cache.CacheDeferredDeleteSanitySelfTest;
@@ -321,6 +322,7 @@ public class IgniteCacheTestSuite extends TestSuite {
suite.addTestSuite(CacheTxFastFinishTest.class);
suite.addTestSuite(IgniteVariousConnectionNumberTest.class);
+ suite.addTestSuite(IgniteCommunicationBalanceTest.class);
return suite;
}
[05/12] ignite git commit: conn
Posted by sb...@apache.org.
conn
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/04a4d458
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/04a4d458
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/04a4d458
Branch: refs/heads/ignite-comm-balance
Commit: 04a4d4584ecdb9e44a1107921b640aa57bcc6cbb
Parents: 3b0ffee
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 22 10:21:29 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 22 10:21:29 2016 +0300
----------------------------------------------------------------------
.../ignite/spi/communication/tcp/TcpCommunicationSpi.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/04a4d458/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index c131cf2..639e23d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -315,7 +315,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000;
/** Default connections per node. */
- public static final int DFLT_CONN_PER_NODE = 1;
+ public static final int DFLT_CONN_PER_NODE = 2;
/** No-op runnable. */
private static final IgniteRunnable NOOP = new IgniteRunnable() {
@@ -1132,7 +1132,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/**
* TODO
*
- * @param maxConnectionsPerNode
+ * @param maxConnectionsPerNode Number of connections per node.
*/
public void setConnectionsPerNode(int maxConnectionsPerNode) {
this.connectionsPerNode = maxConnectionsPerNode;
@@ -1141,7 +1141,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/**
* TODO
*
- * @return
+ * @return Number of connections per node.
*/
public int getConnectionsPerNode() {
return connectionsPerNode;
[07/12] ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-comm-opts2' into ignite-comm-balance
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-comm-opts2' into ignite-comm-balance
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ffd654a7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ffd654a7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ffd654a7
Branch: refs/heads/ignite-comm-balance
Commit: ffd654a777780e60f27fc3a2ac5d590a45123c87
Parents: 03e54ac 79ca4aab
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 22 12:02:47 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 22 12:02:47 2016 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 2 +-
.../util/nio/GridSelectorNioSessionImpl.java | 2 +-
.../communication/tcp/TcpCommunicationSpi.java | 55 ++++++++++++++------
.../IgniteCacheMessageWriteTimeoutTest.java | 13 +++--
.../GridTcpCommunicationSpiAbstractTest.java | 28 ++++++++--
...mmunicationSpiConcurrentConnectSelfTest.java | 40 ++++++++++++--
...GridTcpCommunicationSpiRecoverySelfTest.java | 21 ++++++--
7 files changed, 131 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ffd654a7/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------