You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/28 09:02:45 UTC

ignite git commit: balance

Repository: ignite
Updated Branches:
  refs/heads/ignite-1.8.2-balance 7f4d15329 -> 750e8236a


balance


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/750e8236
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/750e8236
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/750e8236

Branch: refs/heads/ignite-1.8.2-balance
Commit: 750e8236a200c0f6794b407874103f89cdb837f5
Parents: 7f4d153
Author: sboikov <sb...@gridgain.com>
Authored: Wed Dec 28 11:55:02 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 28 11:55:02 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/nio/GridNioServer.java | 56 ++++++++++++++++----
 .../communication/tcp/TcpCommunicationSpi.java  |  1 +
 .../IgniteCommunicationBalanceTest.java         |  7 +--
 3 files changed, 52 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/750e8236/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 91188da..55afdfc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -227,6 +227,9 @@ public class GridNioServer<T> {
     /** */
     private final IgniteRunnable balancer;
 
+    /** */
+    private final boolean readWriteSelectorsAssign;
+
     /**
      * @param addr Address.
      * @param port Port.
@@ -276,6 +279,7 @@ public class GridNioServer<T> {
         IgnitePredicate<Message> skipRecoveryPred,
         IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr,
         Balancer balancing,
+        boolean readWriteSelectorsAssign,
         GridNioFilter... filters
     ) throws IgniteCheckedException {
         if (port != -1)
@@ -300,6 +304,7 @@ public class GridNioServer<T> {
         this.sndQueueLimit = sndQueueLimit;
         this.msgQueueLsnr = msgQueueLsnr;
         this.selectorSpins = selectorSpins;
+        this.readWriteSelectorsAssign = readWriteSelectorsAssign;
 
         filterChain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
 
@@ -840,21 +845,31 @@ public class GridNioServer<T> {
         int balanceIdx;
 
         if (workers > 1) {
-            if (req.accepted()) {
-                balanceIdx = readBalanceIdx;
+            if (readWriteSelectorsAssign) {
+                if (req.accepted()) {
+                    balanceIdx = readBalanceIdx;
 
-                readBalanceIdx += 2;
+                    readBalanceIdx += 2;
 
-                if (readBalanceIdx >= workers)
-                    readBalanceIdx = 0;
+                    if (readBalanceIdx >= workers)
+                        readBalanceIdx = 0;
+                }
+                else {
+                    balanceIdx = writeBalanceIdx;
+
+                    writeBalanceIdx += 2;
+
+                    if (writeBalanceIdx >= workers)
+                        writeBalanceIdx = 1;
+                }
             }
             else {
-                balanceIdx = writeBalanceIdx;
+                balanceIdx = readBalanceIdx;
 
-                writeBalanceIdx += 2;
+                readBalanceIdx++;
 
-                if (writeBalanceIdx >= workers)
-                    writeBalanceIdx = 1;
+                if (readBalanceIdx >= workers)
+                    readBalanceIdx = 0;
             }
         }
         else
@@ -3149,6 +3164,9 @@ public class GridNioServer<T> {
         /** NIO sessions balancing flag. */
         private Balancer balancing;
 
+        /** */
+        private boolean readWriteSelectorsAssign;
+
         /**
          * Finishes building the instance.
          *
@@ -3178,6 +3196,7 @@ public class GridNioServer<T> {
                 skipRecoveryPred,
                 msgQueueLsnr,
                 balancing,
+                readWriteSelectorsAssign,
                 filters != null ? Arrays.copyOf(filters, filters.length) : EMPTY_FILTERS
             );
 
@@ -3191,6 +3210,16 @@ public class GridNioServer<T> {
         }
 
         /**
+         * @param readWriteSelectorsAssign TODO
+         * @return This for chaining.
+         */
+        public Builder<T> readWriteSelectorsAssign(boolean readWriteSelectorsAssign) {
+            this.readWriteSelectorsAssign = readWriteSelectorsAssign;
+
+            return this;
+        }
+
+        /**
          * @param balancing NIO sessions balancing flag.
          * @return This for chaining.
          */
@@ -3610,6 +3639,9 @@ public class GridNioServer<T> {
                 long maxBytes0 = -1, minBytes0 = -1;
                 int maxBytesIdx = -1, minBytesIdx = -1;
 
+//                if (log.isDebugEnabled())
+//                    log.debug("Start balance, workers: " + clientWorkers.size());
+
                 for (int i = 0; i < clientWorkers.size(); i++) {
                     GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i);
 
@@ -3617,6 +3649,12 @@ public class GridNioServer<T> {
 
                     long bytes0 = worker.bytesRcvd0 + worker.bytesSent0;
 
+//                    if (log.isDebugEnabled())
+//                        log.debug("Worker [id=" + i +
+//                            ", sesCnt=" + sesCnt +
+//                            ", rcvd=" + worker.bytesRcvd0 +
+//                            ", sent=" + worker.bytesSent0 + ']');
+
                     if ((maxBytes0 == -1 || bytes0 > maxBytes0) && bytes0 > 0 && sesCnt > 1) {
                         maxBytes0 = bytes0;
                         maxBytesIdx = i;

http://git-wip-us.apache.org/repos/asf/ignite/blob/750e8236/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 0a8dfac..131facb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2058,6 +2058,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         .skipRecoveryPredicate(skipRecoveryPred)
                         .messageQueueSizeListener(queueSizeMonitor)
                         .balancing(usePairedConnections ? GridNioServer.Balancer.SIZE1 : GridNioServer.Balancer.SIZE2)
+                        .readWriteSelectorsAssign(usePairedConnections)
                         .build();
 
                 boundTcpPort = port;

http://git-wip-us.apache.org/repos/asf/ignite/blob/750e8236/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
index 19482be..e88af56 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
@@ -63,6 +63,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
 
         commSpi.setSharedMemoryPort(-1);
         commSpi.setConnectionsPerNode(connectionsPerNode());
+        commSpi.setUsePairedConnections(false);
 
         if (selectors > 0)
             commSpi.setSelectorsCount(selectors);
@@ -95,9 +96,9 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
         System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "5000");
 
         try {
-            selectors = 4;
+            selectors = 2;
 
-            final int SRVS = 4;
+            final int SRVS = 6;
 
             startGridsMultiThreaded(SRVS);
 
@@ -105,7 +106,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
 
             final Ignite client = startGrid(SRVS);
 
-            for (int i = 0; i < 4; i++) {
+            for (int i = 0; i < SRVS; i++) {
                 ClusterNode node = client.cluster().node(ignite(i).cluster().localNode().id());
 
                 client.compute(client.cluster().forNode(node)).call(new DummyCallable(null));