You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/15 15:58:52 UTC
[3/5] ignite git commit: nio balance
nio balance
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bc85af16
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bc85af16
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bc85af16
Branch: refs/heads/ignite-comm-balance
Commit: bc85af1680acca18f6d764e09ca2ecacb48c2319
Parents: 0b7ff82
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 15 15:12:50 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 15 15:12:50 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/util/IgniteUtils.java | 12 +
.../ignite/internal/util/nio/GridNioServer.java | 361 ++++++++++++++++++-
.../internal/util/nio/GridNioSessionImpl.java | 50 +++
.../util/nio/GridSelectorNioSessionImpl.java | 9 +-
.../IgniteCommunicationBalanceTest.java | 118 ++++++
5 files changed, 536 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc85af16/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 93acc75..c7529bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -8307,6 +8307,18 @@ public abstract class IgniteUtils {
}
/**
+ * Gets absolute value for long. If argument is {@link Long#MIN_VALUE}, then {@code 0} is returned.
+ *
+ * @param i Argument.
+ * @return Absolute value.
+ */
+ public static long safeAbs(long i) {
+     // Math.abs(Long.MIN_VALUE) overflows and stays negative; that is the only case mapped to zero.
+     final long abs = Math.abs(i);
+
+     if (abs < 0)
+         return 0;
+
+     return abs;
+ }
+
+ /**
* Gets wrapper class for a primitive type.
*
* @param cls Class. If {@code null}, method is no-op.
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc85af16/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index f18615d..60db1fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -107,9 +107,6 @@ public class GridNioServer<T> {
private static final int WRITE_BUF_LIMIT = GridNioSessionMetaKey.nextUniqueKey();
/** */
- private static final boolean NIO_SES_BALANCE_ENABLED = IgniteSystemProperties.getBoolean("IGNITE_NIO_SES_BALANCE_ENABLED", true);
-
- /** */
private static final boolean DISABLE_KEYSET_OPTIMIZATION =
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_NO_SELECTOR_OPTS);
@@ -213,6 +210,15 @@ public class GridNioServer<T> {
/** Optional listener to monitor outbound message queue size. */
private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr;
+ /** */
+ private volatile long writerMoveCnt;
+
+ /** */
+ private volatile long readerMoveCnt;
+
+ /** */
+ private final Balancer balancer;
+
/**
* @param addr Address.
* @param port Port.
@@ -324,6 +330,22 @@ public class GridNioServer<T> {
this.writerFactory = writerFactory;
this.skipRecoveryPred = skipRecoveryPred != null ? skipRecoveryPred : F.<Message>alwaysFalse();
+
+ boolean balanceEnabled = IgniteSystemProperties.getBoolean("IGNITE_NIO_SES_BALANCE_ENABLED", true);
+
+ Balancer balancer0 = null;
+
+ if (balanceEnabled) {
+ String balancerCls = IgniteSystemProperties.getString("IGNITE_NIO_SES_BALANCER_CLASS_NAME");
+
+ if (balancerCls != null) {
+
+ }
+ else
+ balancer0 = new SizeBasedBalancer(this);
+ }
+
+ this.balancer = balancer0;
}
/**
@@ -399,7 +421,10 @@ public class GridNioServer<T> {
NioOperationFuture<Boolean> fut = new NioOperationFuture<>(impl, NioOperation.CLOSE);
- clientWorkers.get(impl.selectorIndex()).offer(fut);
+ int idx = impl.selectorIndex(); // TODO
+
+ if (idx != -1)
+ clientWorkers.get(idx).offer(fut);
return fut;
}
@@ -459,9 +484,13 @@ public class GridNioServer<T> {
if (ses.removeFuture(fut))
fut.connectionClosed();
}
- else if (msgCnt == 1)
+ else if (msgCnt == 1) {
// Change from 0 to 1 means that worker thread should be waken up.
- clientWorkers.get(ses.selectorIndex()).offer(fut);
+ int idx = ses.selectorIndex();
+
+ if (idx != -1) // TODO revisit
+ clientWorkers.get(idx).offer(fut);
+ }
if (msgQueueLsnr != null)
msgQueueLsnr.apply(ses, msgCnt);
@@ -925,6 +954,8 @@ public class GridNioServer<T> {
metricsLsnr.onBytesReceived(cnt);
ses.bytesReceived(cnt);
+ ses.onBytesRead(cnt, readBuf.capacity());
+ onRead(cnt);
readBuf.flip();
@@ -1239,6 +1270,8 @@ public class GridNioServer<T> {
metricsLsnr.onBytesSent(cnt);
ses.bytesSent(cnt);
+ ses.onBytesWritten(cnt, buf.capacity());
+ onWrite(cnt);
}
else {
// For test purposes only (skipWrite is set to true in tests only).
@@ -1276,6 +1309,13 @@ public class GridNioServer<T> {
/** Worker index. */
private final int idx;
+ private volatile long bytesRcvd;
+ private volatile long bytesSent;
+ private volatile long bytesRcvd0;
+ private volatile long bytesSent0;
+
+ private final GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions0 = new GridConcurrentHashSet<>();
+
/**
* @param idx Index of this worker in server's array.
* @param gridName Grid name.
@@ -1400,6 +1440,40 @@ public class GridNioServer<T> {
break;
}
+ case MOVE: {
+ SessionMoveFuture f = (SessionMoveFuture)req;
+
+ GridSelectorNioSessionImpl ses = f.session();
+
+ if (idx == f.toIdx) {
+ ses.selectorIndex(idx);
+
+ sessions0.add(ses);
+
+ SelectionKey key = f.socketChannel().register(selector,
+ SelectionKey.OP_READ | SelectionKey.OP_WRITE, ses); // TODO what if reads were paused?
+
+ ses.key(key);
+ }
+ else {
+ assert ses.selectorIndex() == idx; // TODO replace with IF and ignore?
+
+ // Cleanup.
+ ses.selectorIndex(-1);
+ sessions0.remove(ses);
+
+ SelectionKey key = ses.key();
+
+ f.socketChannel((SocketChannel)key.channel());
+
+ key.cancel();
+
+ clientWorkers.get(f.toIndex()).offer(f);
+ }
+
+ break;
+ }
+
case REQUIRE_WRITE: {
//Just register write key.
SelectionKey key = req.session().key();
@@ -1467,6 +1541,10 @@ public class GridNioServer<T> {
sb.append(U.nl())
.append(">> Selector info [idx=").append(idx)
.append(", keysCnt=").append(keys.size())
+ .append(", bytesRcvd=").append(bytesRcvd)
+ .append(", bytesRcvd0=").append(bytesRcvd0)
+ .append(", bytesSent=").append(bytesSent)
+ .append(", bytesSent0=").append(bytesSent0)
.append("]").append(U.nl());
for (SelectionKey key : keys) {
@@ -1500,8 +1578,12 @@ public class GridNioServer<T> {
sb.append(", inRecoveryDesc=null");
sb.append(", bytesRcvd=").append(ses.bytesReceived())
+ .append(", bytesRcvd0=").append(ses.bytesReceived0())
.append(", bytesSent=").append(ses.bytesSent())
+ .append(", bytesSent0=").append(ses.bytesSent0())
.append(", opQueueSize=").append(ses.writeQueueSize())
+ .append(", writeStats=").append(Arrays.toString(ses.writeStats()))
+ .append(", readStats=").append(Arrays.toString(ses.readStats()))
.append(", msgWriter=").append(writer != null ? writer.toString() : "null")
.append(", msgReader=").append(reader != null ? reader.toString() : "null");
@@ -1764,6 +1846,7 @@ public class GridNioServer<T> {
resend(ses);
sessions.add(ses);
+ sessions0.add(ses);
try {
filterChain.onSessionOpened(ses);
@@ -1789,7 +1872,7 @@ public class GridNioServer<T> {
}
/**
- * Closes the ses and all associated resources, then notifies the listener.
+ * Closes the session and all associated resources, then notifies the listener.
*
* @param ses Session to be closed.
* @param e Exception to be passed to the listener, if any.
@@ -1806,12 +1889,10 @@ public class GridNioServer<T> {
}
sessions.remove(ses);
+ sessions0.remove(ses);
SelectionKey key = ses.key();
- // Shutdown input and output so that remote client will see correct socket close.
- Socket sock = ((SocketChannel)key.channel()).socket();
-
if (ses.setClosed()) {
ses.onClosed();
@@ -1823,6 +1904,9 @@ public class GridNioServer<T> {
((DirectBuffer)ses.readBuffer()).cleaner().clean();
}
+ // Shutdown input and output so that remote client will see correct socket close.
+ Socket sock = ((SocketChannel)key.channel()).socket();
+
try {
try {
sock.shutdownInput();
@@ -1906,6 +1990,24 @@ public class GridNioServer<T> {
* @throws IOException If write failed.
*/
protected abstract void processWrite(SelectionKey key) throws IOException;
+
+ /**
+  * Adds the given number of received bytes to this worker's total counter and to the counter
+  * that {@code reset0()} clears.
+  * <p>
+  * NOTE(review): both counters are volatile, but the update is a non-atomic read-modify-write;
+  * presumably only the owning worker thread calls this - TODO confirm.
+  *
+  * @param cnt Number of bytes just read.
+  */
+ protected void onRead(int cnt) { // TODO
+     final long delta = cnt;
+
+     bytesRcvd = bytesRcvd + delta;
+     bytesRcvd0 = bytesRcvd0 + delta;
+ }
+
+ /**
+  * Adds the given number of sent bytes to this worker's total counter and to the counter
+  * that {@code reset0()} clears.
+  * <p>
+  * NOTE(review): both counters are volatile, but the update is a non-atomic read-modify-write;
+  * presumably only the owning worker thread calls this - TODO confirm.
+  *
+  * @param cnt Number of bytes just written.
+  */
+ protected void onWrite(int cnt) {
+     final long delta = cnt;
+
+     bytesSent = bytesSent + delta;
+     bytesSent0 = bytesSent0 + delta;
+ }
+
+ /**
+  * Zeroes the resettable sent/received counters of this worker and of every session
+  * currently tracked in {@code sessions0}. The total counters are left untouched.
+  */
+ protected void reset0() {
+     this.bytesSent0 = 0;
+     this.bytesRcvd0 = 0;
+
+     for (GridSelectorNioSessionImpl tracked : this.sessions0) {
+         tracked.reset0();
+     }
+ }
}
/**
@@ -1976,12 +2078,17 @@ public class GridNioServer<T> {
* @throws IgniteCheckedException If failed.
*/
private void accept() throws IgniteCheckedException {
+ long lastBalance = U.currentTimeMillis();
+
try {
while (!closed && selector.isOpen() && !Thread.currentThread().isInterrupted()) {
// Wake up every 2 seconds to check if closed.
if (selector.select(2000) > 0)
// Walk through the ready keys collection and process date requests.
processSelectedKeys(selector.selectedKeys());
+
+ if (balancer != null)
+ balancer.balance();
}
}
// Ignore this exception as thread interruption is equal to 'close' call.
@@ -2082,6 +2189,9 @@ public class GridNioServer<T> {
/** Register read key selection. */
REGISTER,
+ /** */
+ MOVE,
+
/** Register write key selection. */
REQUIRE_WRITE,
@@ -2107,7 +2217,7 @@ public class GridNioServer<T> {
/** Socket channel in register request. */
@GridToStringExclude
- private SocketChannel sockCh;
+ protected SocketChannel sockCh; // TODO to be fixed with proper hierarchy
/** Session to perform operation on. */
@GridToStringExclude
@@ -2249,14 +2359,14 @@ public class GridNioServer<T> {
/**
* @return Socket channel for register request.
*/
- private SocketChannel socketChannel() {
+ SocketChannel socketChannel() {
return sockCh;
}
/**
* @return Session for this change request.
*/
- private GridSelectorNioSessionImpl session() {
+ GridSelectorNioSessionImpl session() {
return ses;
}
@@ -2303,6 +2413,41 @@ public class GridNioServer<T> {
}
/**
+ *
+ */
+ private static class SessionMoveFuture<R> extends NioOperationFuture<R> {
+     /** Index of the client worker the session is being moved to. */
+     private final int toIdx;
+
+     /**
+      * @param ses Session to move.
+      * @param toIdx Index of the target worker in the server's client worker list.
+      */
+     public SessionMoveFuture(
+         GridSelectorNioSessionImpl ses,
+         int toIdx
+     ) {
+         super(ses, NioOperation.MOVE);
+
+         // No channel is known at construction time: the worker that currently owns the session
+         // supplies it through socketChannel(SocketChannel) before handing the request over.
+         this.toIdx = toIdx;
+     }
+
+     /**
+      * @return Index of the target worker.
+      */
+     int toIndex() {
+         return toIdx;
+     }
+
+     /**
+      * Stores the channel that the target worker has to register with its own selector.
+      *
+      * @param sockCh Socket channel of the session being moved.
+      */
+     void socketChannel(SocketChannel sockCh) {
+         this.sockCh = sockCh;
+     }
+
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return S.toString(SessionMoveFuture.class, this, super.toString());
+     }
+ }
+
+ /**
* Filter forwarding messages from chain's head to this server.
*/
private class HeadFilter extends GridNioFilterAdapter {
@@ -2708,4 +2853,194 @@ public class GridNioServer<T> {
return this;
}
}
+
+ /**
+ * Hook that redistributes sessions between the client workers of a {@link GridNioServer}.
+ */
+ public interface Balancer {
+ /**
+ * Runs one balancing pass. The acceptor loop invokes this after every selector wake-up,
+ * so an implementation has to throttle itself if it should run less often.
+ */
+ void balance();
+ }
+
+ /**
+  * Default {@link Balancer}. It compares how many bytes each client worker transferred since the
+  * previous pass and asks the busiest worker to hand one of its sessions over to the least busy one.
+  * <p>
+  * Workers with an even index are compared by received bytes (readers), workers with an odd index
+  * are compared by sent bytes (writers). At most one reader session and one writer session are
+  * moved per pass, after which all per-pass counters are reset.
+  */
+ private static class SizeBasedBalancer implements Balancer {
+     /** Minimal interval between two balancing passes, in milliseconds. */
+     private static final long BALANCE_PERIOD = 5000;
+
+     /** A session is eligible for a move only if its traffic is below this share of the max/min gap. */
+     private static final double MOVE_THRESHOLD = 0.9;
+
+     /** Server whose client workers are balanced. */
+     private final GridNioServer<?> srv;
+
+     /** Logger taken from the server. */
+     private final IgniteLogger log;
+
+     /** Timestamp of the last balancing pass. */
+     private long lastBalance;
+
+     /**
+      * @param srv Server.
+      */
+     public SizeBasedBalancer(GridNioServer<?> srv) {
+         this.srv = srv;
+
+         log = srv.log;
+     }
+
+     /** {@inheritDoc} */
+     @Override public void balance() {
+         long now = U.currentTimeMillis();
+
+         // Throttle: do nothing until the balance period has elapsed since the last pass.
+         if (lastBalance + BALANCE_PERIOD >= now)
+             return;
+
+         lastBalance = now;
+
+         long maxRcvd0 = -1, minRcvd0 = -1, maxSent0 = -1, minSent0 = -1;
+         int maxRcvdIdx = -1, minRcvdIdx = -1, maxSentIdx = -1, minSentIdx = -1;
+
+         // NOTE(review): test-specific diagnostics keyed on the thread name; consider removing before merge.
+         boolean print = Thread.currentThread().getName().contains("IgniteCommunicationBalanceTest4");
+
+         List<GridNioServer.AbstractNioClientWorker> clientWorkers = (List)srv.clientWorkers;
+
+         for (int i = 0; i < clientWorkers.size(); i++) {
+             GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i);
+
+             if ((i & 1) == 0) {
+                 // Reader.
+                 long bytesRcvd0 = worker.bytesRcvd0;
+
+                 // Only a worker with traffic and more than one session can give a session away.
+                 if ((maxRcvd0 == -1 || bytesRcvd0 > maxRcvd0) && bytesRcvd0 > 0 &&
+                     worker.sessions0.size() > 1) {
+                     maxRcvd0 = bytesRcvd0;
+                     maxRcvdIdx = i;
+
+                     // A worker picked as the current maximum is not considered as a minimum candidate.
+                     continue;
+                 }
+
+                 if (minRcvd0 == -1 || bytesRcvd0 < minRcvd0) {
+                     minRcvd0 = bytesRcvd0;
+                     minRcvdIdx = i;
+                 }
+             }
+             else {
+                 // Writer.
+                 long bytesSent0 = worker.bytesSent0;
+
+                 if ((maxSent0 == -1 || bytesSent0 > maxSent0) && bytesSent0 > 0 &&
+                     worker.sessions0.size() > 1) {
+                     maxSent0 = bytesSent0;
+                     maxSentIdx = i;
+
+                     continue;
+                 }
+
+                 if (minSent0 == -1 || bytesSent0 < minSent0) {
+                     minSent0 = bytesSent0;
+                     minSentIdx = i;
+                 }
+             }
+         }
+
+         if (log.isDebugEnabled())
+             log.debug("Balancing data [minSent0=" + minSent0 + ", minSentIdx=" + minSentIdx +
+                 ", maxSent0=" + maxSent0 + ", maxSentIdx=" + maxSentIdx +
+                 ", minRcvd0=" + minRcvd0 + ", minRcvdIdx=" + minRcvdIdx +
+                 ", maxRcvd0=" + maxRcvd0 + ", maxRcvdIdx=" + maxRcvdIdx + ']');
+
+         if (print)
+             log.info("Balancing data [minSent0=" + minSent0 + ", minSentIdx=" + minSentIdx +
+                 ", maxSent0=" + maxSent0 + ", maxSentIdx=" + maxSentIdx +
+                 ", minRcvd0=" + minRcvd0 + ", minRcvdIdx=" + minRcvdIdx +
+                 ", maxRcvd0=" + maxRcvd0 + ", maxRcvdIdx=" + maxRcvdIdx + ']');
+
+         if (maxSent0 != -1 && minSent0 != -1) {
+             GridSelectorNioSessionImpl ses = null;
+
+             long sentDiff = maxSent0 - minSent0;
+             long delta = sentDiff;
+             double threshold = sentDiff * MOVE_THRESHOLD;
+
+             GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions =
+                 clientWorkers.get(maxSentIdx).sessions0;
+
+             // Pick the session whose traffic is closest to half of the gap between the two workers.
+             for (GridSelectorNioSessionImpl ses0 : sessions) {
+                 long bytesSent0 = ses0.bytesSent0();
+
+                 if (bytesSent0 < threshold &&
+                     (ses == null || delta > U.safeAbs(bytesSent0 - sentDiff / 2))) {
+                     ses = ses0;
+                     delta = U.safeAbs(bytesSent0 - sentDiff / 2);
+                 }
+             }
+
+             if (ses != null) {
+                 if (log.isDebugEnabled())
+                     log.debug("Will move session to less loaded writer [ses=" + ses +
+                         ", from=" + maxSentIdx + ", to=" + minSentIdx + ']');
+
+                 if (print)
+                     log.info("Will move session to less loaded writer [diff=" + sentDiff + ", ses=" + ses +
+                         ", from=" + maxSentIdx + ", to=" + minSentIdx + ']');
+
+                 srv.writerMoveCnt++;
+
+                 // The move request goes to the current owner, which detaches the session and forwards it.
+                 clientWorkers.get(maxSentIdx).offer(new SessionMoveFuture(ses, minSentIdx));
+             }
+             else {
+                 if (log.isDebugEnabled())
+                     log.debug("Unable to find session to move for writers.");
+
+                 if (print)
+                     log.info("Unable to find session to move for writers.");
+             }
+         }
+
+         if (maxRcvd0 != -1 && minRcvd0 != -1) {
+             GridSelectorNioSessionImpl ses = null;
+
+             long rcvdDiff = maxRcvd0 - minRcvd0;
+             long delta = rcvdDiff;
+             double threshold = rcvdDiff * MOVE_THRESHOLD;
+
+             GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions =
+                 clientWorkers.get(maxRcvdIdx).sessions0;
+
+             for (GridSelectorNioSessionImpl ses0 : sessions) {
+                 long bytesRcvd0 = ses0.bytesReceived0();
+
+                 if (bytesRcvd0 < threshold &&
+                     (ses == null || delta > U.safeAbs(bytesRcvd0 - rcvdDiff / 2))) {
+                     ses = ses0;
+                     delta = U.safeAbs(bytesRcvd0 - rcvdDiff / 2);
+                 }
+             }
+
+             if (ses != null) {
+                 if (log.isDebugEnabled())
+                     log.debug("Will move session to less loaded reader [ses=" + ses +
+                         ", from=" + maxRcvdIdx + ", to=" + minRcvdIdx + ']');
+
+                 // Report the reader indexes here (the writer indexes were logged by mistake before).
+                 if (print)
+                     log.info("Will move session to less loaded reader [diff=" + rcvdDiff + ", ses=" + ses +
+                         ", from=" + maxRcvdIdx + ", to=" + minRcvdIdx + ']');
+
+                 srv.readerMoveCnt++;
+
+                 clientWorkers.get(maxRcvdIdx).offer(new SessionMoveFuture(ses, minRcvdIdx));
+             }
+             else {
+                 if (log.isDebugEnabled())
+                     log.debug("Unable to find session to move for readers.");
+
+                 if (print)
+                     log.info("Unable to find session to move for readers.");
+             }
+         }
+
+         // Start a fresh measurement window for the next pass.
+         for (int i = 0; i < clientWorkers.size(); i++) {
+             GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i);
+
+             worker.reset0();
+         }
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc85af16/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
index 53a624d..3f5d367 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
@@ -45,12 +45,21 @@ public class GridNioSessionImpl implements GridNioSession {
/** Session close timestamp. */
private final AtomicLong closeTime = new AtomicLong();
+ private final long[] writesStat = new long[25];
+ private final long[] readsStat = new long[25];
+
/** Sent bytes counter. */
private volatile long bytesSent;
/** Received bytes counter. */
private volatile long bytesRcvd;
+ /** Sent bytes counter since last re-balancing. */
+ private volatile long bytesSent0;
+
+ /** Received bytes counter since last re-balancing. */
+ private volatile long bytesRcvd0;
+
/** Last send schedule timestamp. */
private volatile long sndSchedTime;
@@ -163,6 +172,19 @@ public class GridNioSessionImpl implements GridNioSession {
return bytesRcvd;
}
+ /**
+  * @return Number of bytes sent since the last {@link #reset0()} call.
+  */
+ public long bytesSent0() {
+     return this.bytesSent0;
+ }
+
+ /**
+  * @return Number of bytes received since the last {@link #reset0()} call.
+  */
+ public long bytesReceived0() {
+     return this.bytesRcvd0;
+ }
+
+ /**
+  * Zeroes the resettable sent/received byte counters; the total counters are not affected.
+  */
+ public void reset0() {
+     this.bytesSent0 = 0;
+     this.bytesRcvd0 = 0;
+ }
+
/** {@inheritDoc} */
@Override public long createTime() {
return createTime;
@@ -240,10 +262,37 @@ public class GridNioSessionImpl implements GridNioSession {
*/
public void bytesSent(int cnt) {
bytesSent += cnt;
+ bytesSent0 += cnt;
lastSndTime = U.currentTimeMillis();
}
+ /**
+  * Records one write in the write histogram. The bucket is chosen by the ratio of written
+  * bytes to buffer capacity, scaled to the number of buckets.
+  *
+  * @param cnt Number of bytes written by the operation.
+  * @param bufCap Capacity of the buffer the bytes were written from.
+  */
+ public void onBytesWritten(int cnt, int bufCap) {
+     int idx = (int)Math.floor(((cnt * 1.0) / bufCap) * writesStat.length);
+
+     // Clamp on both sides: a full buffer (or zero capacity) maps past the last bucket,
+     // a negative count would map below the first one and must not break the caller.
+     if (idx >= writesStat.length)
+         idx = writesStat.length - 1;
+     else if (idx < 0)
+         idx = 0;
+
+     writesStat[idx]++;
+ }
+
+ /**
+  * Records one read in the read histogram. The bucket is chosen by the ratio of read
+  * bytes to buffer capacity, scaled to the number of buckets.
+  *
+  * @param cnt Number of bytes read by the operation.
+  * @param bufCap Capacity of the buffer the bytes were read into.
+  */
+ public void onBytesRead(int cnt, int bufCap) {
+     int idx = (int)Math.floor(((cnt * 1.0) / bufCap) * readsStat.length);
+
+     // Clamp on both sides: a full buffer (or zero capacity) maps past the last bucket,
+     // a negative count would map below the first one and must not break the caller.
+     if (idx >= readsStat.length)
+         idx = readsStat.length - 1;
+     else if (idx < 0)
+         idx = 0;
+
+     readsStat[idx]++;
+ }
+
+ /**
+  * @return Read histogram. This is the live internal array, not a copy.
+  */
+ public long[] readStats() {
+     return this.readsStat;
+ }
+
+ /**
+  * @return Write histogram. This is the live internal array, not a copy.
+  */
+ public long[] writeStats() {
+     return this.writesStat;
+ }
+
/**
* Adds given amount of bytes to the received bytes counter.
* <p>
@@ -253,6 +302,7 @@ public class GridNioSessionImpl implements GridNioSession {
*/
public void bytesReceived(int cnt) {
bytesRcvd += cnt;
+ bytesRcvd0 += cnt;
lastRcvTime = U.currentTimeMillis();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc85af16/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index a680a33..8e5b93d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -44,7 +44,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
private SelectionKey key;
/** Worker index for server */
- private final int selectorIdx;
+ private volatile int selectorIdx;
/** Size counter. */
private final AtomicInteger queueSize = new AtomicInteger();
@@ -161,6 +161,13 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
}
/**
+ * Updates the index of the selector worker this session is bound to.
+ *
+ * @param selectorIdx Selector index. NOTE(review): the server's MOVE handling passes {@code -1}
+ * while a session is detached from its previous worker - confirm that every reader of the index
+ * tolerates that value.
+ */
+ void selectorIndex(int selectorIdx) {
+ this.selectorIdx = selectorIdx;
+ }
+
+ /**
* Adds write future at the front of the queue without acquiring back pressure semaphore.
*
* @param writeFut Write request.
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc85af16/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
new file mode 100644
index 0000000..30a6254
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.log4j.helpers.ThreadLocalMap;
+
+/**
+ * Manual smoke test for NIO session balancing: starts five nodes configured with four selectors each,
+ * runs a no-op job on four of them and then periodically dumps debug info of the job-sending node.
+ */
+public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of debug dumps taken before the test finishes. */
+    private static final int DUMP_CNT = 6;
+
+    /** Pause between two debug dumps, in milliseconds. */
+    private static final long DUMP_INTERVAL = 5000;
+
+    /** Client mode flag applied to every configuration created after it is set. */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpCommunicationSpi commSpi = ((TcpCommunicationSpi)cfg.getCommunicationSpi());
+
+        commSpi.setSharedMemoryPort(-1);
+        commSpi.setSelectorsCount(4);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * Sends one job to each of the nodes 0-3 and then dumps debug info {@link #DUMP_CNT} times.
+     * The dump loop is bounded so that the test terminates instead of spinning forever.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBalance() throws Exception {
+        startGrid(0);
+
+        // NOTE(review): the flag is never reset, so nodes 1-3 started below are configured
+        // in client mode as well - confirm this is intended.
+        client = true;
+
+        // Local is named differently from the 'client' field to avoid shadowing it.
+        Ignite clientNode = startGrid(4);
+
+        startGridsMultiThreaded(1, 3);
+
+        for (int i = 0; i < 4; i++) {
+            ClusterNode node = clientNode.cluster().node(ignite(i).cluster().localNode().id());
+
+            clientNode.compute(clientNode.cluster().forNode(node)).run(new DummyRunnable());
+        }
+
+        // TODO: add a randomized load phase (many jobs sent to a random node per iteration).
+
+        for (int i = 0; i < DUMP_CNT; i++) {
+            ((IgniteKernal)clientNode).dumpDebugInfo();
+
+            Thread.sleep(DUMP_INTERVAL);
+        }
+    }
+
+    /**
+     * Job that does nothing; used only to generate communication traffic.
+     */
+    static class DummyRunnable implements IgniteRunnable {
+        /** {@inheritDoc} */
+        @Override public void run() {
+            // No-op.
+        }
+    }
+}