You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/28 09:02:45 UTC
ignite git commit: balance
Repository: ignite
Updated Branches:
refs/heads/ignite-1.8.2-balance 7f4d15329 -> 750e8236a
balance
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/750e8236
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/750e8236
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/750e8236
Branch: refs/heads/ignite-1.8.2-balance
Commit: 750e8236a200c0f6794b407874103f89cdb837f5
Parents: 7f4d153
Author: sboikov <sb...@gridgain.com>
Authored: Wed Dec 28 11:55:02 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 28 11:55:02 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/util/nio/GridNioServer.java | 56 ++++++++++++++++----
.../communication/tcp/TcpCommunicationSpi.java | 1 +
.../IgniteCommunicationBalanceTest.java | 7 +--
3 files changed, 52 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/750e8236/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 91188da..55afdfc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -227,6 +227,9 @@ public class GridNioServer<T> {
/** */
private final IgniteRunnable balancer;
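+ /** If {@code true}, accepted and non-accepted sessions are assigned to separate, interleaved sets of selector workers. */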
+ private final boolean readWriteSelectorsAssign;
+
/**
* @param addr Address.
* @param port Port.
@@ -276,6 +279,7 @@ public class GridNioServer<T> {
IgnitePredicate<Message> skipRecoveryPred,
IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr,
Balancer balancing,
+ boolean readWriteSelectorsAssign,
GridNioFilter... filters
) throws IgniteCheckedException {
if (port != -1)
@@ -300,6 +304,7 @@ public class GridNioServer<T> {
this.sndQueueLimit = sndQueueLimit;
this.msgQueueLsnr = msgQueueLsnr;
this.selectorSpins = selectorSpins;
+ this.readWriteSelectorsAssign = readWriteSelectorsAssign;
filterChain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
@@ -840,21 +845,31 @@ public class GridNioServer<T> {
int balanceIdx;
if (workers > 1) {
- if (req.accepted()) {
- balanceIdx = readBalanceIdx;
+ if (readWriteSelectorsAssign) {
+ if (req.accepted()) {
+ balanceIdx = readBalanceIdx;
- readBalanceIdx += 2;
+ readBalanceIdx += 2;
- if (readBalanceIdx >= workers)
- readBalanceIdx = 0;
+ if (readBalanceIdx >= workers)
+ readBalanceIdx = 0;
+ }
+ else {
+ balanceIdx = writeBalanceIdx;
+
+ writeBalanceIdx += 2;
+
+ if (writeBalanceIdx >= workers)
+ writeBalanceIdx = 1;
+ }
}
else {
- balanceIdx = writeBalanceIdx;
+ balanceIdx = readBalanceIdx;
- writeBalanceIdx += 2;
+ readBalanceIdx++;
- if (writeBalanceIdx >= workers)
- writeBalanceIdx = 1;
+ if (readBalanceIdx >= workers)
+ readBalanceIdx = 0;
}
}
else
@@ -3149,6 +3164,9 @@ public class GridNioServer<T> {
/** NIO sessions balancing flag. */
private Balancer balancing;
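+ /** If {@code true}, accepted and non-accepted sessions are assigned to separate, interleaved sets of selector workers. */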
+ private boolean readWriteSelectorsAssign;
+
/**
* Finishes building the instance.
*
@@ -3178,6 +3196,7 @@ public class GridNioServer<T> {
skipRecoveryPred,
msgQueueLsnr,
balancing,
+ readWriteSelectorsAssign,
filters != null ? Arrays.copyOf(filters, filters.length) : EMPTY_FILTERS
);
@@ -3191,6 +3210,16 @@ public class GridNioServer<T> {
}
/**
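+ * @param readWriteSelectorsAssign If {@code true}, accepted sessions and non-accepted sessions are
+ *      assigned to separate, interleaved sets of selector workers (each index advances by two);
+ *      if {@code false}, all sessions are distributed round-robin across every worker.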
+ * @return This for chaining.
+ */
+ public Builder<T> readWriteSelectorsAssign(boolean readWriteSelectorsAssign) {
+ this.readWriteSelectorsAssign = readWriteSelectorsAssign;
+
+ return this;
+ }
+
+ /**
* @param balancing NIO sessions balancing flag.
* @return This for chaining.
*/
@@ -3610,6 +3639,9 @@ public class GridNioServer<T> {
long maxBytes0 = -1, minBytes0 = -1;
int maxBytesIdx = -1, minBytesIdx = -1;
+// if (log.isDebugEnabled())
+// log.debug("Start balance, workers: " + clientWorkers.size());
+
for (int i = 0; i < clientWorkers.size(); i++) {
GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i);
@@ -3617,6 +3649,12 @@ public class GridNioServer<T> {
long bytes0 = worker.bytesRcvd0 + worker.bytesSent0;
+// if (log.isDebugEnabled())
+// log.debug("Worker [id=" + i +
+// ", sesCnt=" + sesCnt +
+// ", rcvd=" + worker.bytesRcvd0 +
+// ", sent=" + worker.bytesSent0 + ']');
+
if ((maxBytes0 == -1 || bytes0 > maxBytes0) && bytes0 > 0 && sesCnt > 1) {
maxBytes0 = bytes0;
maxBytesIdx = i;
http://git-wip-us.apache.org/repos/asf/ignite/blob/750e8236/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 0a8dfac..131facb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2058,6 +2058,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
.skipRecoveryPredicate(skipRecoveryPred)
.messageQueueSizeListener(queueSizeMonitor)
.balancing(usePairedConnections ? GridNioServer.Balancer.SIZE1 : GridNioServer.Balancer.SIZE2)
+ .readWriteSelectorsAssign(usePairedConnections)
.build();
boundTcpPort = port;
http://git-wip-us.apache.org/repos/asf/ignite/blob/750e8236/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
index 19482be..e88af56 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
@@ -63,6 +63,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
commSpi.setSharedMemoryPort(-1);
commSpi.setConnectionsPerNode(connectionsPerNode());
+ commSpi.setUsePairedConnections(false);
if (selectors > 0)
commSpi.setSelectorsCount(selectors);
@@ -95,9 +96,9 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "5000");
try {
- selectors = 4;
+ selectors = 2;
- final int SRVS = 4;
+ final int SRVS = 6;
startGridsMultiThreaded(SRVS);
@@ -105,7 +106,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
final Ignite client = startGrid(SRVS);
- for (int i = 0; i < 4; i++) {
+ for (int i = 0; i < SRVS; i++) {
ClusterNode node = client.cluster().node(ignite(i).cluster().localNode().id());
client.compute(client.cluster().forNode(node)).call(new DummyCallable(null));