You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/22 14:42:39 UTC

[01/12] ignite git commit: conn

Repository: ignite
Updated Branches:
  refs/heads/ignite-comm-balance 26eaaba8b -> 6a4d81965


conn


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/03e54ac2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/03e54ac2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/03e54ac2

Branch: refs/heads/ignite-comm-balance
Commit: 03e54ac24ed666fac9cbfd84dfbf6dc3d89a3a28
Parents: 26eaaba
Author: sboikov <sb...@gridgain.com>
Authored: Wed Sep 21 10:12:46 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Sep 21 10:12:46 2016 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/util/nio/GridNioServer.java   | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/03e54ac2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index b7b02f5..5da557b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -1320,10 +1320,6 @@ public class GridNioServer<T> {
         }
     }
 
-    public interface NioWorker {
-
-    }
-
     /**
      * Thread performing only read operations from the channel.
      */
@@ -1448,7 +1444,7 @@ public class GridNioServer<T> {
          *
          * @param req Change request.
          */
-        @Override  public void offer(GridNioFuture req) {
+        @Override public void offer(GridNioFuture req) {
             changeReqs.offer((NioOperationFuture)req);
 
             selector.wakeup();
@@ -2115,8 +2111,6 @@ public class GridNioServer<T> {
          * @throws IgniteCheckedException If failed.
          */
         private void accept() throws IgniteCheckedException {
-            long lastBalance = U.currentTimeMillis();
-
             try {
                 while (!closed && selector.isOpen() && !Thread.currentThread().isInterrupted()) {
                     // Wake up every 2 seconds to check if closed.


[08/12] ignite git commit: conn

Posted by sb...@apache.org.
conn


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/81832e1d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/81832e1d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/81832e1d

Branch: refs/heads/ignite-comm-balance
Commit: 81832e1dc9576fee9c8f74c451f4893cd5633d99
Parents: ffd654a
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 22 14:02:39 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 22 14:23:04 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/nio/GridNioServer.java | 131 ++++++++++++-------
 .../ignite/internal/util/nio/GridNioWorker.java |  19 ++-
 .../util/nio/GridSelectorNioSessionImpl.java    |  62 ++++++++-
 .../IgniteCommunicationBalanceTest.java         |   2 +-
 4 files changed, 161 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/81832e1d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 5da557b..2d5cc64 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -435,10 +435,6 @@ public class GridNioServer<T> {
         NioOperationFuture<Boolean> fut = new NioOperationFuture<>(impl, NioOperation.CLOSE);
 
         impl.offerStateChange(fut);
-//        int idx = impl.selectorIndex(); // TODO
-//
-//        if (idx != -1)
-//            clientWorkers.get(idx).offer(fut);
 
         return fut;
     }
@@ -498,14 +494,9 @@ public class GridNioServer<T> {
             if (ses.removeFuture(fut))
                 fut.connectionClosed();
         }
-        else if (msgCnt == 1) {
+        else if (msgCnt == 1)
             // Change from 0 to 1 means that worker thread should be waken up.
-//            int idx = ses.selectorIndex();
-//
-//            if (idx != -1) // TODO revisit
-//                clientWorkers.get(idx).offer(fut);
             ses.offerStateChange(fut);
-        }
 
         if (msgQueueLsnr != null)
             msgQueueLsnr.apply(ses, msgCnt);
@@ -578,7 +569,6 @@ public class GridNioServer<T> {
             ses0.resend(futs);
 
             // Wake up worker.
-            //clientWorkers.get(ses0.selectorIndex()).offer(((NioOperationFuture)fut0));
             ses0.offerStateChange(fut0);
         }
     }
@@ -598,6 +588,10 @@ public class GridNioServer<T> {
     }
 
     public void moveSession(GridNioSession ses, int from, int to) {
+        assert from >= 0 && from < clientWorkers.size() : from;
+        assert to >= 0 && to < clientWorkers.size() : to;
+        assert from != to;
+
         clientWorkers.get(from).offer(new SessionMoveFuture((GridSelectorNioSessionImpl)ses, to));
     }
 
@@ -1336,13 +1330,15 @@ public class GridNioServer<T> {
         /** Worker index. */
         private final int idx;
 
+        /** Sessions assigned to this worker. */
+        private final GridConcurrentHashSet<GridSelectorNioSessionImpl> workerSessions =
+            new GridConcurrentHashSet<>();
+
         private volatile long bytesRcvd;
         private volatile long bytesSent;
         private volatile long bytesRcvd0;
         private volatile long bytesSent0;
 
-        private final GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions0 = new GridConcurrentHashSet<>();
-
         /**
          * @param idx Index of this worker in server's array.
          * @param gridName Grid name.
@@ -1360,7 +1356,7 @@ public class GridNioServer<T> {
         }
 
         public Collection<? extends GridNioSession> sessions() {
-            return sessions0;
+            return workerSessions;
         }
 
         /** {@inheritDoc} */
@@ -1409,15 +1405,15 @@ public class GridNioServer<T> {
             try {
                 SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
 
-                Class<?> selectorImplClass =
+                Class<?> selectorImplCls =
                     Class.forName("sun.nio.ch.SelectorImpl", false, U.gridClassLoader());
 
                 // Ensure the current selector implementation is what we can instrument.
-                if (!selectorImplClass.isAssignableFrom(selector.getClass()))
+                if (!selectorImplCls.isAssignableFrom(selector.getClass()))
                     return;
 
-                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
-                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
+                Field selectedKeysField = selectorImplCls.getDeclaredField("selectedKeys");
+                Field publicSelectedKeysField = selectorImplCls.getDeclaredField("publicSelectedKeys");
 
                 selectedKeysField.setAccessible(true);
                 publicSelectedKeysField.setAccessible(true);
@@ -1439,17 +1435,41 @@ public class GridNioServer<T> {
             }
         }
 
-        /**
-         * Adds socket channel to the registration queue and wakes up reading thread.
-         *
-         * @param req Change request.
-         */
+        /** {@inheritDoc} */
         @Override public void offer(GridNioFuture req) {
             changeReqs.offer((NioOperationFuture)req);
 
             selector.wakeup();
         }
 
+        /** {@inheritDoc} */
+        @Override public void offer(Collection<GridNioFuture> reqs) {
+            for (GridNioFuture req : reqs)
+                changeReqs.offer((NioOperationFuture)req);
+
+            selector.wakeup();
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<GridNioFuture> clearSessionRequests(GridNioSession ses) {
+            List<GridNioFuture> sesReqs = null;
+
+            for (GridNioServer.NioOperationFuture changeReq : changeReqs) {
+                if (changeReq.session() == ses) {
+                    boolean rmv = changeReqs.remove(changeReq);
+
+                    assert rmv : changeReq;
+
+                    if (sesReqs == null)
+                        sesReqs = new ArrayList<>();
+
+                    sesReqs.add(changeReq);
+                }
+            }
+
+            return sesReqs;
+        }
+
         /**
          * Processes read and write events and registration requests.
          *
@@ -1477,25 +1497,29 @@ public class GridNioServer<T> {
                                 GridSelectorNioSessionImpl ses = f.session();
 
                                 if (idx == f.toIdx) {
-                                    ses.worker = this;
+                                    assert f.movedSocketChannel() != null : f;
+
+                                    boolean add = workerSessions.add(ses);
 
-                                    sessions0.add(ses);
+                                    assert add;
 
-                                    SelectionKey key = f.socketChannel().register(selector,
-                                        SelectionKey.OP_READ | SelectionKey.OP_WRITE, ses); // TODO what if reads were paused?
+                                    ses.finishMoveSession(this);
+
+                                    SelectionKey key = f.movedSocketChannel().register(selector,
+                                        SelectionKey.OP_READ | SelectionKey.OP_WRITE,
+                                        ses);
 
                                     ses.key(key);
                                 }
                                 else {
-                                    if (sessions0.remove(ses)) {
-                                        assert ses.worker == this; // TODO replace with IF and ignore?
+                                    assert f.movedSocketChannel() == null : f;
 
-                                        // Cleanup.
-                                        ses.worker = null;
+                                    if (workerSessions.remove(ses)) {
+                                        ses.startMoveSession(this);
 
                                         SelectionKey key = ses.key();
 
-                                        f.socketChannel((SocketChannel)key.channel());
+                                        f.movedSocketChannel((SocketChannel)key.channel());
 
                                         key.cancel();
 
@@ -1879,7 +1903,7 @@ public class GridNioServer<T> {
                     resend(ses);
 
                 sessions.add(ses);
-                sessions0.add(ses);
+                workerSessions.add(ses);
 
                 try {
                     filterChain.onSessionOpened(ses);
@@ -1922,7 +1946,7 @@ public class GridNioServer<T> {
             }
 
             sessions.remove(ses);
-            sessions0.remove(ses);
+            workerSessions.remove(ses);
 
             SelectionKey key = ses.key();
 
@@ -2038,7 +2062,7 @@ public class GridNioServer<T> {
             bytesSent0 = 0;
             bytesRcvd0 = 0;
 
-            for (GridSelectorNioSessionImpl ses : sessions0)
+            for (GridSelectorNioSessionImpl ses : workerSessions)
                 ses.reset0();
         }
     }
@@ -2248,7 +2272,7 @@ public class GridNioServer<T> {
 
         /** Socket channel in register request. */
         @GridToStringExclude
-        protected SocketChannel sockCh; // TODO to be fixed with proper hierarchy
+        private SocketChannel sockCh;
 
         /** Session to perform operation on. */
         @GridToStringExclude
@@ -2450,11 +2474,15 @@ public class GridNioServer<T> {
         /** */
         private final int toIdx;
 
+        /** */
+        @GridToStringExclude
+        private SocketChannel movedSockCh;
+
         /**
-         * @param ses
-         * @param toIdx
+         * @param ses Session.
+         * @param toIdx Target worker index.
          */
-        public SessionMoveFuture(
+        SessionMoveFuture(
             GridSelectorNioSessionImpl ses,
             int toIdx
         ) {
@@ -2463,12 +2491,25 @@ public class GridNioServer<T> {
             this.toIdx = toIdx;
         }
 
+        /**
+         * @return Target worker index.
+         */
         int toIndex() {
             return toIdx;
         }
 
-        void socketChannel(SocketChannel sockCh) {
-            this.sockCh = sockCh;
+        /**
+         * @return Moved session socket channel.
+         */
+        SocketChannel movedSocketChannel() {
+            return movedSockCh;
+        }
+
+        /**
+         * @param movedSockCh Moved session socket channel.
+         */
+        void movedSocketChannel(SocketChannel movedSockCh) {
+            this.movedSockCh = movedSockCh;
         }
 
         /** {@inheritDoc} */
@@ -2938,7 +2979,7 @@ public class GridNioServer<T> {
                         long bytesRcvd0 = worker.bytesRcvd0;
 
                         if ((maxRcvd0 == -1 || bytesRcvd0 > maxRcvd0) && bytesRcvd0 > 0 &&
-                            worker.sessions0.size() > 1) {
+                            worker.workerSessions.size() > 1) {
                             maxRcvd0 = bytesRcvd0;
                             maxRcvdIdx = i;
 
@@ -2955,7 +2996,7 @@ public class GridNioServer<T> {
                         long bytesSent0 = worker.bytesSent0;
 
                         if ((maxSent0 == -1 || bytesSent0 > maxSent0) && bytesSent0 > 0 &&
-                            worker.sessions0.size() > 1) {
+                            worker.workerSessions.size() > 1) {
                             maxSent0 = bytesSent0;
                             maxSentIdx = i;
 
@@ -2989,7 +3030,7 @@ public class GridNioServer<T> {
                     double threshold = sentDiff * 0.9;
 
                     GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions =
-                        clientWorkers.get(maxSentIdx).sessions0;
+                        clientWorkers.get(maxSentIdx).workerSessions;
 
                     for (GridSelectorNioSessionImpl ses0 : sessions) {
                         long bytesSent0 = ses0.bytesSent0();
@@ -3031,7 +3072,7 @@ public class GridNioServer<T> {
                     double threshold = rcvdDiff * 0.9;
 
                     GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions =
-                        clientWorkers.get(maxRcvdIdx).sessions0;
+                        clientWorkers.get(maxRcvdIdx).workerSessions;
 
                     for (GridSelectorNioSessionImpl ses0 : sessions) {
                         long bytesRcvd0 = ses0.bytesReceived0();

http://git-wip-us.apache.org/repos/asf/ignite/blob/81832e1d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
index d088d8c..7f8033a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
@@ -17,12 +17,27 @@
 
 package org.apache.ignite.internal.util.nio;
 
+import java.util.Collection;
+import java.util.List;
+import org.jetbrains.annotations.Nullable;
+
 /**
  *
  */
 public interface GridNioWorker {
     /**
-     * @param fut Future.
+     * @param req Change request.
+     */
+    void offer(GridNioFuture req);
+
+    /**
+     * @param reqs Change requests.
+     */
+    void offer(Collection<GridNioFuture> reqs);
+
+    /**
+     * @param ses Session.
+     * @return Session state change requests.
      */
-    void offer(GridNioFuture fut);
+    @Nullable List<GridNioFuture> clearSessionRequests(GridNioSession ses);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/81832e1d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index e515696..0c2033b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.util.nio;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteLogger;
@@ -43,8 +45,8 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     @GridToStringExclude
     private SelectionKey key;
 
-    /** */
-    public GridNioWorker worker;
+    /** Current worker thread. */
+    private GridNioWorker worker;
 
     /** Size counter. */
     private final AtomicInteger queueSize = new AtomicInteger();
@@ -68,6 +70,9 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /** Logger. */
     private final IgniteLogger log;
 
+    /** */
+    private List<GridNioFuture> pendingStateChanges;
+
     /**
      * Creates session instance.
      *
@@ -153,13 +158,60 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
         return key;
     }
 
+    /**
+     * @param fut Session state change request.
+     */
     void offerStateChange(GridNioFuture fut) {
-        GridNioWorker worker0 = worker;
+        synchronized (this) {
+            GridNioWorker worker0 = worker;
+
+            if (worker0 == null) {
+                if (pendingStateChanges == null)
+                    pendingStateChanges = new ArrayList<>();
+
+                pendingStateChanges.add(fut);
+            }
+            else
+                worker0.offer(fut);
+        }
+    }
+
+    /**
+     * @param moveFrom Worker the session is being moved from (must be the current worker).
+     */
+    void startMoveSession(GridNioWorker moveFrom) {
+        synchronized (this) {
+            assert this.worker == moveFrom;
 
-        if (worker0 != null)
-            worker0.offer(fut);
+            List<GridNioFuture> sesReqs = moveFrom.clearSessionRequests(this);
+
+            worker = null;
+
+            if (sesReqs != null) {
+                if (pendingStateChanges == null)
+                    pendingStateChanges = new ArrayList<>();
+
+                pendingStateChanges.addAll(sesReqs);
+            }
+        }
     }
 
+    /**
+     * @param moveTo Worker the session is being moved to (becomes the current worker).
+     */
+    void finishMoveSession(GridNioWorker moveTo) {
+        synchronized (this) {
+            assert worker == null;
+
+            worker = moveTo;
+
+            if (pendingStateChanges != null) {
+                moveTo.offer(pendingStateChanges);
+
+                pendingStateChanges = null;
+            }
+        }
+    }
 
     /**
      * Adds write future at the front of the queue without acquiring back pressure semaphore.

http://git-wip-us.apache.org/repos/asf/ignite/blob/81832e1d/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
index d523aab..839bd77 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
@@ -147,7 +147,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
     /**
      *
      */
-    static class DummyRunnable implements IgniteRunnable {
+    private static class DummyRunnable implements IgniteRunnable {
         /** {@inheritDoc} */
         @Override public void run() {
             // No-op.


[06/12] ignite git commit: conn

Posted by sb...@apache.org.
conn


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/79ca4aab
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/79ca4aab
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/79ca4aab

Branch: refs/heads/ignite-comm-balance
Commit: 79ca4aabab4e9d5f69271fbcd6885299c37cb9bf
Parents: 04a4d45
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 22 11:51:38 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 22 11:51:38 2016 +0300

----------------------------------------------------------------------
 ...GridTcpCommunicationSpiRecoverySelfTest.java | 21 +++++++++++++++++---
 1 file changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/79ca4aab/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index fbbc5de..3d33fff 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -327,7 +327,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
 
                     assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
 
-                    ses1.resumeReads().get();
+                    try {
+                        ses1.resumeReads().get();
+                    }
+                    catch (IgniteCheckedException ignore) {
+                        // Can fail if ses1 was closed.
+                    }
 
                     for (int j = 0; j < 100; j++) {
                         spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0));
@@ -437,7 +442,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
 
                     assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
 
-                    ses1.resumeReads().get();
+                    try {
+                        ses1.resumeReads().get();
+                    }
+                    catch (IgniteCheckedException ignore) {
+                        // Can fail if ses1 was closed.
+                    }
 
                     // Wait when session is closed, then try to open new connection from node1.
                     GridTestUtils.waitForCondition(new GridAbsPredicate() {
@@ -554,7 +564,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
 
                     assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
 
-                    ses1.resumeReads().get();
+                    try {
+                        ses1.resumeReads().get();
+                    }
+                    catch (IgniteCheckedException ignore) {
+                        // Can fail if ses1 was closed.
+                    }
 
                     sndFut.get();
 


[09/12] ignite git commit: conn

Posted by sb...@apache.org.
conn


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/831aa16d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/831aa16d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/831aa16d

Branch: refs/heads/ignite-comm-balance
Commit: 831aa16dcdad7501ef9c58f2b3e7a4e191e5452b
Parents: 79ca4aab
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 22 14:35:53 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 22 14:35:53 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/GridTcpSpiForwardingSelfTest.java       | 13 +++++++------
 ...mmunicationSpiRecoveryFailureDetectionSelfTest.java |  1 +
 2 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/831aa16d/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
index deda313..5ca8f26 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
@@ -69,7 +69,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
     private static final int commExtPort2 = 20100;
 
     /** */
-    private AddressResolver resolver;
+    private AddressResolver rslvr;
 
     /** */
     private boolean ipFinderUseLocPorts;
@@ -127,12 +127,13 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
         commSpi.setLocalPort(commLocPort);
         commSpi.setLocalPortRange(1);
         commSpi.setSharedMemoryPort(-1);
+        commSpi.setConnectionsPerNode(1);
 
         cfg.setCommunicationSpi(commSpi);
 
-        assert resolver != null;
+        assert rslvr != null;
 
-        cfg.setAddressResolver(resolver);
+        cfg.setAddressResolver(rslvr);
 
         return cfg;
     }
@@ -148,7 +149,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
         map.put(new InetSocketAddress("127.0.0.1", locPort2), F.asList(new InetSocketAddress("127.0.0.1", extPort2)));
         map.put(new InetSocketAddress("127.0.0.1", commLocPort2), F.asList(new InetSocketAddress("127.0.0.1", commExtPort2)));
 
-        resolver = new AddressResolver() {
+        rslvr = new AddressResolver() {
             @Override public Collection<InetSocketAddress> getExternalAddresses(InetSocketAddress addr) {
                 return map.get(addr);
             }
@@ -168,7 +169,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
         map.put("127.0.0.1:" + locPort2, "127.0.0.1:" + extPort2);
         map.put("127.0.0.1:" + commLocPort2, "127.0.0.1:" + commExtPort2);
 
-        resolver = new BasicAddressResolver(map);
+        rslvr = new BasicAddressResolver(map);
 
         doTestForward();
     }
@@ -181,7 +182,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
 
         map.put("127.0.0.1", "127.0.0.1");
 
-        resolver = new BasicAddressResolver(map);
+        rslvr = new BasicAddressResolver(map);
 
         ipFinderUseLocPorts = true;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/831aa16d/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java
index 95c9e40..b1aa119 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java
@@ -33,6 +33,7 @@ public class GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest extends Gri
         spi.setAckSendThreshold(5);
         spi.setSocketSendBuffer(512);
         spi.setSocketReceiveBuffer(512);
+        spi.setConnectionsPerNode(1);
 
         return spi;
     }


[04/12] ignite git commit: conn

Posted by sb...@apache.org.
conn


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3b0ffee0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3b0ffee0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3b0ffee0

Branch: refs/heads/ignite-comm-balance
Commit: 3b0ffee055ed843616282f013daa9d0b982e13bf
Parents: c604e8c
Author: sboikov <sb...@gridgain.com>
Authored: Wed Sep 21 12:54:53 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Sep 21 12:54:53 2016 +0300

----------------------------------------------------------------------
 .../util/nio/GridSelectorNioSessionImpl.java    |  2 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 47 +++++++++++++++-----
 .../IgniteCacheMessageWriteTimeoutTest.java     | 13 ++++--
 3 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3b0ffee0/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index a680a33..88721ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -303,7 +303,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
         if (!accepted() && val instanceof GridNioRecoveryDescriptor) {
             outRecovery = (GridNioRecoveryDescriptor)val;
 
-            outRecovery.connected();
+            outRecovery.onConnected();
 
             return null;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b0ffee0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index c9d9bf7..c131cf2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -424,13 +424,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 if (msg instanceof NodeIdMessage) {
                     sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0);
-                    connKey = new ConnectionKey(sndId, 0);
+                    connKey = new ConnectionKey(sndId, 0, -1);
                 }
                 else {
                     assert msg instanceof HandshakeMessage : msg;
 
+                    HandshakeMessage msg0 = (HandshakeMessage)msg;
+
                     sndId = ((HandshakeMessage)msg).nodeId();
-                    connKey = new ConnectionKey(sndId, ((HandshakeMessage)msg).connectionIndex());
+                    connKey = new ConnectionKey(sndId, msg0.connectionIndex(), msg0.connectCount());
                 }
 
                 if (log.isDebugEnabled())
@@ -470,8 +472,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (reserve)
                         connectedNew(recoveryDesc, ses, true);
                     else {
-                        if (c.failed)
-                            ses.close();
+                        if (c.failed) {
+                            ses.send(new RecoveryLastReceivedMessage(-1));
+
+                            for (GridNioSession ses0 : nioSrvr.sessions()) {
+                                ConnectionKey key0 = ses0.meta(CONN_IDX_META);
+
+                                if (ses0.accepted() && key0 != null &&
+                                    key0.nodeId().equals(connKey.nodeId()) &&
+                                    key0.connectionIndex() == connKey.connectionIndex() &&
+                                    key0.connectCount() < connKey.connectCount())
+                                    ses0.close();
+                            }
+                        }
                     }
                 }
                 else {
@@ -2369,7 +2382,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 // Do not allow concurrent connects.
                 GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture();
 
-                ConnectionKey connKey = new ConnectionKey(nodeId, connIdx);
+                ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1);
 
                 GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut);
 
@@ -2705,7 +2718,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             "(node left topology): " + node);
                     }
 
-                    ConnectionKey connKey = new ConnectionKey(node.id(), connIdx);
+                    ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1);
 
                     GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey);
 
@@ -3097,8 +3110,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             rcvCnt = buf.getLong(1);
                         }
 
-                        if (log.isDebugEnabled())
-                            log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
+                       // if (log.isDebugEnabled())
+                            log.info("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
 
                         if (rcvCnt == -1) {
                             if (log.isDebugEnabled())
@@ -3487,7 +3500,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     GridNioRecoveryDescriptor recovery = null;
 
                     if (!useMultipleConnections(node) && client instanceof GridTcpNioCommunicationClient) {
-                        recovery = recoveryDescs.get(new ConnectionKey(node.id(), client.connectionIndex()));
+                        recovery = recoveryDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1));
 
                         if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
                             RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
@@ -3508,7 +3521,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                     if (idleTime >= idleConnTimeout) {
                         if (recovery == null && useMultipleConnections(node))
-                            recovery = outRecDescs.get(new ConnectionKey(node.id(), client.connectionIndex()));
+                            recovery = outRecDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1));
 
                         if (recovery != null &&
                             recovery.nodeAlive(getSpiContext().node(nodeId)) &&
@@ -4273,13 +4286,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         /** */
         private final int idx;
 
+        /** */
+        private final long connCnt;
+
         /**
          * @param nodeId Node ID.
          * @param idx Connection index.
+         * @param connCnt Connection counter (set only for incoming connections).
          */
-        ConnectionKey(UUID nodeId, int idx) {
+        ConnectionKey(UUID nodeId, int idx, long connCnt) {
             this.nodeId = nodeId;
             this.idx = idx;
+            this.connCnt = connCnt;
+        }
+
+        /**
+         * @return Connection counter.
+         */
+        long connectCount() {
+            return connCnt;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b0ffee0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
index 5b51af8..0dd4079 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
@@ -66,15 +66,20 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
         super.afterTest();
     }
 
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 10 * 60_000;
+    }
+
     /**
      * @throws Exception If failed.
      */
     public void testMessageQueueLimit() throws Exception {
-        startGridsMultiThreaded(3);
-
-        for (int i = 0; i < 15; i++) {
+        for (int i = 0; i < 3; i++) {
             log.info("Iteration: " + i);
 
+            startGridsMultiThreaded(3);
+
             IgniteInternalFuture<?> fut1 = startJobThreads(50);
 
             U.sleep(100);
@@ -83,6 +88,8 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
 
             fut1.get();
             fut2.get();
+
+            stopAllGrids();
         }
     }
 


[03/12] ignite git commit: conn

Posted by sb...@apache.org.
conn


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c604e8cb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c604e8cb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c604e8cb

Branch: refs/heads/ignite-comm-balance
Commit: c604e8cb291bae294bf97d8eb13fc16b8cf8a12e
Parents: bb465cc
Author: sboikov <sb...@gridgain.com>
Authored: Wed Sep 21 11:04:40 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Sep 21 11:04:40 2016 +0300

----------------------------------------------------------------------
 .../GridTcpCommunicationSpiAbstractTest.java    | 28 ++++++++++++--
 ...mmunicationSpiConcurrentConnectSelfTest.java | 40 ++++++++++++++++++--
 2 files changed, 61 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c604e8cb/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index 076724d..3c4fea0 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -90,16 +90,36 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
         super.afterTest();
 
         for (CommunicationSpi spi : spis.values()) {
-            ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
+            ConcurrentMap<UUID, GridCommunicationClient[]> clients = U.field(spi, "clients");
+
+            for (int i = 0; i < 20; i++) {
+                GridCommunicationClient client0 = null;
+
+                for (GridCommunicationClient[] clients0 : clients.values()) {
+                    for (GridCommunicationClient client : clients0) {
+                        if (client != null) {
+                            client0 = client;
+
+                            break;
+                        }
+                    }
+
+                    if (client0 != null)
+                        break;
+                }
+
+                if (client0 == null)
+                    return;
 
-            for (int i = 0; i < 20 && !clients.isEmpty(); i++) {
                 info("Check failed for SPI [grid=" +
-                    GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "gridName") + ", spi=" + spi + ']');
+                    GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "gridName") +
+                    ", client=" + client0 +
+                    ", spi=" + spi + ']');
 
                 U.sleep(1000);
             }
 
-            assert clients.isEmpty() : "Clients: " + clients;
+            fail("Failed to wait when clients are closed.");
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c604e8cb/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index bd66319..ed047fa 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -253,7 +254,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
                 final AtomicInteger idx = new AtomicInteger();
 
                 try {
-                    GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                    final Callable<Void> c = new Callable<Void>() {
                         @Override public Void call() throws Exception {
                             int idx0 = idx.getAndIncrement();
 
@@ -279,7 +280,40 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
 
                             return null;
                         }
-                    }, threads, "test");
+                    };
+
+                    List<Thread> threadsList = new ArrayList<>();
+
+                    final AtomicBoolean fail = new AtomicBoolean();
+
+                    final AtomicLong tId = new AtomicLong();
+
+                    for (int t = 0; t < threads; t++) {
+                        Thread t0 = new Thread(new Runnable() {
+                            @Override public void run() {
+                                try {
+                                    c.call();
+                                }
+                                catch (Throwable e) {
+                                    log.error("Unexpected error: " + e, e);
+
+                                    fail.set(true);
+                                }
+                            }
+                        }) {
+                            @Override public long getId() {
+                                // Override getId to use all connections.
+                                return tId.getAndIncrement();
+                            }
+                        };
+
+                        threadsList.add(t0);
+
+                        t0.start();
+                    }
+
+                    for (Thread t0 : threadsList)
+                        t0.join();
 
                     assertTrue(latch.await(10, TimeUnit.SECONDS));
 
@@ -294,7 +328,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
                             @Override public boolean apply() {
                                 Collection sessions = U.field(srv, "sessions");
 
-                                return sessions.size() == 2;
+                                return sessions.size() == 2 * connectionsPerNode;
                             }
                         }, 5000);
 


[10/12] ignite git commit: conn

Posted by sb...@apache.org.
conn


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8d217413
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8d217413
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8d217413

Branch: refs/heads/ignite-comm-balance
Commit: 8d2174135a5fb5fffe6d7e695228862241b4fcef
Parents: 831aa16
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 22 14:41:01 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 22 15:28:23 2016 +0300

----------------------------------------------------------------------
 .../apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java | 2 ++
 .../ignite/internal/IgniteSlowClientDetectionSelfTest.java       | 1 +
 .../communication/IgniteVariousConnectionNumberTest.java         | 4 ++++
 3 files changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8d217413/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 639e23d..784b081 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -3947,6 +3947,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /**
      *
      */
+    @SuppressWarnings("PublicInnerClass")
     public static class HandshakeMessage2 extends HandshakeMessage {
         /** */
         private static final long serialVersionUID = 0L;
@@ -3969,6 +3970,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
          */
         HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int connIdx) {
             super(nodeId, connectCnt, rcvCnt);
+
             this.connIdx = connIdx;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8d217413/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
index 760313b..5721887 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
@@ -75,6 +75,7 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
         commSpi.setSlowClientQueueLimit(50);
         commSpi.setSharedMemoryPort(-1);
         commSpi.setIdleConnectionTimeout(300_000);
+        commSpi.setConnectionsPerNode(1);
 
         cfg.setCommunicationSpi(commSpi);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8d217413/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java
index 360eb8d..00a25d1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java
@@ -115,6 +115,10 @@ public class IgniteVariousConnectionNumberTest extends GridCommonAbstractTest {
 
             int idx = ThreadLocalRandom.current().nextInt(NODES);
 
+            Ignite node = ignite(idx);
+
+            client = node.configuration().isClientMode();
+
             stopGrid(idx);
 
             startGrid(idx);


[11/12] ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-comm-opts2' into ignite-comm-balance

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-comm-opts2' into ignite-comm-balance


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9c87e2c7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9c87e2c7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9c87e2c7

Branch: refs/heads/ignite-comm-balance
Commit: 9c87e2c7b4baf81625e89d58d38e90de67598993
Parents: 81832e1 8d21741
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 22 16:14:49 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 22 16:14:49 2016 +0300

----------------------------------------------------------------------
 .../spi/communication/tcp/TcpCommunicationSpi.java     |  2 ++
 .../internal/IgniteSlowClientDetectionSelfTest.java    |  1 +
 .../IgniteVariousConnectionNumberTest.java             |  4 ++++
 .../ignite/spi/GridTcpSpiForwardingSelfTest.java       | 13 +++++++------
 ...mmunicationSpiRecoveryFailureDetectionSelfTest.java |  1 +
 5 files changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------



[02/12] ignite git commit: conn

Posted by sb...@apache.org.
conn


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bb465cc9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bb465cc9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bb465cc9

Branch: refs/heads/ignite-comm-balance
Commit: bb465cc9c4a8960fa218c6a1ce2b806ea24a11b3
Parents: c1d2436
Author: sboikov <sb...@gridgain.com>
Authored: Wed Sep 21 10:18:45 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Sep 21 10:18:45 2016 +0300

----------------------------------------------------------------------
 .../internal/managers/communication/GridIoMessageFactory.java      | 2 +-
 .../apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bb465cc9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 908543c..711c03f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -755,7 +755,7 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
-            // [-3..119] [124] - this
+            // [-3..119] [124-125] - this
             // [120..123] - DR
             // [-4..-22, -30..-35] - SQL
             default:

http://git-wip-us.apache.org/repos/asf/ignite/blob/bb465cc9/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index ee30420..c9d9bf7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -334,7 +334,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     public static final byte HANDSHAKE_MSG_TYPE = -3;
 
     /** */
-    public static final byte HANDSHAKE_MSG_TYPE2 = -4;
+    public static final byte HANDSHAKE_MSG_TYPE2 = 125;
 
     /** */
     private ConnectGateway connectGate;


[12/12] ignite git commit: conn

Posted by sb...@apache.org.
conn


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6a4d8196
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6a4d8196
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6a4d8196

Branch: refs/heads/ignite-comm-balance
Commit: 6a4d81965a78dd3f47bea3b33f823c62e994dd9a
Parents: 9c87e2c
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 22 17:37:05 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 22 17:42:27 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/nio/GridNioServer.java |  34 ++-
 .../communication/tcp/TcpCommunicationSpi.java  |   4 +-
 .../IgniteCommunicationBalanceTest.java         | 215 +++++++++++++++----
 .../ignite/testsuites/IgniteCacheTestSuite.java |   2 +
 4 files changed, 206 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6a4d8196/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 2d5cc64..7352b5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -47,6 +47,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -92,6 +93,9 @@ public class GridNioServer<T> {
     /** */
     public static final String IGNITE_NIO_SES_BALANCER_CLASS_NAME = "IGNITE_NIO_SES_BALANCER_CLASS_NAME";
 
+    /** */
+    public static final String IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD = "IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD";
+
     /** Default session write timeout. */
     public static final int DFLT_SES_WRITE_TIMEOUT = 5000;
 
@@ -215,10 +219,10 @@ public class GridNioServer<T> {
     private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr;
 
     /** */
-    private volatile long writerMoveCnt;
+    private final AtomicLong readerMoveCnt = new AtomicLong();
 
     /** */
-    private volatile long readerMoveCnt;
+    private final AtomicLong writerMoveCnt = new AtomicLong();
 
     /** */
     private final Balancer balancer;
@@ -361,6 +365,14 @@ public class GridNioServer<T> {
         this.balancer = balancer0;
     }
 
+    public long readerMoveCount() {
+        return readerMoveCnt.get();
+    }
+
+    public long writerMoveCount() {
+        return writerMoveCnt.get();
+    }
+
     /**
      * @return Configured port.
      */
@@ -1505,6 +1517,11 @@ public class GridNioServer<T> {
 
                                     ses.finishMoveSession(this);
 
+                                    if (idx % 2 == 0)
+                                        readerMoveCnt.incrementAndGet();
+                                    else
+                                        writerMoveCnt.incrementAndGet();
+
                                     SelectionKey key = f.movedSocketChannel().register(selector,
                                         SelectionKey.OP_READ | SelectionKey.OP_WRITE,
                                         ses);
@@ -2948,10 +2965,13 @@ public class GridNioServer<T> {
         /** */
         private long lastBalance;
 
+        /** */
+        private final long balancePeriod = IgniteSystemProperties.getLong(IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD, 5000);
+
         /**
          * @param srv Server.
          */
-        public SizeBasedBalancer(GridNioServer<?> srv) {
+        SizeBasedBalancer(GridNioServer<?> srv) {
             this.srv = srv;
 
             log = srv.log;
@@ -2961,13 +2981,13 @@ public class GridNioServer<T> {
         @Override public void balance() {
             long now = U.currentTimeMillis();
 
-            if (lastBalance + 5000 < now) {
+            if (lastBalance + balancePeriod < now) {
                 lastBalance = now;
 
                 long maxRcvd0 = -1, minRcvd0 = -1, maxSent0 = -1, minSent0 = -1;
                 int maxRcvdIdx = -1, minRcvdIdx = -1, maxSentIdx = -1, minSentIdx = -1;
 
-                boolean print = Thread.currentThread().getName().contains("IgniteCommunicationBalanceTest4");
+                boolean print = false;//Thread.currentThread().getName().contains("IgniteCommunicationBalanceTest4");
 
                 List<GridNioServer.AbstractNioClientWorker> clientWorkers = (List)srv.clientWorkers;
 
@@ -3051,7 +3071,7 @@ public class GridNioServer<T> {
                             log.info("Will move session to less loaded writer [diff=" + sentDiff + ", ses=" + ses +
                                 ", from=" + maxSentIdx + ", to=" + minSentIdx + ']');
 
-                        srv.writerMoveCnt++;
+                        srv.writerMoveCnt.incrementAndGet();
 
                         clientWorkers.get(maxSentIdx).offer(new SessionMoveFuture(ses, minSentIdx));
                     }
@@ -3093,7 +3113,7 @@ public class GridNioServer<T> {
                             log.info("Will move session to less loaded reader [diff=" + rcvdDiff + ", ses=" + ses +
                                 ", from=" + maxSentIdx + ", to=" + minSentIdx + ']');
 
-                        srv.readerMoveCnt++;
+                        srv.readerMoveCnt.incrementAndGet();
 
                         clientWorkers.get(maxRcvdIdx).offer(new SessionMoveFuture(ses, minRcvdIdx));
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a4d8196/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 784b081..fd9985e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -3110,8 +3110,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             rcvCnt = buf.getLong(1);
                         }
 
-                       // if (log.isDebugEnabled())
-                            log.info("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
+                       if (log.isDebugEnabled())
+                            log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
 
                         if (rcvCnt == -1) {
                             if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a4d8196/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
index 839bd77..86d43e7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
@@ -22,12 +22,19 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
 import org.apache.ignite.internal.util.nio.GridNioServer;
 import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -58,6 +65,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
         TcpCommunicationSpi commSpi = ((TcpCommunicationSpi)cfg.getCommunicationSpi());
 
         commSpi.setSharedMemoryPort(-1);
+        commSpi.setConnectionsPerNode(1);
 
         if (selectors > 0)
             commSpi.setSelectorsCount(selectors);
@@ -79,45 +87,157 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testBalance() throws Exception {
-        selectors = 4;
+    public void testBalance1() throws Exception {
+        System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD, "500");
 
-        startGrid(0);
+        try {
+            selectors = 4;
 
-        client = true;
+            startGridsMultiThreaded(4);
 
-        Ignite client = startGrid(4);
+            client = true;
 
-        startGridsMultiThreaded(1, 3);
+            Ignite client = startGrid(4);
 
-        for (int i = 0; i < 4; i++) {
-            ClusterNode node = client.cluster().node(ignite(i).cluster().localNode().id());
+            for (int i = 0; i < 4; i++) {
+                ClusterNode node = client.cluster().node(ignite(i).cluster().localNode().id());
 
-            client.compute(client.cluster().forNode(node)).run(new DummyRunnable());
+                client.compute(client.cluster().forNode(node)).run(new DummyRunnable(null));
+            }
+
+            waitNioBalanceStop(client, 30_000);
+
+            final GridNioServer srv = GridTestUtils.getFieldValue(client.configuration().getCommunicationSpi(), "nioSrvr");
+
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            long readMoveCnt1 = srv.readerMoveCount();
+            long writeMoveCnt1 = srv.writerMoveCount();
+
+            for (int iter = 0; iter < 10; iter++) {
+                log.info("Iteration: " + iter);
+
+                int nodeIdx = rnd.nextInt(4);
+
+                ClusterNode node = client.cluster().node(ignite(nodeIdx).cluster().localNode().id());
+
+                IgniteCompute compute = client.compute(client.cluster().forNode(node));
+
+                for (int i = 0; i < 10_000; i++)
+                    compute.run(new DummyRunnable(null));
+
+                final long readMoveCnt = readMoveCnt1;
+                final long writeMoveCnt = writeMoveCnt1;
+
+                GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                    @Override public boolean apply() {
+                        return srv.readerMoveCount() > readMoveCnt && srv.writerMoveCount() > writeMoveCnt;
+                    }
+                }, 10_000);
+
+                waitNioBalanceStop(client, 30_000);
+
+                long readMoveCnt2 = srv.readerMoveCount();
+                long writeMoveCnt2 = srv.writerMoveCount();
+
+                assertTrue(readMoveCnt2 > readMoveCnt1);
+                assertTrue(writeMoveCnt2 > writeMoveCnt1);
+
+                readMoveCnt1 = readMoveCnt2;
+                writeMoveCnt1 = writeMoveCnt2;
+            }
+
+            for (Ignite node : G.allGrids())
+                waitNioBalanceStop(node, 10_000);
+        }
+        finally {
+            System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD, "");
         }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBalance2() throws Exception {
+        System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD, "500");
+
+        try {
+            startGridsMultiThreaded(5);
+
+            client = true;
+
+            startGridsMultiThreaded(5, 5);
+
+            for (int i = 0; i < 20; i++) {
+                log.info("Iteration: " + i);
+
+                final AtomicInteger idx = new AtomicInteger();
+
+                GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        Ignite node = ignite(idx.incrementAndGet() % 10);
+
+                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-//        ThreadLocalRandom rnd = ThreadLocalRandom.current();
-//
-//        for (int iter = 0; iter < 10; iter++) {
-//            log.info("Iteration: " + iter);
-//
-//            int nodeIdx = rnd.nextInt(4);
-//
-//            ClusterNode node = client.cluster().node(ignite(nodeIdx).cluster().localNode().id());
-//
-//            for (int i = 0; i < 10_000; i++)
-//                client.compute(client.cluster().forNode(node)).run(new DummyRunnable());
-//
-//            U.sleep(5000);
-//        }
-
-        while (true) {
-            ((IgniteKernal) client).dumpDebugInfo();
-
-            Thread.sleep(5000);
+                        int msgs = rnd.nextInt(1000);
+
+                        for (int i = 0; i < msgs; i++) {
+                            int sndTo = rnd.nextInt(10);
+
+                            ClusterNode sntToNode = node.cluster().node(ignite(sndTo).cluster().localNode().id());
+
+                            IgniteCompute compute = node.compute(node.cluster().forNode(sntToNode));
+
+                            compute.run(new DummyRunnable(new byte[rnd.nextInt(1024)]));
+                        }
+
+                        return null;
+                    }
+                }, 30, "test-thread");
+
+                for (Ignite node : G.allGrids())
+                    waitNioBalanceStop(node, 10_000);
+            }
+        }
+        finally {
+            System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD, "");
         }
+    }
+
+    /**
+     * @param node Node.
+     * @param timeout Timeout.
+     * @throws Exception If failed.
+     */
+    private void waitNioBalanceStop(Ignite node, long timeout) throws Exception {
+        TcpCommunicationSpi spi = (TcpCommunicationSpi)node.configuration().getCommunicationSpi();
+
+        final GridNioServer srv = GridTestUtils.getFieldValue(spi, "nioSrvr");
+
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() {
+            @Override public boolean applyx() throws IgniteCheckedException {
+                long readerMovCnt1 = srv.readerMoveCount();
+                long writerMovCnt1 = srv.writerMoveCount();
+
+                U.sleep(2000);
 
-        //Thread.sleep(Long.MAX_VALUE);
+                long readerMovCnt2 = srv.readerMoveCount();
+                long writerMovCnt2 = srv.writerMoveCount();
+
+                if (readerMovCnt1 != readerMovCnt2) {
+                    log.info("Readers balance is in progress [cnt1=" + readerMovCnt1 + ", cnt2=" + readerMovCnt2 + ']');
+
+                    return false;
+                }
+                if (writerMovCnt1 != writerMovCnt2) {
+                    log.info("Writers balance is in progress [cnt1=" + writerMovCnt1 + ", cnt2=" + writerMovCnt2 + ']');
+
+                    return false;
+                }
+
+                return true;
+            }
+        }, timeout));
     }
 
     /**
@@ -126,28 +246,43 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
     public void testRandomBalance() throws Exception {
         System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_CLASS_NAME, TestBalancer.class.getName());
 
-        final int NODES = 10;
+        try {
+            final int NODES = 10;
 
-        startGridsMultiThreaded(NODES);
+            startGridsMultiThreaded(NODES);
 
-        final long stopTime = System.currentTimeMillis() + 60_000;
+            final long stopTime = System.currentTimeMillis() + 60_000;
 
-        GridTestUtils.runMultiThreaded(new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+            GridTestUtils.runMultiThreaded(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-                while (System.currentTimeMillis() < stopTime)
-                    ignite(rnd.nextInt(NODES)).compute().broadcast(new DummyRunnable());
+                    while (System.currentTimeMillis() < stopTime)
+                        ignite(rnd.nextInt(NODES)).compute().broadcast(new DummyRunnable(null));
 
-                return null;
-            }
-        }, 20, "test-thread");
+                    return null;
+                }
+            }, 20, "test-thread");
+        }
+        finally {
+            System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_CLASS_NAME, null);
+        }
     }
 
     /**
      *
      */
     private static class DummyRunnable implements IgniteRunnable {
+        /** */
+        private byte[] data;
+
+        /**
+         * @param data Data.
+         */
+        public DummyRunnable(byte[] data) {
+            this.data = data;
+        }
+
         /** {@inheritDoc} */
         @Override public void run() {
             // No-op.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a4d8196/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 3a0d1ee..5b24a13 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -37,6 +37,7 @@ import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreOptimizedMarshallerS
 import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreTest;
 import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreMultithreadedSelfTest;
 import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreSelfTest;
+import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceTest;
 import org.apache.ignite.internal.managers.communication.IgniteVariousConnectionNumberTest;
 import org.apache.ignite.internal.processors.cache.CacheAffinityCallSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheDeferredDeleteSanitySelfTest;
@@ -321,6 +322,7 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(CacheTxFastFinishTest.class);
 
         suite.addTestSuite(IgniteVariousConnectionNumberTest.class);
+        suite.addTestSuite(IgniteCommunicationBalanceTest.class);
 
         return suite;
     }


[05/12] ignite git commit: conn

Posted by sb...@apache.org.
conn


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/04a4d458
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/04a4d458
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/04a4d458

Branch: refs/heads/ignite-comm-balance
Commit: 04a4d4584ecdb9e44a1107921b640aa57bcc6cbb
Parents: 3b0ffee
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 22 10:21:29 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 22 10:21:29 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/communication/tcp/TcpCommunicationSpi.java      | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/04a4d458/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index c131cf2..639e23d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -315,7 +315,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000;
 
     /** Default connections per node. */
-    public static final int DFLT_CONN_PER_NODE = 1;
+    public static final int DFLT_CONN_PER_NODE = 2;
 
     /** No-op runnable. */
     private static final IgniteRunnable NOOP = new IgniteRunnable() {
@@ -1132,7 +1132,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /**
      * TODO
      *
-     * @param maxConnectionsPerNode
+     * @param maxConnectionsPerNode Number of connections per node.
      */
     public void setConnectionsPerNode(int maxConnectionsPerNode) {
         this.connectionsPerNode = maxConnectionsPerNode;
@@ -1141,7 +1141,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /**
      * TODO
      *
-     * @return
+     * @return Number of connections per node.
      */
     public int getConnectionsPerNode() {
         return connectionsPerNode;


[07/12] ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-comm-opts2' into ignite-comm-balance

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-comm-opts2' into ignite-comm-balance


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ffd654a7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ffd654a7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ffd654a7

Branch: refs/heads/ignite-comm-balance
Commit: ffd654a777780e60f27fc3a2ac5d590a45123c87
Parents: 03e54ac 79ca4aab
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 22 12:02:47 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 22 12:02:47 2016 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |  2 +-
 .../util/nio/GridSelectorNioSessionImpl.java    |  2 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 55 ++++++++++++++------
 .../IgniteCacheMessageWriteTimeoutTest.java     | 13 +++--
 .../GridTcpCommunicationSpiAbstractTest.java    | 28 ++++++++--
 ...mmunicationSpiConcurrentConnectSelfTest.java | 40 ++++++++++++--
 ...GridTcpCommunicationSpiRecoverySelfTest.java | 21 ++++++--
 7 files changed, 131 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ffd654a7/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------