You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/16 11:41:48 UTC

[19/49] ignite git commit: IGNITE-3220 I/O bottleneck on server/client cluster configuration Communications optimizations: - possibility to open separate in/out connections - possibility to have multiple connections between nodes - implemented NIO sessio

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index c8e2e0b..bc1f173 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -37,6 +37,7 @@ import java.nio.channels.WritableByteChannel;
 import java.nio.channels.spi.SelectorProvider;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
@@ -44,6 +45,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -63,6 +66,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -86,11 +90,14 @@ import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.NIO_OPER
  *
  */
 public class GridNioServer<T> {
+    /** */
+    public static final String IGNITE_IO_BALANCE_RANDOM_BALANCE = "IGNITE_IO_BALANCE_RANDOM_BALANCER";
+
     /** Default session write timeout. */
     public static final int DFLT_SES_WRITE_TIMEOUT = 5000;
 
     /** Default send queue limit. */
-    public static final int DFLT_SEND_QUEUE_LIMIT = 1024;
+    public static final int DFLT_SEND_QUEUE_LIMIT = 0;
 
     /** Time, which server will wait before retry operation. */
     private static final long ERR_WAIT_TIME = 2000;
@@ -122,6 +129,9 @@ public class GridNioServer<T> {
         }
     }
 
+    /** Defines how many times selector should do {@code selectNow()} before doing {@code select(long)}. */
+    private long selectorSpins;
+
     /** Accept worker thread. */
     @GridToStringExclude
     private final IgniteThread acceptThread;
@@ -145,9 +155,13 @@ public class GridNioServer<T> {
     /** Flag indicating if this server should use direct buffers. */
     private final boolean directBuf;
 
-    /** Index to select which thread will serve next socket channel. Using round-robin balancing. */
+    /** Index to select which thread will serve next incoming socket channel. Using round-robin balancing. */
+    @GridToStringExclude
+    private int readBalanceIdx;
+
+    /** Index to select which thread will serve next outgoing socket channel. Using round-robin balancing. */
     @GridToStringExclude
-    private int balanceIdx;
+    private int writeBalanceIdx = 1;
 
     /** Tcp no delay flag. */
     private final boolean tcpNoDelay;
@@ -204,12 +218,25 @@ public class GridNioServer<T> {
     /** Optional listener to monitor outbound message queue size. */
     private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr;
 
+    /** */
+    private final AtomicLong readerMoveCnt = new AtomicLong();
+
+    /** */
+    private final AtomicLong writerMoveCnt = new AtomicLong();
+
+    /** */
+    private final IgniteRunnable balancer;
+
     /**
      * @param addr Address.
      * @param port Port.
      * @param log Log.
      * @param selectorCnt Count of selectors and selecting threads.
      * @param gridName Grid name.
+     * @param srvName Logical server name for threads identification.
+     * @param selectorSpins Defines how many non-blocking {@code selector.selectNow()} should be made before
+     *      falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}.
+     *      Can be set to {@code Long.MAX_VALUE} so selector threads will never block.
      * @param tcpNoDelay If TCP_NODELAY option should be set to accepted sockets.
      * @param directBuf Direct buffer flag.
      * @param order Byte order.
@@ -223,6 +250,7 @@ public class GridNioServer<T> {
      * @param writerFactory Writer factory.
      * @param skipRecoveryPred Skip recovery predicate.
      * @param msgQueueLsnr Message queue size listener.
+     * @param balancing NIO sessions balancing flag.
      * @param filters Filters for this server.
      * @throws IgniteCheckedException If failed.
      */
@@ -232,6 +260,8 @@ public class GridNioServer<T> {
         IgniteLogger log,
         int selectorCnt,
         @Nullable String gridName,
+        @Nullable String srvName,
+        long selectorSpins,
         boolean tcpNoDelay,
         boolean directBuf,
         ByteOrder order,
@@ -245,6 +275,7 @@ public class GridNioServer<T> {
         GridNioMessageWriterFactory writerFactory,
         IgnitePredicate<Message> skipRecoveryPred,
         IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr,
+        boolean balancing,
         GridNioFilter... filters
     ) throws IgniteCheckedException {
         if (port != -1)
@@ -268,6 +299,7 @@ public class GridNioServer<T> {
         this.sockSndBuf = sockSndBuf;
         this.sndQueueLimit = sndQueueLimit;
         this.msgQueueLsnr = msgQueueLsnr;
+        this.selectorSpins = selectorSpins;
 
         filterChain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
 
@@ -299,9 +331,16 @@ public class GridNioServer<T> {
         clientThreads = new IgniteThread[selectorCnt];
 
         for (int i = 0; i < selectorCnt; i++) {
+            String threadName;
+
+            if (srvName == null)
+                threadName = "grid-nio-worker-" + i;
+            else
+                threadName = "grid-nio-worker-" + srvName + "-" + i;
+
             AbstractNioClientWorker worker = directMode ?
-                new DirectNioClientWorker(i, gridName, "grid-nio-worker-" + i, log) :
-                new ByteBufferNioClientWorker(i, gridName, "grid-nio-worker-" + i, log);
+                new DirectNioClientWorker(i, gridName, threadName, log) :
+                new ByteBufferNioClientWorker(i, gridName, threadName, log);
 
             clientWorkers.add(worker);
 
@@ -315,6 +354,32 @@ public class GridNioServer<T> {
         this.writerFactory = writerFactory;
 
         this.skipRecoveryPred = skipRecoveryPred != null ? skipRecoveryPred : F.<Message>alwaysFalse();
+
+        long balancePeriod = IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, 5000);
+
+        IgniteRunnable balancer0 = null;
+
+        if (balancing && balancePeriod > 0) {
+            boolean rndBalance = IgniteSystemProperties.getBoolean(IGNITE_IO_BALANCE_RANDOM_BALANCE, false);
+
+            balancer0 = rndBalance ? new RandomBalancer() : new SizeBasedBalancer(balancePeriod);
+        }
+
+        this.balancer = balancer0;
+    }
+
+    /**
+     * @return Number of reader session moves.
+     */
+    public long readerMoveCount() {
+        return readerMoveCnt.get();
+    }
+
+    /**
+     * @return Number of writer session moves.
+     */
+    public long writerMoveCount() {
+        return writerMoveCnt.get();
     }
 
     /**
@@ -377,6 +442,13 @@ public class GridNioServer<T> {
     }
 
     /**
+     * @return Selector spins.
+     */
+    public long selectorSpins() {
+        return selectorSpins;
+    }
+
+    /**
      * @param ses Session to close.
      * @return Future for operation.
      */
@@ -390,7 +462,7 @@ public class GridNioServer<T> {
 
         NioOperationFuture<Boolean> fut = new NioOperationFuture<>(impl, NioOperation.CLOSE);
 
-        clientWorkers.get(impl.selectorIndex()).offer(fut);
+        impl.offerStateChange(fut);
 
         return fut;
     }
@@ -398,61 +470,91 @@ public class GridNioServer<T> {
     /**
      * @param ses Session.
      * @param msg Message.
+     * @param createFut {@code True} if future should be created.
      * @return Future for operation.
      */
-    GridNioFuture<?> send(GridNioSession ses, ByteBuffer msg) {
-        assert ses instanceof GridSelectorNioSessionImpl;
+    GridNioFuture<?> send(GridNioSession ses, ByteBuffer msg, boolean createFut) throws IgniteCheckedException {
+        assert ses instanceof GridSelectorNioSessionImpl : ses;
 
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
 
-        NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg);
+        if (createFut) {
+            NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg);
 
-        send0(impl, fut, false);
+            send0(impl, fut, false);
 
-        return fut;
+            return fut;
+        }
+        else {
+            SessionWriteRequest req = new WriteRequestImpl(ses, msg, true);
+
+            send0(impl, req, false);
+
+            return null;
+        }
     }
 
     /**
      * @param ses Session.
      * @param msg Message.
+     * @param createFut {@code True} if future should be created.
      * @return Future for operation.
      */
-    GridNioFuture<?> send(GridNioSession ses, Message msg) {
+    GridNioFuture<?> send(GridNioSession ses, Message msg, boolean createFut) throws IgniteCheckedException {
         assert ses instanceof GridSelectorNioSessionImpl;
 
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
 
-        NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
-            skipRecoveryPred.apply(msg));
+        if (createFut) {
+            NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
+                skipRecoveryPred.apply(msg));
 
-        send0(impl, fut, false);
+            send0(impl, fut, false);
 
-        return fut;
+            return fut;
+        }
+        else {
+            SessionWriteRequest req = new WriteRequestImpl(ses, msg, skipRecoveryPred.apply(msg));
+
+            send0(impl, req, false);
+
+            return null;
+        }
     }
 
     /**
      * @param ses Session.
-     * @param fut Future.
+     * @param req Request.
      * @param sys System message flag.
+     * @throws IgniteCheckedException If session was closed.
      */
-    private void send0(GridSelectorNioSessionImpl ses, NioOperationFuture<?> fut, boolean sys) {
+    private void send0(GridSelectorNioSessionImpl ses, SessionWriteRequest req, boolean sys) throws IgniteCheckedException {
         assert ses != null;
-        assert fut != null;
+        assert req != null;
 
-        int msgCnt = sys ? ses.offerSystemFuture(fut) : ses.offerFuture(fut);
+        int msgCnt = sys ? ses.offerSystemFuture(req) : ses.offerFuture(req);
 
         IgniteInClosure<IgniteException> ackC;
 
         if (!sys && (ackC = ses.removeMeta(ACK_CLOSURE.ordinal())) != null)
-            fut.ackClosure(ackC);
+            req.ackClosure(ackC);
 
         if (ses.closed()) {
-            if (ses.removeFuture(fut))
-                fut.connectionClosed();
+            if (ses.removeFuture(req)) {
+                IOException err = new IOException("Failed to send message (connection was closed): " + ses);
+
+                req.onError(err);
+
+                if (!(req instanceof GridNioFuture))
+                    throw new IgniteCheckedException(err);
+            }
+        }
+        else if (!ses.procWrite.get() && ses.procWrite.compareAndSet(false, true)) {
+            AbstractNioClientWorker worker = (AbstractNioClientWorker)ses.worker();
+
+            if (worker != null)
+                worker.offer((SessionChangeRequest)req);
         }
-        else if (msgCnt == 1)
-            // Change from 0 to 1 means that worker thread should be waken up.
-            clientWorkers.get(ses.selectorIndex()).offer(fut);
 
         if (msgQueueLsnr != null)
             msgQueueLsnr.apply(ses, msgCnt);
@@ -463,10 +565,10 @@ public class GridNioServer<T> {
      *
      * @param ses Session.
      * @param msg Message.
-     * @return Future.
+     * @throws IgniteCheckedException If session was closed.
      */
-    public GridNioFuture<?> sendSystem(GridNioSession ses, Message msg) {
-        return sendSystem(ses, msg, null);
+    public void sendSystem(GridNioSession ses, Message msg) throws IgniteCheckedException {
+        sendSystem(ses, msg, null);
     }
 
     /**
@@ -475,27 +577,30 @@ public class GridNioServer<T> {
      * @param ses Session.
      * @param msg Message.
      * @param lsnr Future listener notified from the session thread.
-     * @return Future.
+     * @throws IgniteCheckedException If session was closed.
      */
-    public GridNioFuture<?> sendSystem(GridNioSession ses,
+    public void sendSystem(GridNioSession ses,
         Message msg,
-        @Nullable IgniteInClosure<? super IgniteInternalFuture<?>> lsnr) {
+        @Nullable IgniteInClosure<? super IgniteInternalFuture<?>> lsnr) throws IgniteCheckedException {
         assert ses instanceof GridSelectorNioSessionImpl;
 
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
 
-        NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
-            skipRecoveryPred.apply(msg));
-
         if (lsnr != null) {
+            NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
+                skipRecoveryPred.apply(msg));
+
             fut.listen(lsnr);
 
             assert !fut.isDone();
-        }
 
-        send0(impl, fut, true);
+            send0(impl, fut, true);
+        }
+        else {
+            SessionWriteRequest req = new WriteRequestSystemImpl(ses, msg);
 
-        return fut;
+            send0(impl, req, true);
+        }
     }
 
     /**
@@ -504,37 +609,69 @@ public class GridNioServer<T> {
     public void resend(GridNioSession ses) {
         assert ses instanceof GridSelectorNioSessionImpl;
 
-        GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor();
+        GridNioRecoveryDescriptor recoveryDesc = ses.outRecoveryDescriptor();
 
-        if (recoveryDesc != null && !recoveryDesc.messagesFutures().isEmpty()) {
-            Deque<GridNioFuture<?>> futs = recoveryDesc.messagesFutures();
+        if (recoveryDesc != null && !recoveryDesc.messagesRequests().isEmpty()) {
+            Deque<SessionWriteRequest> futs = recoveryDesc.messagesRequests();
 
             if (log.isDebugEnabled())
                 log.debug("Resend messages [rmtNode=" + recoveryDesc.node().id() + ", msgCnt=" + futs.size() + ']');
 
             GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses;
 
-            GridNioFuture<?> fut0 = futs.iterator().next();
+            SessionWriteRequest fut0 = futs.iterator().next();
 
-            for (GridNioFuture<?> fut : futs) {
+            for (SessionWriteRequest fut : futs) {
                 fut.messageThread(true);
 
-                ((NioOperationFuture)fut).resetSession(ses0);
+                fut.resetSession(ses0);
             }
 
             ses0.resend(futs);
 
             // Wake up worker.
-            clientWorkers.get(ses0.selectorIndex()).offer(((NioOperationFuture)fut0));
+            ses0.offerStateChange((GridNioServer.SessionChangeRequest)fut0);
         }
     }
 
     /**
+     * @return Sessions.
+     */
+    public Collection<? extends GridNioSession> sessions() {
+        return sessions;
+    }
+
+    /**
+     * @return Workers.
+     */
+    public List<AbstractNioClientWorker> workers() {
+        return clientWorkers;
+    }
+
+    /**
+     * @param ses Session.
+     * @param from Move from index.
+     * @param to Move to index.
+     */
+    private void moveSession(GridNioSession ses, int from, int to) {
+        assert from >= 0 && from < clientWorkers.size() : from;
+        assert to >= 0 && to < clientWorkers.size() : to;
+        assert from != to;
+
+        GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses;
+
+        SessionMoveFuture fut = new SessionMoveFuture(ses0, to);
+
+        if (!ses0.offerMove(clientWorkers.get(from), fut))
+            fut.onDone(false);
+    }
+
+    /**
      * @param ses Session.
      * @param op Operation.
      * @return Future for operation.
      */
-    GridNioFuture<?> pauseResumeReads(GridNioSession ses, NioOperation op) {
+    private GridNioFuture<?> pauseResumeReads(GridNioSession ses, NioOperation op) {
         assert ses instanceof GridSelectorNioSessionImpl;
         assert op == NioOperation.PAUSE_READ || op == NioOperation.RESUME_READ;
 
@@ -546,7 +683,7 @@ public class GridNioServer<T> {
 
         NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, op);
 
-        clientWorkers.get(impl.selectorIndex()).offer(fut);
+        impl.offerStateChange(fut);
 
         return fut;
     }
@@ -555,6 +692,9 @@ public class GridNioServer<T> {
      *
      */
     public void dumpStats() {
+        U.warn(log, "NIO server statistics [readerSesBalanceCnt=" + readerMoveCnt.get() +
+            ", writerSesBalanceCnt=" + writerMoveCnt.get() + ']');
+
         for (int i = 0; i < clientWorkers.size(); i++)
             clientWorkers.get(i).offer(new NioOperationFuture<Void>(null, NioOperation.DUMP_STATS));
     }
@@ -675,12 +815,35 @@ public class GridNioServer<T> {
      * @param req Request to balance.
      */
     private synchronized void offerBalanced(NioOperationFuture req) {
-        clientWorkers.get(balanceIdx).offer(req);
+        assert req.operation() == NioOperation.REGISTER : req;
+        assert req.socketChannel() != null : req;
+
+        int workers = clientWorkers.size();
+
+        int balanceIdx;
+
+        if (workers > 1) {
+            if (req.accepted()) {
+                balanceIdx = readBalanceIdx;
+
+                readBalanceIdx += 2;
+
+                if (readBalanceIdx >= workers)
+                    readBalanceIdx = 0;
+            }
+            else {
+                balanceIdx = writeBalanceIdx;
 
-        balanceIdx++;
+                writeBalanceIdx += 2;
 
-        if (balanceIdx == clientWorkers.size())
+                if (writeBalanceIdx >= workers)
+                    writeBalanceIdx = 1;
+            }
+        }
+        else
             balanceIdx = 0;
+
+        clientWorkers.get(balanceIdx).offer(req);
     }
 
     /** {@inheritDoc} */
@@ -792,21 +955,30 @@ public class GridNioServer<T> {
 
             while (true) {
                 ByteBuffer buf = ses.removeMeta(BUF_META_KEY);
-                NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal());
+                SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal());
 
                 // Check if there were any pending data from previous writes.
                 if (buf == null) {
                     assert req == null;
 
-                    req = (NioOperationFuture<?>)ses.pollFuture();
+                    req = ses.pollFuture();
 
                     if (req == null) {
-                        key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+                        if (ses.procWrite.get()) {
+                            ses.procWrite.set(false);
+
+                            if (ses.writeQueue().isEmpty()) {
+                                if ((key.interestOps() & SelectionKey.OP_WRITE) != 0)
+                                    key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+                            }
+                            else
+                                ses.procWrite.set(true);
+                        }
 
                         break;
                     }
 
-                    buf = req.message();
+                    buf = (ByteBuffer)req.message();
                 }
 
                 if (!skipWrite) {
@@ -841,10 +1013,15 @@ public class GridNioServer<T> {
                     // Message was successfully written.
                     assert req != null;
 
-                    req.onDone();
+                    req.onMessageWritten();
                 }
             }
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ByteBufferNioClientWorker.class, this, super.toString());
+        }
     }
 
     /**
@@ -909,6 +1086,7 @@ public class GridNioServer<T> {
                 metricsLsnr.onBytesReceived(cnt);
 
             ses.bytesReceived(cnt);
+            onRead(cnt);
 
             readBuf.flip();
 
@@ -921,6 +1099,12 @@ public class GridNioServer<T> {
                     readBuf.compact();
                 else
                     readBuf.clear();
+
+                if (ses.hasSystemMessage() && !ses.procWrite.get()) {
+                    ses.procWrite.set(true);
+
+                    registerWrite(ses);
+                }
             }
             catch (IgniteCheckedException e) {
                 close(ses, e);
@@ -993,16 +1177,29 @@ public class GridNioServer<T> {
                 if (ses.meta(WRITE_BUF_LIMIT) != null)
                     buf.limit((int)ses.meta(WRITE_BUF_LIMIT));
 
-                NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal());
+                SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal());
 
                 while (true) {
                     if (req == null) {
-                        req = (NioOperationFuture<?>)ses.pollFuture();
+                        req = systemMessage(ses);
 
-                        if (req == null && buf.position() == 0) {
-                            key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+                        if (req == null) {
+                            req = ses.pollFuture();
 
-                            break;
+                            if (req == null && buf.position() == 0) {
+                                if (ses.procWrite.get()) {
+                                    ses.procWrite.set(false);
+
+                                    if (ses.writeQueue().isEmpty()) {
+                                        if ((key.interestOps() & SelectionKey.OP_WRITE) != 0)
+                                            key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+                                    }
+                                    else
+                                        ses.procWrite.set(true);
+                                }
+
+                                break;
+                            }
                         }
                     }
 
@@ -1010,7 +1207,7 @@ public class GridNioServer<T> {
                     boolean finished = false;
 
                     if (req != null) {
-                        msg = req.directMessage();
+                        msg = (Message)req.message();
 
                         assert msg != null;
 
@@ -1025,14 +1222,17 @@ public class GridNioServer<T> {
 
                     // Fill up as many messages as possible to write buffer.
                     while (finished) {
-                        req.onDone();
+                        req.onMessageWritten();
 
-                        req = (NioOperationFuture<?>)ses.pollFuture();
+                        req = systemMessage(ses);
+
+                        if (req == null)
+                            req = ses.pollFuture();
 
                         if (req == null)
                             break;
 
-                        msg = req.directMessage();
+                        msg = (Message)req.message();
 
                         assert msg != null;
 
@@ -1129,13 +1329,31 @@ public class GridNioServer<T> {
                 ses.bytesSent(cnt);
 
                 if (!buf.hasRemaining())
-                    queue.remove(buf);
+                    queue.poll();
                 else
                     break;
             }
         }
 
         /**
+         * @param ses Session.
+         * @return System message request.
+         */
+        private SessionWriteRequest systemMessage(GridSelectorNioSessionImpl ses) {
+            if (ses.hasSystemMessage()) {
+                Object msg = ses.systemMessage();
+
+                SessionWriteRequest req = new WriteRequestSystemImpl(ses, msg);
+
+                assert !ses.hasSystemMessage();
+
+                return req;
+            }
+
+            return null;
+        }
+
+        /**
          * Processes write-ready event on the key.
          *
          * @param key Key that is ready to be written.
@@ -1147,7 +1365,7 @@ public class GridNioServer<T> {
 
             GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
             ByteBuffer buf = ses.writeBuffer();
-            NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal());
+            SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal());
 
             MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
 
@@ -1161,12 +1379,25 @@ public class GridNioServer<T> {
             }
 
             if (req == null) {
-                req = (NioOperationFuture<?>)ses.pollFuture();
+                req = systemMessage(ses);
 
-                if (req == null && buf.position() == 0) {
-                    key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+                if (req == null) {
+                    req = ses.pollFuture();
 
-                    return;
+                    if (req == null && buf.position() == 0) {
+                        if (ses.procWrite.get()) {
+                            ses.procWrite.set(false);
+
+                            if (ses.writeQueue().isEmpty()) {
+                                if ((key.interestOps() & SelectionKey.OP_WRITE) != 0)
+                                    key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+                            }
+                            else
+                                ses.procWrite.set(true);
+                        }
+
+                        return;
+                    }
                 }
             }
 
@@ -1174,9 +1405,9 @@ public class GridNioServer<T> {
             boolean finished = false;
 
             if (req != null) {
-                msg = req.directMessage();
+                msg = (Message)req.message();
 
-                assert msg != null;
+                assert msg != null : req;
 
                 if (writer != null)
                     writer.setCurrentWriteClass(msg.getClass());
@@ -1189,14 +1420,17 @@ public class GridNioServer<T> {
 
             // Fill up as many messages as possible to write buffer.
             while (finished) {
-                req.onDone();
+                req.onMessageWritten();
 
-                req = (NioOperationFuture<?>)ses.pollFuture();
+                req = systemMessage(ses);
+
+                if (req == null)
+                    req = ses.pollFuture();
 
                 if (req == null)
                     break;
 
-                msg = req.directMessage();
+                msg = (Message)req.message();
 
                 assert msg != null;
 
@@ -1223,6 +1457,7 @@ public class GridNioServer<T> {
                     metricsLsnr.onBytesSent(cnt);
 
                 ses.bytesSent(cnt);
+                onWrite(cnt);
             }
             else {
                 // For test purposes only (skipWrite is set to true in tests only).
@@ -1242,14 +1477,19 @@ public class GridNioServer<T> {
             else
                 buf.clear();
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(DirectNioClientWorker.class, this, super.toString());
+        }
     }
 
     /**
      * Thread performing only read operations from the channel.
      */
-    private abstract class AbstractNioClientWorker extends GridWorker {
+    private abstract class AbstractNioClientWorker extends GridWorker implements GridNioWorker {
         /** Queue of change requests on this selector. */
-        private final ConcurrentLinkedQueue<NioOperationFuture> changeReqs = new ConcurrentLinkedQueue<>();
+        private final ConcurrentLinkedQueue<SessionChangeRequest> changeReqs = new ConcurrentLinkedQueue<>();
 
         /** Selector to select read events. */
         private Selector selector;
@@ -1260,6 +1500,25 @@ public class GridNioServer<T> {
         /** Worker index. */
         private final int idx;
 
+        /** */
+        private long bytesRcvd;
+
+        /** */
+        private long bytesSent;
+
+        /** */
+        private volatile long bytesRcvd0;
+
+        /** */
+        private volatile long bytesSent0;
+
+        /** Sessions assigned to this worker. */
+        private final GridConcurrentHashSet<GridSelectorNioSessionImpl> workerSessions =
+            new GridConcurrentHashSet<>();
+
+        /** {@code True} if worker has called or is about to call {@code Selector.select()}. */
+        private volatile boolean select;
+
         /**
          * @param idx Index of this worker in server's array.
          * @param gridName Grid name.
@@ -1322,15 +1581,15 @@ public class GridNioServer<T> {
             try {
                 SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
 
-                Class<?> selectorImplClass =
+                Class<?> selectorImplCls =
                     Class.forName("sun.nio.ch.SelectorImpl", false, U.gridClassLoader());
 
                 // Ensure the current selector implementation is what we can instrument.
-                if (!selectorImplClass.isAssignableFrom(selector.getClass()))
+                if (!selectorImplCls.isAssignableFrom(selector.getClass()))
                     return;
 
-                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
-                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
+                Field selectedKeysField = selectorImplCls.getDeclaredField("selectedKeys");
+                Field publicSelectedKeysField = selectorImplCls.getDeclaredField("publicSelectedKeys");
 
                 selectedKeysField.setAccessible(true);
                 publicSelectedKeysField.setAccessible(true);
@@ -1357,48 +1616,126 @@ public class GridNioServer<T> {
          *
          * @param req Change request.
          */
-        private void offer(NioOperationFuture req) {
+        public void offer(SessionChangeRequest req) {
             changeReqs.offer(req);
 
+            if (select)
+                selector.wakeup();
+        }
+
+        /** {@inheritDoc} Always wakes the selector, without consulting the {@code select} flag. */
+        @Override public void offer(Collection<SessionChangeRequest> reqs) {
+            for (SessionChangeRequest req : reqs)
+                changeReqs.offer(req);
+
             selector.wakeup();
         }
 
+        /** {@inheritDoc} Move requests stay queued; returns {@code null} if nothing was removed. */
+        @Override public List<SessionChangeRequest> clearSessionRequests(GridNioSession ses) {
+            List<SessionChangeRequest> sesReqs = null;
+
+            for (SessionChangeRequest changeReq : changeReqs) {
+                if (changeReq.session() == ses && !(changeReq instanceof SessionMoveFuture)) {
+                    boolean rmv = changeReqs.remove(changeReq);
+
+                    assert rmv : changeReq;
+
+                    if (sesReqs == null)
+                        sesReqs = new ArrayList<>();
+
+                    sesReqs.add(changeReq);
+                }
+            }
+
+            return sesReqs;
+        }
+
         /**
          * Processes read and write events and registration requests.
          *
          * @throws IgniteCheckedException If IOException occurred or thread was unable to add worker to workers pool.
          */
         @SuppressWarnings("unchecked")
-        private void bodyInternal() throws IgniteCheckedException {
+        private void bodyInternal() throws IgniteCheckedException, InterruptedException {
             try {
                 long lastIdleCheck = U.currentTimeMillis();
 
+                mainLoop:
                 while (!closed && selector.isOpen()) {
-                    NioOperationFuture req;
+                    SessionChangeRequest req0;
 
-                    while ((req = changeReqs.poll()) != null) {
-                        switch (req.operation()) {
+                    while ((req0 = changeReqs.poll()) != null) {
+                        switch (req0.operation()) {
                             case REGISTER: {
-                                register(req);
+                                register((NioOperationFuture)req0);
 
                                 break;
                             }
 
-                            case REQUIRE_WRITE: {
-                                //Just register write key.
-                                SelectionKey key = req.session().key();
+                            case MOVE: {
+                                SessionMoveFuture f = (SessionMoveFuture)req0;
 
-                                if (key.isValid()) {
-                                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                                GridSelectorNioSessionImpl ses = f.session();
+
+                                if (idx == f.toIdx) {
+                                    assert f.movedSocketChannel() != null : f;
+
+                                    boolean add = workerSessions.add(ses);
+
+                                    assert add;
 
-                                    // Update timestamp to protected against false write timeout.
-                                    ((GridNioSessionImpl)key.attachment()).bytesSent(0);
+                                    ses.finishMoveSession(this);
+
+                                    if (idx % 2 == 0)
+                                        readerMoveCnt.incrementAndGet();
+                                    else
+                                        writerMoveCnt.incrementAndGet();
+
+                                    SelectionKey key = f.movedSocketChannel().register(selector,
+                                        SelectionKey.OP_READ | SelectionKey.OP_WRITE,
+                                        ses);
+
+                                    ses.key(key);
+
+                                    ses.procWrite.set(true);
+
+                                    f.onDone(true);
+                                }
+                                else {
+                                    assert f.movedSocketChannel() == null : f;
+
+                                    if (workerSessions.remove(ses)) {
+                                        ses.startMoveSession(this);
+
+                                        SelectionKey key = ses.key();
+
+                                        assert key.channel() != null : key;
+
+                                        f.movedSocketChannel((SocketChannel)key.channel());
+
+                                        key.cancel();
+
+                                        clientWorkers.get(f.toIndex()).offer(f);
+                                    }
+                                    else
+                                        f.onDone(false);
                                 }
 
                                 break;
                             }
 
+                            case REQUIRE_WRITE: {
+                                SessionWriteRequest req = (SessionWriteRequest)req0;
+
+                                registerWrite((GridSelectorNioSessionImpl)req.session());
+
+                                break;
+                            }
+
                             case CLOSE: {
+                                NioOperationFuture req = (NioOperationFuture)req0;
+
                                 if (close(req.session(), null))
                                     req.onDone(true);
                                 else
@@ -1408,6 +1745,8 @@ public class GridNioServer<T> {
                             }
 
                             case PAUSE_READ: {
+                                NioOperationFuture req = (NioOperationFuture)req0;
+
                                 SelectionKey key = req.session().key();
 
                                 if (key.isValid()) {
@@ -1426,6 +1765,8 @@ public class GridNioServer<T> {
                             }
 
                             case RESUME_READ: {
+                                NioOperationFuture req = (NioOperationFuture)req0;
+
                                 SelectionKey key = req.session().key();
 
                                 if (key.isValid()) {
@@ -1444,76 +1785,66 @@ public class GridNioServer<T> {
                             }
 
                             case DUMP_STATS: {
-                                StringBuilder sb = new StringBuilder();
+                                NioOperationFuture req = (NioOperationFuture)req0;
 
-                                Set<SelectionKey> keys = selector.keys();
-
-                                sb.append(U.nl())
-                                    .append(">> Selector info [idx=").append(idx)
-                                    .append(", keysCnt=").append(keys.size())
-                                    .append("]").append(U.nl());
-
-                                for (SelectionKey key : keys) {
-                                    GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
-
-                                    MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
-                                    MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY);
-
-                                    sb.append("    Connection info [")
-                                        .append("rmtAddr=").append(ses.remoteAddress())
-                                        .append(", locAddr=").append(ses.localAddress());
-
-                                    GridNioRecoveryDescriptor desc = ses.recoveryDescriptor();
+                                try {
+                                    dumpStats();
+                                }
+                                finally {
+                                    // Complete the request just in case (none should wait on this future).
+                                    req.onDone(true);
+                                }
+                            }
+                        }
+                    }
 
-                                    if (desc != null) {
-                                        sb.append(", msgsSent=").append(desc.sent())
-                                            .append(", msgsAckedByRmt=").append(desc.acked())
-                                            .append(", msgsRcvd=").append(desc.received())
-                                            .append(", descIdHash=").append(System.identityHashCode(desc));
-                                    }
-                                    else
-                                        sb.append(", recoveryDesc=null");
+                    int res = 0;
 
-                                    sb.append(", bytesRcvd=").append(ses.bytesReceived())
-                                        .append(", bytesSent=").append(ses.bytesSent())
-                                        .append(", opQueueSize=").append(ses.writeQueueSize())
-                                        .append(", msgWriter=").append(writer != null ? writer.toString() : "null")
-                                        .append(", msgReader=").append(reader != null ? reader.toString() : "null");
+                    for (long i = 0; i < selectorSpins && res == 0; i++) {
+                        res = selector.selectNow();
 
-                                    int cnt = 0;
+                        if (res > 0) {
+                            // Walk through the ready keys collection and process network events.
+                            if (selectedKeys == null)
+                                processSelectedKeys(selector.selectedKeys());
+                            else
+                                processSelectedKeysOptimized(selectedKeys.flip());
+                        }
 
-                                    for (GridNioFuture<?> fut : ses.writeQueue()) {
-                                        if (cnt == 0)
-                                            sb.append(",\n opQueue=[").append(fut);
-                                        else
-                                            sb.append(',').append(fut);
+                        if (!changeReqs.isEmpty())
+                            continue mainLoop;
 
-                                        if (++cnt == 5) {
-                                            sb.append(']');
+                        // Just in case we do busy selects.
+                        long now = U.currentTimeMillis();
 
-                                            break;
-                                        }
-                                    }
+                        if (now - lastIdleCheck > 2000) {
+                            lastIdleCheck = now;
 
+                            checkIdle(selector.keys());
+                        }
 
-                                    sb.append("]").append(U.nl());
-                                }
+                        if (isCancelled())
+                            return;
+                    }
 
-                                U.warn(log, sb.toString());
+                    // Falling to blocking select.
+                    select = true;
 
-                                // Complete the request just in case (none should wait on this future).
-                                req.onDone(true);
-                            }
+                    try {
+                        if (!changeReqs.isEmpty())
+                            continue;
+
+                        // Wake up every 2 seconds to check if closed.
+                        if (selector.select(2000) > 0) {
+                            // Walk through the ready keys collection and process network events.
+                            if (selectedKeys == null)
+                                processSelectedKeys(selector.selectedKeys());
+                            else
+                                processSelectedKeysOptimized(selectedKeys.flip());
                         }
                     }
-
-                    // Wake up every 2 seconds to check if closed.
-                    if (selector.select(2000) > 0) {
-                        // Walk through the ready keys collection and process network events.
-                        if (selectedKeys == null)
-                            processSelectedKeys(selector.selectedKeys());
-                        else
-                            processSelectedKeysOptimized(selectedKeys.flip());
+                    finally {
+                        select = false;
                     }
 
                     long now = U.currentTimeMillis();
@@ -1554,6 +1885,98 @@ public class GridNioServer<T> {
         }
 
         /**
+         * @param ses Session whose selection key gets {@code OP_WRITE} interest while the key is valid.
+         */
+        public final void registerWrite(GridSelectorNioSessionImpl ses) {
+            SelectionKey key = ses.key();
+
+            if (key.isValid()) {
+                if ((key.interestOps() & SelectionKey.OP_WRITE) == 0)
+                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+
+                // Update timestamp to protect against false write timeout.
+                ses.bytesSent(0);
+            }
+        }
+
+        /**
+         * Logs (via {@code U.warn}) selector-level and per-connection statistics of this worker.
+         */
+        private void dumpStats() {
+            StringBuilder sb = new StringBuilder();
+
+            Set<SelectionKey> keys = selector.keys();
+
+            sb.append(U.nl())
+                .append(">> Selector info [idx=").append(idx)
+                .append(", keysCnt=").append(keys.size())
+                .append(", bytesRcvd=").append(bytesRcvd)
+                .append(", bytesRcvd0=").append(bytesRcvd0)
+                .append(", bytesSent=").append(bytesSent)
+                .append(", bytesSent0=").append(bytesSent0)
+                .append("]").append(U.nl());
+
+            for (SelectionKey key : keys) {
+                GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+
+                MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
+                MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY);
+
+                sb.append("    Connection info [")
+                    .append("in=").append(ses.accepted())
+                    .append(", rmtAddr=").append(ses.remoteAddress())
+                    .append(", locAddr=").append(ses.localAddress());
+
+                GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
+
+                if (outDesc != null) {
+                    sb.append(", msgsSent=").append(outDesc.sent())
+                        .append(", msgsAckedByRmt=").append(outDesc.acked())
+                        .append(", descIdHash=").append(System.identityHashCode(outDesc));
+                }
+                else
+                    sb.append(", outRecoveryDesc=null");
+
+                GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor();
+
+                if (inDesc != null) {
+                    sb.append(", msgsRcvd=").append(inDesc.received())
+                        .append(", lastAcked=").append(inDesc.lastAcknowledged())
+                        .append(", descIdHash=").append(System.identityHashCode(inDesc));
+                }
+                else
+                    sb.append(", inRecoveryDesc=null");
+
+                sb.append(", bytesRcvd=").append(ses.bytesReceived())
+                    .append(", bytesRcvd0=").append(ses.bytesReceived0())
+                    .append(", bytesSent=").append(ses.bytesSent())
+                    .append(", bytesSent0=").append(ses.bytesSent0())
+                    .append(", opQueueSize=").append(ses.writeQueueSize())
+                    .append(", msgWriter=").append(writer != null ? writer.toString() : "null")
+                    .append(", msgReader=").append(reader != null ? reader.toString() : "null");
+
+                int cnt = 0;
+
+                for (SessionWriteRequest req : ses.writeQueue()) {
+                    sb.append(cnt == 0 ? ",\n opQueue=[" : ",").append(req);
+
+                    // Print at most 5 queued requests.
+                    if (++cnt == 5)
+                        break;
+                }
+
+                // Close 'opQueue=[' whenever it was opened, not only after the 5th entry.
+                if (cnt > 0)
+                    sb.append(']');
+
+                // Close 'Connection info ['.
+                sb.append("]").append(U.nl());
+            }
+
+            U.warn(log, sb.toString());
+        }
+
+        /**
          * Processes keys selected by a selector.
          *
          * @param keys Selected keys.
@@ -1671,7 +2094,9 @@ public class GridNioServer<T> {
 
                     long idleTimeout0 = idleTimeout;
 
-                    if (!opWrite && now - ses.lastReceiveTime() > idleTimeout0 && now - ses.lastSendScheduleTime() > idleTimeout0) {
+                    if (!opWrite &&
+                        now - ses.lastReceiveTime() > idleTimeout0 &&
+                        now - ses.lastSendScheduleTime() > idleTimeout0) {
                         filterChain.onSessionIdleTimeout(ses);
 
                         // Update timestamp to avoid multiple notifications within one timeout interval.
@@ -1715,7 +2140,7 @@ public class GridNioServer<T> {
 
                 final GridSelectorNioSessionImpl ses = new GridSelectorNioSessionImpl(
                     log,
-                    idx,
+                    this,
                     filterChain,
                     (InetSocketAddress)sockCh.getLocalAddress(),
                     (InetSocketAddress)sockCh.getRemoteAddress(),
@@ -1739,6 +2164,7 @@ public class GridNioServer<T> {
                     resend(ses);
 
                 sessions.add(ses);
+                workerSessions.add(ses);
 
                 try {
                     filterChain.onSessionOpened(ses);
@@ -1764,7 +2190,7 @@ public class GridNioServer<T> {
         }
 
         /**
-         * Closes the ses and all associated resources, then notifies the listener.
+         * Closes the session and all associated resources, then notifies the listener.
          *
          * @param ses Session to be closed.
          * @param e Exception to be passed to the listener, if any.
@@ -1781,12 +2207,10 @@ public class GridNioServer<T> {
             }
 
             sessions.remove(ses);
+            workerSessions.remove(ses);
 
             SelectionKey key = ses.key();
 
-            // Shutdown input and output so that remote client will see correct socket close.
-            Socket sock = ((SocketChannel)key.channel()).socket();
-
             if (ses.setClosed()) {
                 ses.onClosed();
 
@@ -1798,6 +2222,9 @@ public class GridNioServer<T> {
                         ((DirectBuffer)ses.readBuffer()).cleaner().clean();
                 }
 
+                // Shutdown input and output so that remote client will see correct socket close.
+                Socket sock = ((SocketChannel)key.channel()).socket();
+
                 try {
                     try {
                         sock.shutdownInput();
@@ -1824,28 +2251,35 @@ public class GridNioServer<T> {
                 ses.removeMeta(BUF_META_KEY);
 
                 // Since ses is in closed state, no write requests will be added.
-                NioOperationFuture<?> fut = ses.removeMeta(NIO_OPERATION.ordinal());
+                SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal());
+
+                GridNioRecoveryDescriptor outRecovery = ses.outRecoveryDescriptor();
+                GridNioRecoveryDescriptor inRecovery = ses.inRecoveryDescriptor();
 
-                GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor();
+                IOException err = new IOException("Failed to send message (connection was closed): " + ses);
 
-                if (recovery != null) {
+                if (outRecovery != null || inRecovery != null) {
                     try {
                         // Poll will update recovery data.
-                        while ((fut = (NioOperationFuture<?>)ses.pollFuture()) != null) {
-                            if (fut.skipRecovery())
-                                fut.connectionClosed();
+                        while ((req = ses.pollFuture()) != null) {
+                            if (req.skipRecovery())
+                                req.onError(err);
                         }
                     }
                     finally {
-                        recovery.release();
+                        if (outRecovery != null)
+                            outRecovery.release();
+
+                        if (inRecovery != null && inRecovery != outRecovery)
+                            inRecovery.release();
                     }
                 }
                 else {
-                    if (fut != null)
-                        fut.connectionClosed();
+                    if (req != null)
+                        req.onError(err);
 
-                    while ((fut = (NioOperationFuture<?>)ses.pollFuture()) != null)
-                        fut.connectionClosed();
+                    while ((req = ses.pollFuture()) != null)
+                        req.onError(err);
                 }
 
                 try {
@@ -1876,12 +2310,44 @@ public class GridNioServer<T> {
          * @throws IOException If write failed.
          */
         protected abstract void processWrite(SelectionKey key) throws IOException;
-    }
 
-    /**
-     * Gets outbound messages queue size.
-     *
-     * @return Write queue size.
+        /**
+         * @param cnt Number of bytes to add to both received-bytes counters.
+         */
+        final void onRead(int cnt) {
+            bytesRcvd += cnt;
+            bytesRcvd0 += cnt; // NOTE(review): non-atomic update of a volatile; may race with reset0() - confirm.
+        }
+
+        /**
+         * @param cnt Number of bytes to add to both sent-bytes counters.
+         */
+        final void onWrite(int cnt) {
+            bytesSent += cnt;
+            bytesSent0 += cnt; // NOTE(review): non-atomic update of a volatile; may race with reset0() - confirm.
+        }
+
+        /**
+         * Zeroes {@code bytesSent0} and {@code bytesRcvd0}, then calls {@code reset0()} on every worker session.
+         */
+        final void reset0() {
+            bytesSent0 = 0;
+            bytesRcvd0 = 0;
+
+            for (GridSelectorNioSessionImpl ses : workerSessions)
+                ses.reset0();
+        }
+
+        /** {@inheritDoc} Hands {@code super.toString()} to {@code S.toString} along with this instance. */
+        @Override public String toString() {
+            return S.toString(AbstractNioClientWorker.class, this, super.toString());
+        }
+    }
+
+    /**
+     * Gets outbound messages queue size.
+     *
+     * @return Write queue size.
      */
     public int outboundMessagesQueueSize() {
         int res = 0;
@@ -1952,6 +2418,9 @@ public class GridNioServer<T> {
                     if (selector.select(2000) > 0)
                         // Walk through the ready keys collection and process date requests.
                         processSelectedKeys(selector.selectedKeys());
+
+                    if (balancer != null)
+                        balancer.run();
                 }
             }
             // Ignore this exception as thread interruption is equal to 'close' call.
@@ -2048,10 +2517,13 @@ public class GridNioServer<T> {
     /**
      * Asynchronous operation that may be requested on selector.
      */
-    private enum NioOperation {
+    enum NioOperation {
         /** Register read key selection. */
         REGISTER,
 
+        /** Move session between workers. */
+        MOVE,
+
         /** Register write key selection. */
         REQUIRE_WRITE,
 
@@ -2069,9 +2541,193 @@ public class GridNioServer<T> {
     }
 
     /**
+     * Write request with fixed flags: it always skips recovery and always reports a message thread.
+     */
+    private static final class WriteRequestSystemImpl implements SessionWriteRequest, SessionChangeRequest {
+        /** Message returned by {@code message()}. */
+        private final Object msg;
+
+        /** Session returned by {@code session()}. */
+        private final GridNioSession ses;
+
+        /**
+         * @param ses Session.
+         * @param msg Message.
+         */
+        WriteRequestSystemImpl(GridNioSession ses, Object msg) {
+            this.ses = ses;
+            this.msg = msg;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void messageThread(boolean msgThread) {
+            // No-op: messageThread() always returns true.
+        }
+
+        /** {@inheritDoc} Always {@code true} for this request type. */
+        @Override public boolean messageThread() {
+            return true;
+        }
+
+        /** {@inheritDoc} Always {@code true} for this request type. */
+        @Override public boolean skipRecovery() {
+            return true;
+        }
+
+        /** {@inheritDoc} Not supported: always throws {@link UnsupportedOperationException}. */
+        @Override public void ackClosure(IgniteInClosure<IgniteException> c) {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} Not supported: always throws {@link UnsupportedOperationException}. */
+        @Override public void onAckReceived() {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} Always {@code null}, since no closure can be set. */
+        @Override public IgniteInClosure<IgniteException> ackClosure() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onError(Exception e) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object message() {
+            return msg;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessageWritten() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} Not supported: always throws {@link UnsupportedOperationException}. */
+        @Override public void resetSession(GridNioSession ses) {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridNioSession session() {
+            return ses;
+        }
+
+        /** {@inheritDoc} Always {@code NioOperation.REQUIRE_WRITE}. */
+        @Override public NioOperation operation() {
+            return NioOperation.REQUIRE_WRITE;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(WriteRequestSystemImpl.class, this);
+        }
+    }
+
+    /**
+     * Write request that is not a future: {@code onError} and {@code onMessageWritten} are no-ops.
+     */
+    private static final class WriteRequestImpl implements SessionWriteRequest, SessionChangeRequest {
+        /** Session returned by {@code session()}; replaced by {@code resetSession}. */
+        private GridNioSession ses;
+
+        /** Message returned by {@code message()}. */
+        private final Object msg;
+
+        /** Value reported by {@code messageThread()}. */
+        private boolean msgThread;
+
+        /** Skip recovery flag. */
+        private final boolean skipRecovery;
+
+        /** Ack closure, {@code null} until set through {@code ackClosure(IgniteInClosure)}. */
+        private IgniteInClosure<IgniteException> ackC;
+
+        /**
+         * @param ses Session.
+         * @param msg Message.
+         * @param skipRecovery Skip recovery flag.
+         */
+        WriteRequestImpl(GridNioSession ses, Object msg, boolean skipRecovery) {
+            this.ses = ses;
+            this.msg = msg;
+            this.skipRecovery = skipRecovery;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void messageThread(boolean msgThread) {
+            this.msgThread = msgThread;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean messageThread() {
+            return msgThread;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean skipRecovery() {
+            return skipRecovery;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void ackClosure(IgniteInClosure<IgniteException> c) {
+            ackC = c;
+        }
+
+        /** {@inheritDoc} Forwards to the message, which is asserted to be a {@code Message}. */
+        @Override public void onAckReceived() {
+            assert msg instanceof Message;
+
+            ((Message)msg).onAckReceived();
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteInClosure<IgniteException> ackClosure() {
+            return ackC;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onError(Exception e) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object message() {
+            return msg;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessageWritten() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void resetSession(GridNioSession ses) {
+            this.ses = ses;
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridNioSession session() {
+            return ses;
+        }
+
+        /** {@inheritDoc} Always {@code NioOperation.REQUIRE_WRITE}. */
+        @Override public NioOperation operation() {
+            return NioOperation.REQUIRE_WRITE;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(WriteRequestImpl.class, this);
+        }
+    }
+
+    /**
      * Class for requesting write and session close operations.
      */
-    private static class NioOperationFuture<R> extends GridNioFutureImpl<R> {
+    private static class NioOperationFuture<R> extends GridNioFutureImpl<R> implements SessionWriteRequest,
+        SessionChangeRequest {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -2087,11 +2743,7 @@ public class GridNioServer<T> {
         private NioOperation op;
 
         /** Message. */
-        @GridToStringExclude
-        private ByteBuffer msg;
-
-        /** Direct message. */
-        private Message commMsg;
+        private Object msg;
 
         /** */
         @GridToStringExclude
@@ -2153,8 +2805,7 @@ public class GridNioServer<T> {
          * @param op Requested operation.
          * @param msg Message.
          */
-        NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op,
-            ByteBuffer msg) {
+        NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op, Object msg) {
             assert ses != null;
             assert op != null;
             assert op != NioOperation.REGISTER;
@@ -2182,51 +2833,36 @@ public class GridNioServer<T> {
 
             this.ses = ses;
             this.op = op;
-            this.commMsg = commMsg;
+            this.msg = commMsg;
             this.skipRecovery = skipRecovery;
         }
 
-        /**
-         * @return Requested change operation.
-         */
-        private NioOperation operation() {
+        /** {@inheritDoc} */
+        @Override public NioOperation operation() {
             return op;
         }
 
-        /**
-         * @return Message.
-         */
-        private ByteBuffer message() {
+        /** {@inheritDoc} */
+        @Override public Object message() {
             return msg;
         }
 
-        /**
-         * @return Direct message.
-         */
-        private Message directMessage() {
-            return commMsg;
-        }
-
-        /**
-         * @param ses New session instance.
-         */
-        private void resetSession(GridSelectorNioSessionImpl ses) {
-            assert commMsg != null;
+        /** {@inheritDoc} */
+        @Override public void resetSession(GridNioSession ses) {
+            assert msg instanceof Message : msg;
 
-            this.ses = ses;
+            this.ses = (GridSelectorNioSessionImpl)ses;
         }
 
         /**
          * @return Socket channel for register request.
          */
-        private SocketChannel socketChannel() {
+        SocketChannel socketChannel() {
             return sockCh;
         }
 
-        /**
-         * @return Session for this change request.
-         */
-        private GridSelectorNioSessionImpl session() {
+        /** {@inheritDoc} */
+        @Override public GridSelectorNioSessionImpl session() {
             return ses;
         }
 
@@ -2244,21 +2880,21 @@ public class GridNioServer<T> {
             return meta;
         }
 
-        /**
-         * Applicable to write futures only. Fails future with corresponding IOException.
-         */
-        private void connectionClosed() {
-            assert op == NioOperation.REQUIRE_WRITE;
-            assert ses != null;
-
-            onDone(new IOException("Failed to send message (connection was closed): " + ses));
+        /** {@inheritDoc} */
+        @Override public void onError(Exception e) {
+            onDone(e);
         }
 
         /** {@inheritDoc} */
         @Override public void onAckReceived() {
-            assert commMsg != null;
+            assert msg instanceof Message : msg;
 
-            commMsg.onAckReceived();
+            ((Message)msg).onAckReceived();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessageWritten() {
+            onDone();
         }
 
         /** {@inheritDoc} */
@@ -2273,6 +2909,59 @@ public class GridNioServer<T> {
     }
 
     /**
+     * Future of a request to move a session to another client worker.
+     * The request is created with the {@link NioOperation#MOVE} operation.
+     */
+    private static class SessionMoveFuture extends NioOperationFuture<Boolean> {
+        /** Index of the worker the session should be moved to. */
+        private final int toIdx;
+
+        /** Socket channel of the moved session, {@code null} until it is set. */
+        @GridToStringExclude
+        private SocketChannel movedSockCh;
+
+        /**
+         * Creates a move request for the given session.
+         *
+         * @param ses Session.
+         * @param toIdx Target worker index.
+         */
+        SessionMoveFuture(GridSelectorNioSessionImpl ses, int toIdx) {
+            super(ses, NioOperation.MOVE);
+
+            this.toIdx = toIdx;
+        }
+
+        /**
+         * @return Target worker index.
+         */
+        int toIndex() {
+            return toIdx;
+        }
+
+        /**
+         * @param movedSockCh Moved session socket channel, must not be {@code null}.
+         */
+        void movedSocketChannel(SocketChannel movedSockCh) {
+            assert movedSockCh != null;
+
+            this.movedSockCh = movedSockCh;
+        }
+
+        /**
+         * @return Moved session socket channel.
+         */
+        SocketChannel movedSocketChannel() {
+            return movedSockCh;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SessionMoveFuture.class, this, super.toString());
+        }
+    }
+
+    /**
      * Filter forwarding messages from chain's head to this server.
      */
     private class HeadFilter extends GridNioFilterAdapter {
@@ -2302,7 +2991,7 @@ public class GridNioServer<T> {
         }
 
         /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) {
+        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
             if (directMode) {
                 boolean sslSys = sslFilter != null && msg instanceof ByteBuffer;
 
@@ -2313,18 +3002,18 @@ public class GridNioServer<T> {
 
                     queue.offer((ByteBuffer)msg);
 
-                    SelectionKey key = ((GridSelectorNioSessionImpl)ses).key();
+                    GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses;
 
-                    if (key.isValid())
-                        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                    if (!ses0.procWrite.get() && ses0.procWrite.compareAndSet(false, true))
+                        ses0.worker().registerWrite(ses0);
 
                     return null;
                 }
                 else
-                    return send(ses, (Message)msg);
+                    return send(ses, (Message)msg, fut);
             }
             else
-                return send(ses, (ByteBuffer)msg);
+                return send(ses, (ByteBuffer)msg, fut);
         }
 
         /** {@inheritDoc} */
@@ -2429,6 +3118,15 @@ public class GridNioServer<T> {
         /** Message queue size listener. */
         private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr;
 
+        /** Name for threads identification. */
+        private String srvName;
+
+        /** Selector spins count, see {@link #selectorSpins(long)}. */
+        private long selectorSpins;
+
+        /** NIO sessions balancing flag. */
+        private boolean balancing;
+
         /**
          * Finishes building the instance.
          *
@@ -2442,6 +3140,8 @@ public class GridNioServer<T> {
                 log,
                 selectorCnt,
                 gridName,
+                srvName,
+                selectorSpins,
                 tcpNoDelay,
                 directBuf,
                 byteOrder,
@@ -2455,6 +3155,7 @@ public class GridNioServer<T> {
                 writerFactory,
                 skipRecoveryPred,
                 msgQueueLsnr,
+                balancing,
                 filters != null ? Arrays.copyOf(filters, filters.length) : EMPTY_FILTERS
             );
 
@@ -2468,6 +3169,16 @@ public class GridNioServer<T> {
         }
 
         /**
+         * @param balancing NIO sessions balancing flag.
+         * @return This for chaining.
+         */
+        public Builder<T> balancing(boolean balancing) {
+            this.balancing = balancing;
+
+            return this;
+        }
+
+        /**
          * @param addr Local address.
          * @return This for chaining.
          */
@@ -2519,6 +3230,28 @@ public class GridNioServer<T> {
         }
 
         /**
+         * @param srvName Logical server name for threads identification.
+         * @return This for chaining.
+         */
+        public Builder<T> serverName(@Nullable String srvName) {
+            this.srvName = srvName;
+
+            return this;
+        }
+
+        /**
+         * @param selectorSpins Defines how many non-blocking {@code selector.selectNow()} should be made before
+         *      falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}.
+         *      Can be set to {@code Long.MAX_VALUE} so selector threads will never block.
+         * @return This for chaining.
+         */
+        public Builder<T> selectorSpins(long selectorSpins) {
+            this.selectorSpins = selectorSpins;
+
+            return this;
+        }
+
+        /**
          * @param tcpNoDelay If TCP_NODELAY option should be set to accepted sockets.
          * @return This for chaining.
          */
@@ -2678,4 +3411,225 @@ public class GridNioServer<T> {
             return this;
         }
     }
+
+    /**
+     * Balancer moving one session from the busiest reader and writer worker to the least busy one, by byte counters.
+     */
+    private class SizeBasedBalancer implements IgniteRunnable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Time of the last balancing attempt, as returned by {@code U.currentTimeMillis()}. */
+        private long lastBalance;
+
+        /** Minimal interval between two balancing attempts. */
+        private final long balancePeriod;
+
+        /**
+         * @param balancePeriod Minimal interval between two balancing attempts.
+         */
+        SizeBasedBalancer(long balancePeriod) {
+            this.balancePeriod = balancePeriod;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            long now = U.currentTimeMillis();
+
+            if (lastBalance + balancePeriod < now) {
+                lastBalance = now;
+
+                long maxRcvd0 = -1, minRcvd0 = -1, maxSent0 = -1, minSent0 = -1;
+                int maxRcvdIdx = -1, minRcvdIdx = -1, maxSentIdx = -1, minSentIdx = -1;
+
+                for (int i = 0; i < clientWorkers.size(); i++) {
+                    GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i);
+
+                    int sesCnt = worker.workerSessions.size();
+
+                    if (i % 2 == 0) {
+                        // Even index: handled as reader, a move source needs traffic and more than one session.
+                        long bytesRcvd0 = worker.bytesRcvd0;
+
+                        if ((maxRcvd0 == -1 || bytesRcvd0 > maxRcvd0) && bytesRcvd0 > 0 && sesCnt > 1) {
+                            maxRcvd0 = bytesRcvd0;
+                            maxRcvdIdx = i;
+                        }
+
+                        if (minRcvd0 == -1 || bytesRcvd0 < minRcvd0) {
+                            minRcvd0 = bytesRcvd0;
+                            minRcvdIdx = i;
+                        }
+                    }
+                    else {
+                        // Odd index: handled as writer, a move source needs traffic and more than one session.
+                        long bytesSent0 = worker.bytesSent0;
+
+                        if ((maxSent0 == -1 || bytesSent0 > maxSent0) && bytesSent0 > 0 && sesCnt > 1) {
+                            maxSent0 = bytesSent0;
+                            maxSentIdx = i;
+                        }
+
+                        if (minSent0 == -1 || bytesSent0 < minSent0) {
+                            minSent0 = bytesSent0;
+                            minSentIdx = i;
+                        }
+                    }
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug("Balancing data [minSent0=" + minSent0 + ", minSentIdx=" + minSentIdx +
+                        ", maxSent0=" + maxSent0 + ", maxSentIdx=" + maxSentIdx +
+                        ", minRcvd0=" + minRcvd0 + ", minRcvdIdx=" + minRcvdIdx +
+                        ", maxRcvd0=" + maxRcvd0 + ", maxRcvdIdx=" + maxRcvdIdx + ']');
+
+                if (maxSent0 != -1 && minSent0 != -1) {
+                    GridSelectorNioSessionImpl ses = null;
+
+                    long sentDiff = maxSent0 - minSent0;
+                    long delta = sentDiff;
+                    double threshold = sentDiff * 0.9;
+
+                    GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions =
+                        clientWorkers.get(maxSentIdx).workerSessions;
+
+                    for (GridSelectorNioSessionImpl ses0 : sessions) {
+                        long bytesSent0 = ses0.bytesSent0();
+
+                        if (bytesSent0 < threshold &&
+                            (ses == null || delta > U.safeAbs(bytesSent0 - sentDiff / 2))) {
+                            ses = ses0;
+                            delta = U.safeAbs(bytesSent0 - sentDiff / 2);
+                        }
+                    }
+
+                    if (ses != null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Will move session to less loaded writer [ses=" + ses +
+                                ", from=" + maxSentIdx + ", to=" + minSentIdx + ']');
+
+                        moveSession(ses, maxSentIdx, minSentIdx);
+                    }
+                    else {
+                        if (log.isDebugEnabled())
+                            log.debug("Unable to find session to move for writers.");
+                    }
+                }
+
+                if (maxRcvd0 != -1 && minRcvd0 != -1) {
+                    GridSelectorNioSessionImpl ses = null;
+
+                    long rcvdDiff = maxRcvd0 - minRcvd0;
+                    long delta = rcvdDiff;
+                    double threshold = rcvdDiff * 0.9;
+
+                    GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions =
+                        clientWorkers.get(maxRcvdIdx).workerSessions;
+
+                    for (GridSelectorNioSessionImpl ses0 : sessions) {
+                        long bytesRcvd0 = ses0.bytesReceived0();
+
+                        if (bytesRcvd0 < threshold &&
+                            (ses == null || delta > U.safeAbs(bytesRcvd0 - rcvdDiff / 2))) {
+                            ses = ses0;
+                            delta = U.safeAbs(bytesRcvd0 - rcvdDiff / 2);
+                        }
+                    }
+
+                    if (ses != null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Will move session to less loaded reader [ses=" + ses +
+                                ", from=" + maxRcvdIdx + ", to=" + minRcvdIdx + ']');
+
+                        moveSession(ses, maxRcvdIdx, minRcvdIdx);
+                    }
+                    else {
+                        if (log.isDebugEnabled())
+                            log.debug("Unable to find session to move for readers.");
+                    }
+                }
+
+                for (int i = 0; i < clientWorkers.size(); i++) {
+                    GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i);
+
+                    worker.reset0();
+                }
+            }
+        }
+    }
+
+    /**
+     * Moves a randomly chosen session between two distinct random client workers. For tests only.
+     */
+    @SuppressWarnings("unchecked")
+    private class RandomBalancer implements IgniteRunnable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            int workersCnt = clientWorkers.size();
+
+            // A move needs two distinct workers; with fewer the retry loop below would never exit.
+            if (workersCnt < 2)
+                return;
+
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            int w1 = rnd.nextInt(workersCnt);
+
+            int w2 = rnd.nextInt(workersCnt);
+
+            while (w2 == w1)
+                w2 = rnd.nextInt(workersCnt);
+
+            GridNioSession ses = randomSession(clientWorkers.get(w1));
+
+            if (ses != null) {
+                log.info("Move session [from=" + w1 +
+                    ", to=" + w2 +
+                    ", ses=" + ses + ']');
+
+                moveSession(ses, w1, w2);
+            }
+        }
+
+        /**
+         * @param worker Worker.
+         * @return Random session of the worker, or {@code null} if none could be picked.
+         */
+        private GridNioSession randomSession(GridNioServer.AbstractNioClientWorker worker) {
+            Collection<GridNioSession> sessions = worker.workerSessions;
+
+            int size = sessions.size();
+
+            if (size == 0)
+                return null;
+
+            int idx = ThreadLocalRandom.current().nextInt(size);
+
+            int cnt = 0;
+
+            for (GridNioSession ses : sessions) {
+                // Advance the position on every element, otherwise only index 0 could ever match.
+                if (cnt++ == idx)
+                    return ses;
+            }
+
+            // The collection may have shrunk since its size was read.
+            return null;
+        }
+    }
+
+    /**
+     * Request to change the state of a NIO session.
+     * Exposes the target session and the requested operation.
+     */
+    interface SessionChangeRequest {
+        /** @return Session the change is requested for. */
+        GridNioSession session();
+
+        /** @return Requested change operation. */
+        NioOperation operation();
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
index e4a7225..c1b60ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.util.nio;
 
 import java.net.InetSocketAddress;
+import org.apache.ignite.IgniteCheckedException;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -105,6 +106,11 @@ public interface GridNioSession {
     public GridNioFuture<?> send(Object msg);
 
     /**
+     * @param msg Message to be sent.
+     */
+    public void sendNoFuture(Object msg) throws IgniteCheckedException;
+
+    /**
      * Gets metadata associated with specified key.
      *
      * @param key Key to look up.
@@ -158,10 +164,25 @@ public interface GridNioSession {
     /**
      * @param recoveryDesc Recovery descriptor.
      */
-    public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc);
+    public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc);
+
+    /**
+     * @param recoveryDesc Recovery descriptor.
+     */
+    public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc);
 
     /**
      * @return Recovery descriptor if recovery is supported, {@code null otherwise.}
      */
-    @Nullable public GridNioRecoveryDescriptor recoveryDescriptor();
+    @Nullable public GridNioRecoveryDescriptor outRecoveryDescriptor();
+
+    /**
+     * @return Recovery descriptor if recovery is supported, {@code null} otherwise.
+     */
+    @Nullable public GridNioRecoveryDescriptor inRecoveryDescriptor();
+
+    /**
+     * @param msg System message to send.
+     */
+    public void systemMessage(Object msg);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
index 0bcfe64..7424531 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
@@ -51,6 +51,12 @@ public class GridNioSessionImpl implements GridNioSession {
     /** Received bytes counter. */
     private volatile long bytesRcvd;
 
+    /** Sent bytes since last NIO sessions balancing. */
+    private volatile long bytesSent0;
+
+    /** Received bytes since last NIO sessions balancing. */
+    private volatile long bytesRcvd0;
+
     /** Last send schedule timestamp. */
     private volatile long sndSchedTime;
 
@@ -99,7 +105,7 @@ public class GridNioSessionImpl implements GridNioSession {
         try {
             resetSendScheduleTime();
 
-            return chain().onSessionWrite(this, msg);
+            return chain().onSessionWrite(this, msg, true);
         }
         catch (IgniteCheckedException e) {
             close();
@@ -109,6 +115,18 @@ public class GridNioSessionImpl implements GridNioSession {
     }
 
     /** {@inheritDoc} */
+    @Override public void sendNoFuture(Object msg) throws IgniteCheckedException {
+        try {
+            chain().onSessionWrite(this, msg, false);
+        }
+        catch (IgniteCheckedException e) {
+            close();
+
+            throw e;
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public GridNioFuture<?> resumeReads() {
         try {
             return chain().onResumeReads(this);
@@ -163,6 +181,28 @@ public class GridNioSessionImpl implements GridNioSession {
         return bytesRcvd;
     }
 
+    /**
+     * @return Sent bytes since last NIO sessions balancing.
+     */
+    public long bytesSent0() {
+        return bytesSent0;
+    }
+
+    /**
+     * @return Received bytes since last NIO sessions balancing.
+     */
+    public long bytesReceived0() {
+        return bytesRcvd0;
+    }
+
+    /**
+     * Resets counters of bytes sent and received since last NIO sessions balancing.
+     */
+    public void reset0() {
+        bytesSent0 = 0;
+        bytesRcvd0 = 0;
+    }
+
     /** {@inheritDoc} */
     @Override public long createTime() {
         return createTime;
@@ -240,6 +280,7 @@ public class GridNioSessionImpl implements GridNioSession {
      */
     public void bytesSent(int cnt) {
         bytesSent += cnt;
+        bytesSent0 += cnt;
 
         lastSndTime = U.currentTimeMillis();
     }
@@ -253,6 +294,7 @@ public class GridNioSessionImpl implements GridNioSession {
      */
     public void bytesReceived(int cnt) {
         bytesRcvd += cnt;
+        bytesRcvd0 += cnt;
 
         lastRcvTime = U.currentTimeMillis();
     }
@@ -296,17 +338,32 @@ public class GridNioSessionImpl implements GridNioSession {
     }
 
     /** {@inheritDoc} */
-    @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+    @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridNioRecoveryDescriptor outRecoveryDescriptor() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() {
+    @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() {
         return null;
     }
 
     /** {@inheritDoc} */
+    @Override public void systemMessage(Object msg) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridNioSessionImpl.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
new file mode 100644
index 0000000..62985ff
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import java.util.Collection;
+import java.util.List;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * NIO worker accepting session change requests and write interest registrations.
+ */
+interface GridNioWorker {
+    /**
+     * @param req Change request.
+     */
+    public void offer(GridNioServer.SessionChangeRequest req);
+
+    /**
+     * @param reqs Change requests.
+     */
+    public void offer(Collection<GridNioServer.SessionChangeRequest> reqs);
+
+    /**
+     * @param ses Session.
+     * @return Session state change requests, possibly {@code null}.
+     */
+    @Nullable public List<GridNioServer.SessionChangeRequest> clearSessionRequests(GridNioSession ses);
+
+    /**
+     * @param ses Session to register write interest for.
+     */
+    public void registerWrite(GridSelectorNioSessionImpl ses);
+}