You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/27 10:25:38 UTC

ignite git commit: ignite-1.8.2: balance for pairedConnections=false

Repository: ignite
Updated Branches:
  refs/heads/ignite-1.8.2-balance [created] faac0887d


ignite-1.8.2: balance for pairedConnections=false


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/faac0887
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/faac0887
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/faac0887

Branch: refs/heads/ignite-1.8.2-balance
Commit: faac0887d5e84aee9d66b2a1761817439e43c84b
Parents: 8372e69
Author: sboikov <sb...@gridgain.com>
Authored: Tue Dec 27 13:25:27 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Dec 27 13:25:27 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/nio/GridNioServer.java | 98 +++++++++++++++++++-
 .../IgniteCommunicationBalanceTest.java         |  5 +-
 2 files changed, 99 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/faac0887/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index bc1f173..11c1ea9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -362,7 +362,9 @@ public class GridNioServer<T> {
         if (balancing && balancePeriod > 0) {
             boolean rndBalance = IgniteSystemProperties.getBoolean(IGNITE_IO_BALANCE_RANDOM_BALANCE, false);
 
-            balancer0 = rndBalance ? new RandomBalancer() : new SizeBasedBalancer(balancePeriod);
+            balancer0 = rndBalance ? new RandomBalancer() : new SizeBasedBalancer2(balancePeriod);
+
+            log.info("Balancer: " + balancer0.getClass().getSimpleName());
         }
 
         this.balancer = balancer0;
@@ -3559,6 +3561,100 @@ public class GridNioServer<T> {
     }
 
     /**
+     *
+     */
+    private class SizeBasedBalancer2 implements IgniteRunnable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private long lastBalance;
+
+        /** */
+        private final long balancePeriod;
+
+        /**
+         * @param balancePeriod Period.
+         */
+        SizeBasedBalancer2(long balancePeriod) {
+            this.balancePeriod = balancePeriod;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            long now = U.currentTimeMillis();
+
+            if (lastBalance + balancePeriod < now) {
+                lastBalance = now;
+
+                long maxBytes0 = -1, minBytes0 = -1;
+                int maxBytesIdx = -1, minBytesIdx = -1;
+
+                for (int i = 0; i < clientWorkers.size(); i++) {
+                    GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i);
+
+                    int sesCnt = worker.workerSessions.size();
+
+                    long bytes0 = worker.bytesRcvd0 + worker.bytesSent0;
+
+                    if ((maxBytes0 == -1 || bytes0 > maxBytes0) && bytes0 > 0 && sesCnt > 1) {
+                        maxBytes0 = bytes0;
+                        maxBytesIdx = i;
+                    }
+
+                    if (minBytes0 == -1 || bytes0 < minBytes0) {
+                        minBytes0 = bytes0;
+                        minBytesIdx = i;
+                    }
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug("Balancing data [min0=" + minBytes0 + ", minIdx=" + minBytesIdx +
+                        ", max0=" + maxBytes0 + ", maxIdx=" + maxBytesIdx + ']');
+
+                if (maxBytes0 != -1 && minBytes0 != -1) {
+                    GridSelectorNioSessionImpl ses = null;
+
+                    long bytesDiff = maxBytes0 - minBytes0;
+                    long delta = bytesDiff;
+                    double threshold = bytesDiff * 0.9;
+
+                    GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions =
+                        clientWorkers.get(maxBytesIdx).workerSessions;
+
+                    for (GridSelectorNioSessionImpl ses0 : sessions) {
+                        long bytesSent0 = ses0.bytesSent0();
+
+                        if (bytesSent0 < threshold &&
+                            (ses == null || delta > U.safeAbs(bytesSent0 - bytesDiff / 2))) {
+                            ses = ses0;
+                            delta = U.safeAbs(bytesSent0 - bytesDiff / 2);
+                        }
+                    }
+
+                    if (ses != null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Will move session to less loaded worker [ses=" + ses +
+                                ", from=" + maxBytesIdx + ", to=" + minBytesIdx + ']');
+
+                        moveSession(ses, maxBytesIdx, minBytesIdx);
+                    }
+                    else {
+                        if (log.isDebugEnabled())
+                            log.debug("Unable to find session to move.");
+                    }
+                }
+
+                for (int i = 0; i < clientWorkers.size(); i++) {
+                    GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i);
+
+                    worker.reset0();
+                }
+            }
+        }
+    }
+
+    /**
      * For tests only.
      */
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/ignite/blob/faac0887/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
index e142aef..19482be 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
@@ -151,7 +151,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
                             }
                         }
 
-                        return srv.readerMoveCount() > readMoveCnt && srv.writerMoveCount() > writeMoveCnt;
+                        return srv.readerMoveCount() > readMoveCnt || srv.writerMoveCount() > writeMoveCnt;
                     }
                 }, 30_000);
 
@@ -165,8 +165,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
                     ", rc2=" + readMoveCnt2 +
                     ", wc2=" + writeMoveCnt2 + ']');
 
-                assertTrue(readMoveCnt2 > readMoveCnt1);
-                assertTrue(writeMoveCnt2 > writeMoveCnt1);
+                assertTrue(readMoveCnt2 > readMoveCnt1 || writeMoveCnt2 > writeMoveCnt1);
 
                 readMoveCnt1 = readMoveCnt2;
                 writeMoveCnt1 = writeMoveCnt2;