You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/02/02 04:28:22 UTC

[30/52] [abbrv] incubator-ignite git commit: Merge branch 'sprint-1' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-61

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
index 0000000,283f266..350d0b1
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
@@@ -1,0 -1,84 +1,78 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.util.nio;
+ 
+ import org.apache.ignite.*;
++import org.apache.ignite.plugin.extensions.communication.*;
+ import org.apache.ignite.spi.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.jetbrains.annotations.*;
+ 
+ import java.io.*;
+ import java.nio.*;
 -import java.util.*;
+ 
+ /**
+  * Parser for direct messages.
+  */
+ public class GridDirectParser implements GridNioParser {
+     /** Message metadata key. */
+     private static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+ 
 -    /** Message reader. */
 -    private final GridNioMessageReader msgReader;
 -
+     /** */
+     private IgniteSpiAdapter spi;
+ 
+     /** */
 -    private GridTcpMessageFactory msgFactory;
++    private MessageFactory msgFactory;
+ 
+     /**
 -     * @param msgReader Message reader.
+      * @param spi Spi.
+      */
 -    public GridDirectParser(GridNioMessageReader msgReader, IgniteSpiAdapter spi) {
 -        this.msgReader = msgReader;
++    public GridDirectParser(IgniteSpiAdapter spi) {
+         this.spi = spi;
+     }
+ 
+     /** {@inheritDoc} */
+     @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+         if (msgFactory == null)
+             msgFactory = spi.getSpiContext().messageFactory();
+ 
+         GridTcpCommunicationMessageAdapter msg = ses.removeMeta(MSG_META_KEY);
 -        UUID nodeId = ses.meta(GridNioServer.DIFF_VER_NODE_ID_META_KEY);
+ 
+         if (msg == null && buf.hasRemaining())
+             msg = msgFactory.create(buf.get());
+ 
+         boolean finished = false;
+ 
+         if (buf.hasRemaining())
 -            finished = msgReader.read(nodeId, msg, buf);
++            finished = msg.readFrom(buf);
+ 
+         if (finished)
+             return msg;
+         else {
+             ses.addMeta(MSG_META_KEY, msg);
+ 
+             return null;
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+         // No encoding needed for direct messages.
+         throw new UnsupportedEncodingException();
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 0000000,5a137a9..0e80d56
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@@ -1,0 -1,2305 +1,2276 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.util.nio;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.configuration.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.thread.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.nio.ssl.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.worker.*;
+ import org.jdk8.backport.*;
+ import org.jetbrains.annotations.*;
+ import sun.nio.ch.*;
+ 
+ import java.io.*;
+ import java.net.*;
+ import java.nio.*;
+ import java.nio.channels.*;
+ import java.nio.channels.spi.*;
+ import java.util.*;
+ import java.util.Map.*;
+ 
+ import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*;
+ 
+ /**
+  * TCP NIO server. Due to asynchronous nature of connections processing
+  * network events such as client connection, disconnection and message receiving are passed to
+  * the server listener. Once client connected, an associated {@link GridNioSession} object is
+  * created and can be used in communication.
+  * <p>
+  * This implementation supports several selectors and several reading threads.
+  *
+  * @param <T> Message type.
+  *
+  */
+ public class GridNioServer<T> {
+     /** Default session write timeout. */
+     public static final int DFLT_SES_WRITE_TIMEOUT = 5000;
+ 
+     /** Default send queue limit. */
+     public static final int DFLT_SEND_QUEUE_LIMIT = 1024;
+ 
+     /** Time, which server will wait before retry operation. */
+     private static final long ERR_WAIT_TIME = 2000;
+ 
+     /** Buffer metadata key. */
+     private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+ 
 -    /** SSL sysmtem data buffer metadata key. */
++    /** SSL system data buffer metadata key. */
+     private static final int BUF_SSL_SYSTEM_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+ 
 -    /** Node ID meta key (set only if versions are different). */
 -    public static final int DIFF_VER_NODE_ID_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
 -
+     /** Accept worker thread. */
+     @GridToStringExclude
+     private final IgniteThread acceptThread;
+ 
+     /** Read worker threads. */
+     private final IgniteThread[] clientThreads;
+ 
+     /** Read workers. */
+     private final List<AbstractNioClientWorker> clientWorkers;
+ 
+     /** Filter chain to use. */
+     private final GridNioFilterChain<T> filterChain;
+ 
+     /** Logger. */
+     @GridToStringExclude
+     private final IgniteLogger log;
+ 
+     /** Closed flag. */
+     private volatile boolean closed;
+ 
+     /** Flag indicating if this server should use direct buffers. */
+     private final boolean directBuf;
+ 
+     /** Index to select which thread will serve next socket channel. Using round-robin balancing. */
+     @GridToStringExclude
+     private int balanceIdx;
+ 
+     /** Tcp no delay flag. */
+     private final boolean tcpNoDelay;
+ 
+     /** Socket send buffer. */
+     private final int sockSndBuf;
+ 
+     /** Socket receive buffer. */
+     private final int sockRcvBuf;
+ 
+     /** Write timeout */
+     private volatile long writeTimeout = DFLT_SES_WRITE_TIMEOUT;
+ 
+     /** Idle timeout. */
+     private volatile long idleTimeout = IgniteConfiguration.DFLT_REST_IDLE_TIMEOUT;
+ 
+     /** For test purposes only. */
+     @SuppressWarnings("UnusedDeclaration")
+     private boolean skipWrite;
+ 
+     /** Local address. */
+     private final InetSocketAddress locAddr;
+ 
+     /** Order. */
+     private final ByteOrder order;
+ 
+     /** Send queue limit. */
+     private final int sndQueueLimit;
+ 
+     /** Whether direct mode is used. */
+     private final boolean directMode;
+ 
+     /** Metrics listener. */
+     private final GridNioMetricsListener metricsLsnr;
+ 
 -    /** Message writer. */
 -    private final GridNioMessageWriter msgWriter;
 -
+     /** Sessions. */
+     private final GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions = new GridConcurrentHashSet<>();
+ 
+     /** */
+     private GridNioSslFilter sslFilter;
+ 
+     /** Static initializer ensures single-threaded execution of workaround. */
+     static {
+         // This is a workaround for JDK bug (NPE in Selector.open()).
+         // http://bugs.sun.com/view_bug.do?bug_id=6427854
+         try {
+             Selector.open().close();
+         }
+         catch (IOException ignored) {
+         }
+     }
+ 
+     /**
+      * @param addr Address.
+      * @param port Port.
+      * @param log Log.
+      * @param selectorCnt Count of selectors and selecting threads.
+      * @param gridName Grid name.
+      * @param tcpNoDelay If TCP_NODELAY option should be set to accepted sockets.
+      * @param directBuf Direct buffer flag.
+      * @param order Byte order.
+      * @param lsnr Listener.
+      * @param sockSndBuf Socket send buffer.
+      * @param sockRcvBuf Socket receive buffer.
+      * @param sndQueueLimit Send queue limit.
+      * @param directMode Whether direct mode is used.
+      * @param daemon Daemon flag to create threads.
+      * @param metricsLsnr Metrics listener.
 -     * @param msgWriter Message writer.
+      * @param filters Filters for this server.
+      * @throws IgniteCheckedException If failed.
+      */
+     private GridNioServer(
+         InetAddress addr,
+         int port,
+         IgniteLogger log,
+         int selectorCnt,
+         @Nullable String gridName,
+         boolean tcpNoDelay,
+         boolean directBuf,
+         ByteOrder order,
+         GridNioServerListener<T> lsnr,
+         int sockSndBuf,
+         int sockRcvBuf,
+         int sndQueueLimit,
+         boolean directMode,
+         boolean daemon,
+         GridNioMetricsListener metricsLsnr,
 -        GridNioMessageWriter msgWriter,
+         GridNioFilter... filters
+     ) throws IgniteCheckedException {
+         A.notNull(addr, "addr");
+         A.notNull(lsnr, "lsnr");
+         A.notNull(log, "log");
+         A.notNull(order, "order");
+ 
+         A.ensure(port == -1 || (port > 0 && port < 0xffff), "port");
+         A.ensure(selectorCnt > 0, "selectorCnt");
+         A.ensure(sockRcvBuf >= 0, "sockRcvBuf");
+         A.ensure(sockSndBuf >= 0, "sockSndBuf");
+         A.ensure(sndQueueLimit >= 0, "sndQueueLimit");
+ 
+         this.log = log;
+         this.directBuf = directBuf;
+         this.order = order;
+         this.tcpNoDelay = tcpNoDelay;
+         this.sockRcvBuf = sockRcvBuf;
+         this.sockSndBuf = sockSndBuf;
+         this.sndQueueLimit = sndQueueLimit;
+ 
+         filterChain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
+ 
+         if (directMode) {
+             for (GridNioFilter filter : filters) {
+                 if (filter instanceof GridNioSslFilter) {
+                     sslFilter = (GridNioSslFilter)filter;
+ 
+                     assert sslFilter.directMode();
+                 }
+             }
+         }
+ 
+         if (port != -1) {
+             // Once bind, we will not change the port in future.
+             locAddr = new InetSocketAddress(addr, port);
+ 
+             // This method will throw exception if address already in use.
+             Selector acceptSelector = createSelector(locAddr);
+ 
+             acceptThread = new IgniteThread(new GridNioAcceptWorker(gridName, "nio-acceptor", log, acceptSelector));
+         }
+         else {
+             locAddr = null;
+             acceptThread = null;
+         }
+ 
+         clientWorkers = new ArrayList<>(selectorCnt);
+         clientThreads = new IgniteThread[selectorCnt];
+ 
+         for (int i = 0; i < selectorCnt; i++) {
+             AbstractNioClientWorker worker = directMode ?
+                 new DirectNioClientWorker(i, gridName, "grid-nio-worker-" + i, log) :
+                 new ByteBufferNioClientWorker(i, gridName, "grid-nio-worker-" + i, log);
+ 
+             clientWorkers.add(worker);
+ 
+             clientThreads[i] = new IgniteThread(worker);
+ 
+             clientThreads[i].setDaemon(daemon);
+         }
+ 
+         this.directMode = directMode;
+         this.metricsLsnr = metricsLsnr;
 -        this.msgWriter = msgWriter;
+     }
+ 
+     /**
+      * Creates and returns a builder for a new instance of this class.
+      *
+      * @return Builder for new instance.
+      */
+     public static <T> Builder<T> builder() {
+         return new Builder<>();
+     }
+ 
+     /**
+      * Starts all associated threads to perform accept and read activities.
+      */
+     public void start() {
+         filterChain.start();
+ 
+         if (acceptThread != null)
+             acceptThread.start();
+ 
+         for (IgniteThread thread : clientThreads)
+             thread.start();
+     }
+ 
+     /**
+      * Stops all threads and releases all resources.
+      */
+     public void stop() {
+         if (!closed) {
+             closed = true;
+ 
+             // Make sure to entirely stop acceptor if any.
+             U.interrupt(acceptThread);
+             U.join(acceptThread, log);
+ 
+             U.cancel(clientWorkers);
+             U.join(clientWorkers, log);
+ 
+             filterChain.stop();
+         }
+     }
+ 
+     /**
+      * Gets the address server is bound to.
+      *
+      * @return Address server is bound to.
+      */
+     public InetSocketAddress localAddress() {
+         return locAddr;
+     }
+ 
+     /**
+      * @param ses Session to close.
+      * @return Future for operation.
+      */
+     public GridNioFuture<Boolean> close(GridNioSession ses) {
+         assert ses instanceof GridSelectorNioSessionImpl;
+ 
+         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
+ 
+         if (impl.closed())
+             return new GridNioFinishedFuture<>(false);
+ 
+         NioOperationFuture<Boolean> fut = new NioOperationFuture<>(impl, NioOperation.CLOSE);
+ 
+         clientWorkers.get(impl.selectorIndex()).offer(fut);
+ 
+         return fut;
+     }
+ 
+     /**
+      * @param ses Session.
+      * @param msg Message.
+      * @return Future for operation.
+      */
+     GridNioFuture<?> send(GridNioSession ses, ByteBuffer msg) {
+         assert ses instanceof GridSelectorNioSessionImpl;
+ 
+         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
+ 
+         NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg);
+ 
+         send0(impl, fut, false);
+ 
+         return fut;
+     }
+ 
+     /**
+      * @param ses Session.
+      * @param msg Message.
+      * @return Future for operation.
+      */
+     GridNioFuture<?> send(GridNioSession ses, GridTcpCommunicationMessageAdapter msg) {
+         assert ses instanceof GridSelectorNioSessionImpl;
+ 
+         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
+ 
+         NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg);
+ 
+         send0(impl, fut, false);
+ 
+         return fut;
+     }
+ 
+     /**
+      * @param ses Session.
+      * @param fut Future.
+      * @param sys System message flag.
+      */
+     private void send0(GridSelectorNioSessionImpl ses, NioOperationFuture<?> fut, boolean sys) {
+         assert ses != null;
+         assert fut != null;
+ 
+         int msgCnt = sys ? ses.offerSystemFuture(fut) : ses.offerFuture(fut);
+ 
+         if (ses.closed()) {
+             if (ses.removeFuture(fut))
+                 fut.connectionClosed();
+         }
+         else if (msgCnt == 1)
+             // Change from 0 to 1 means that worker thread should be waken up.
+             clientWorkers.get(ses.selectorIndex()).offer(fut);
+     }
+ 
+     /**
+      * Adds message at the front of the queue without acquiring back pressure semaphore.
+      *
+      * @param ses Session.
+      * @param msg Message.
+      * @return Future.
+      */
+     public GridNioFuture<?> sendSystem(GridNioSession ses, GridTcpCommunicationMessageAdapter msg) {
+         return sendSystem(ses, msg, null);
+     }
+ 
+     /**
+      * Adds message at the front of the queue without acquiring back pressure semaphore.
+      *
+      * @param ses Session.
+      * @param msg Message.
+      * @param lsnr Future listener notified from the session thread.
+      * @return Future.
+      */
+     public GridNioFuture<?> sendSystem(GridNioSession ses,
+         GridTcpCommunicationMessageAdapter msg,
+         @Nullable IgniteInClosure<? super GridNioFuture<?>> lsnr) {
+         assert ses instanceof GridSelectorNioSessionImpl;
+ 
+         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
+ 
+         NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg);
+ 
+         if (lsnr != null) {
+             fut.listenAsync(lsnr);
+ 
+             assert !fut.isDone();
+         }
+ 
+         send0(impl, fut, true);
+ 
+         return fut;
+     }
+ 
+     /**
+      * @param ses Session.
+      */
+     public void resend(GridNioSession ses) {
+         assert ses instanceof GridSelectorNioSessionImpl;
+ 
+         GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor();
+ 
+         if (recoveryDesc != null && !recoveryDesc.messagesFutures().isEmpty()) {
+             Deque<GridNioFuture<?>> futs = recoveryDesc.messagesFutures();
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Resend messages [rmtNode=" + recoveryDesc.node().id() + ", msgCnt=" + futs.size() + ']');
+ 
+             GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses;
+ 
+             GridNioFuture<?> fut0 = futs.iterator().next();
+ 
+             for (GridNioFuture<?> fut : futs) {
+                 fut.messageThread(true);
+ 
+                 ((NioOperationFuture)fut).resetMessage(ses0);
+             }
+ 
+             ses0.resend(futs);
+ 
+             // Wake up worker.
+             clientWorkers.get(ses0.selectorIndex()).offer(((NioOperationFuture)fut0));
+         }
+     }
+ 
+     /**
+      * @param ses Session.
+      * @param op Operation.
+      * @return Future for operation.
+      */
+     GridNioFuture<?> pauseResumeReads(GridNioSession ses, NioOperation op) {
+         assert ses instanceof GridSelectorNioSessionImpl;
+         assert op == NioOperation.PAUSE_READ || op == NioOperation.RESUME_READ;
+ 
+         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
+ 
+         if (impl.closed())
+             return new GridNioFinishedFuture(new IOException("Failed to pause/resume reads " +
+                 "(connection was closed): " + ses));
+ 
+         NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, op);
+ 
+         clientWorkers.get(impl.selectorIndex()).offer(fut);
+ 
+         return fut;
+     }
+ 
+     /**
+      * Establishes a session.
+      *
+      * @param ch Channel to register within the server and create session for.
+      * @param meta Optional meta for new session.
+      * @return Future to get session.
+      */
+     public GridNioFuture<GridNioSession> createSession(final SocketChannel ch,
+         @Nullable Map<Integer, ?> meta) {
+         try {
+             ch.configureBlocking(false);
+ 
+             NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta);
+ 
+             offerBalanced(req);
+ 
+             return req;
+         }
+         catch (IOException e) {
+             return new GridNioFinishedFuture<>(e);
+         }
+     }
+ 
+     /**
+      * Gets configurable write timeout for this session. If not set, default value is {@link #DFLT_SES_WRITE_TIMEOUT}.
+      *
+      * @return Write timeout in milliseconds.
+      */
+     public long writeTimeout() {
+         return writeTimeout;
+     }
+ 
+     /**
+      * Sets configurable write timeout for session.
+      *
+      * @param writeTimeout Write timeout in milliseconds.
+      */
+     public void writeTimeout(long writeTimeout) {
+         this.writeTimeout = writeTimeout;
+     }
+ 
+     /**
+      * Gets configurable idle timeout for this session. If not set, default value is
+      * {@link IgniteConfiguration#DFLT_REST_IDLE_TIMEOUT}.
+      *
+      * @return Idle timeout in milliseconds.
+      */
+     public long idleTimeout() {
+         return idleTimeout;
+     }
+ 
+     /**
+      * Sets configurable idle timeout for session.
+      *
+      * @param idleTimeout Idle timeout in milliseconds.
+      */
+     public void idleTimeout(long idleTimeout) {
+         this.idleTimeout = idleTimeout;
+     }
+ 
+     /**
+      * Creates selector and binds server socket to a given address and port. If address is null
+      * then will not bind any address and just creates a selector.
+      *
+      * @param addr Local address to listen on.
+      * @return Created selector.
+      * @throws IgniteCheckedException If selector could not be created or port is already in use.
+      */
+     private Selector createSelector(@Nullable SocketAddress addr) throws IgniteCheckedException {
+         Selector selector = null;
+ 
+         ServerSocketChannel srvrCh = null;
+ 
+         try {
+             // Create a new selector
+             selector = SelectorProvider.provider().openSelector();
+ 
+             if (addr != null) {
+                 // Create a new non-blocking server socket channel
+                 srvrCh = ServerSocketChannel.open();
+ 
+                 srvrCh.configureBlocking(false);
+ 
+                 if (sockRcvBuf > 0)
+                     srvrCh.socket().setReceiveBufferSize(sockRcvBuf);
+ 
+                 // Bind the server socket to the specified address and port
+                 srvrCh.socket().bind(addr);
+ 
+                 // Register the server socket channel, indicating an interest in
+                 // accepting new connections
+                 srvrCh.register(selector, SelectionKey.OP_ACCEPT);
+             }
+ 
+             return selector;
+         }
+         catch (Throwable e) {
+             U.close(srvrCh, log);
+             U.close(selector, log);
+ 
+             throw new IgniteCheckedException("Failed to initialize NIO selector.", e);
+         }
+     }
+ 
+     /**
+      * @param req Request to balance.
+      */
+     private synchronized void offerBalanced(NioOperationFuture req) {
+         clientWorkers.get(balanceIdx).offer(req);
+ 
+         balanceIdx++;
+ 
+         if (balanceIdx == clientWorkers.size())
+             balanceIdx = 0;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return S.toString(GridNioServer.class, this);
+     }
+ 
+     /**
+      * Client worker for byte buffer mode.
+      */
+     private class ByteBufferNioClientWorker extends AbstractNioClientWorker {
+         /** Read buffer. */
+         private final ByteBuffer readBuf;
+ 
+         /**
+          * @param idx Index of this worker in server's array.
+          * @param gridName Grid name.
+          * @param name Worker name.
+          * @param log Logger.
+          * @throws IgniteCheckedException If selector could not be created.
+          */
+         protected ByteBufferNioClientWorker(int idx, @Nullable String gridName, String name, IgniteLogger log)
+             throws IgniteCheckedException {
+             super(idx, gridName, name, log);
+ 
+             readBuf = directBuf ? ByteBuffer.allocateDirect(8 << 10) : ByteBuffer.allocate(8 << 10);
+ 
+             readBuf.order(order);
+         }
+ 
+         /**
+         * Processes read-available event on the key.
+         *
+         * @param key Key that is ready to be read.
+         * @throws IOException If key read failed.
+         */
+         @Override protected void processRead(SelectionKey key) throws IOException {
+             ReadableByteChannel sockCh = (ReadableByteChannel)key.channel();
+ 
+             final GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+ 
+             // Reset buffer to read bytes up to its capacity.
+             readBuf.clear();
+ 
+             // Attempt to read off the channel
+             int cnt = sockCh.read(readBuf);
+ 
+             if (cnt == -1) {
+                 if (log.isDebugEnabled())
+                     log.debug("Remote client closed connection: " + ses);
+ 
+                 close(ses, null);
+ 
+                 return;
+             }
+             else if (cnt == 0)
+                 return;
+ 
+             if (log.isTraceEnabled())
+                 log.trace("Bytes received [sockCh=" + sockCh + ", cnt=" + cnt + ']');
+ 
+             if (metricsLsnr != null)
+                 metricsLsnr.onBytesReceived(cnt);
+ 
+             ses.bytesReceived(cnt);
+ 
+             // Sets limit to current position and
+             // resets position to 0.
+             readBuf.flip();
+ 
+             try {
+                 assert readBuf.hasRemaining();
+ 
+                 filterChain.onMessageReceived(ses, readBuf);
+ 
+                 if (readBuf.remaining() > 0) {
+                     LT.warn(log, null, "Read buffer contains data after filter chain processing (will discard " +
+                         "remaining bytes) [ses=" + ses + ", remainingCnt=" + readBuf.remaining() + ']');
+ 
+                     readBuf.clear();
+                 }
+             }
+             catch (IgniteCheckedException e) {
+                 close(ses, e);
+             }
+         }
+ 
+         /**
+         * Processes write-ready event on the key.
+         *
+         * @param key Key that is ready to be written.
+         * @throws IOException If write failed.
+         */
+         @Override protected void processWrite(SelectionKey key) throws IOException {
+             WritableByteChannel sockCh = (WritableByteChannel)key.channel();
+ 
+             final GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+ 
+             while (true) {
+                 ByteBuffer buf = ses.removeMeta(BUF_META_KEY);
+                 NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal());
+ 
+                 // Check if there were any pending data from previous writes.
+                 if (buf == null) {
+                     assert req == null;
+ 
+                     req = (NioOperationFuture<?>)ses.pollFuture();
+ 
+                     if (req == null) {
+                         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+ 
+                         break;
+                     }
+ 
+                     buf = req.message();
+                 }
+ 
+                 if (!skipWrite) {
+                     int cnt = sockCh.write(buf);
+ 
+                     if (log.isTraceEnabled())
+                         log.trace("Bytes sent [sockCh=" + sockCh + ", cnt=" + cnt + ']');
+ 
+                     if (metricsLsnr != null)
+                         metricsLsnr.onBytesSent(cnt);
+ 
+                     ses.bytesSent(cnt);
+                 }
+                 else {
+                     // For test purposes only (skipWrite is set to true in tests only).
+                     try {
+                         U.sleep(50);
+                     }
+                     catch (IgniteInterruptedException e) {
+                         throw new IOException("Thread has been interrupted.", e);
+                     }
+                 }
+ 
+                 if (buf.remaining() > 0) {
+                     // Not all data was written.
+                     ses.addMeta(BUF_META_KEY, buf);
+                     ses.addMeta(NIO_OPERATION.ordinal(), req);
+ 
+                     break;
+                 }
+                 else {
+                     // Message was successfully written.
+                     assert req != null;
+ 
+                     req.onDone();
+                 }
+             }
+         }
+     }
+ 
+     /**
+      * Client worker for direct mode.
+      */
+     private class DirectNioClientWorker extends AbstractNioClientWorker {
+         /**
+          * @param idx Index of this worker in server's array.
+          * @param gridName Grid name.
+          * @param name Worker name.
+          * @param log Logger.
+          * @throws IgniteCheckedException If selector could not be created.
+          */
+         protected DirectNioClientWorker(int idx, @Nullable String gridName, String name, IgniteLogger log)
+             throws IgniteCheckedException {
+             super(idx, gridName, name, log);
+         }
+ 
+         /**
+          * Processes read-available event on the key.
+          *
+          * @param key Key that is ready to be read.
+          * @throws IOException If key read failed.
+          */
+         @Override protected void processRead(SelectionKey key) throws IOException {
+             ReadableByteChannel sockCh = (ReadableByteChannel)key.channel();
+ 
+             final GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+ 
+             ByteBuffer readBuf = ses.readBuffer();
+ 
+             // Attempt to read off the channel.
+             int cnt = sockCh.read(readBuf);
+ 
+             if (cnt == -1) {
+                 if (log.isDebugEnabled())
+                     log.debug("Remote client closed connection: " + ses);
+ 
+                 close(ses, null);
+ 
+                 return;
+             }
+             else if (cnt == 0 && !readBuf.hasRemaining())
+                 return;
+ 
+             if (log.isTraceEnabled())
+                 log.trace("Bytes received [sockCh=" + sockCh + ", cnt=" + cnt + ']');
+ 
+             if (metricsLsnr != null)
+                 metricsLsnr.onBytesReceived(cnt);
+ 
+             ses.bytesReceived(cnt);
+ 
+             // Sets limit to current position and
+             // resets position to 0.
+             readBuf.flip();
+ 
+             try {
+                 assert readBuf.hasRemaining();
+ 
+                 filterChain.onMessageReceived(ses, readBuf);
+ 
+                 if (readBuf.hasRemaining())
+                     readBuf.compact();
+                 else
+                     readBuf.clear();
+             }
+             catch (IgniteCheckedException e) {
+                 close(ses, e);
+             }
+         }
+ 
+         /**
+          * Processes write-ready event on the key.
+          *
+          * @param key Key that is ready to be written.
+          * @throws IOException If write failed.
+          */
+         @Override protected void processWrite(SelectionKey key) throws IOException {
+             if (sslFilter != null)
+                 processWriteSsl(key);
+             else
+                 processWrite0(key);
+         }
+ 
+         /**
+          * Processes write-ready event on the key.
+          *
+          * @param key Key that is ready to be written.
+          * @throws IOException If write failed.
+          */
+         @SuppressWarnings("ForLoopReplaceableByForEach")
+         private void processWriteSsl(SelectionKey key) throws IOException {
+             WritableByteChannel sockCh = (WritableByteChannel)key.channel();
+ 
+             GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+ 
+             boolean handshakeFinished = sslFilter.lock(ses);
+ 
+             try {
+                 writeSslSystem(ses, sockCh);
+ 
+                 if (!handshakeFinished)
+                     return;
+ 
+                 ByteBuffer sslNetBuf = ses.removeMeta(BUF_META_KEY);
+ 
+                 if (sslNetBuf != null) {
+                     int cnt = sockCh.write(sslNetBuf);
+ 
+                     if (metricsLsnr != null)
+                         metricsLsnr.onBytesSent(cnt);
+ 
+                     ses.bytesSent(cnt);
+ 
+                     if (sslNetBuf.hasRemaining()) {
+                         ses.addMeta(BUF_META_KEY, sslNetBuf);
+ 
+                         return;
+                     }
+                 }
+ 
+                 ByteBuffer buf = ses.writeBuffer();
+                 NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal());
 -                UUID nodeId = ses.meta(DIFF_VER_NODE_ID_META_KEY);
+ 
+                 List<NioOperationFuture<?>> doneFuts = null;
+ 
+                 while (true) {
+                     if (req == null) {
+                         req = (NioOperationFuture<?>)ses.pollFuture();
+ 
+                         if (req == null && buf.position() == 0) {
+                             key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+ 
+                             break;
+                         }
+                     }
+ 
+                     GridTcpCommunicationMessageAdapter msg;
+                     boolean finished = false;
+ 
+                     if (req != null) {
+                         msg = req.directMessage();
+ 
+                         assert msg != null;
 -                        assert msgWriter != null;
+ 
 -                        finished = msgWriter.write(nodeId, msg, buf);
++                        finished = msg.writeTo(buf);
+                     }
+ 
+                     // Fill up as many messages as possible to write buffer.
+                     while (finished) {
+                         if (doneFuts == null)
+                             doneFuts = new ArrayList<>();
+ 
+                         doneFuts.add(req);
+ 
+                         req = (NioOperationFuture<?>)ses.pollFuture();
+ 
+                         if (req == null)
+                             break;
+ 
+                         msg = req.directMessage();
+ 
+                         assert msg != null;
 -                        assert msgWriter != null;
+ 
 -                        finished = msgWriter.write(nodeId, msg, buf);
++                        finished = msg.writeTo(buf);
+                     }
+ 
+                     buf.flip();
+ 
+                     ByteBuffer sesBuf = ses.writeBuffer();
+ 
+                     buf = sslFilter.encrypt(ses, sesBuf);
+ 
+                     sesBuf.clear();
+ 
+                     assert buf.hasRemaining();
+ 
+                     if (!skipWrite) {
+                         int cnt = sockCh.write(buf);
+ 
+                         if (!F.isEmpty(doneFuts)) {
+                             for (int i = 0; i < doneFuts.size(); i++)
+                                 doneFuts.get(i).onDone();
+ 
+                             doneFuts.clear();
+                         }
+ 
+                         if (log.isTraceEnabled())
+                             log.trace("Bytes sent [sockCh=" + sockCh + ", cnt=" + cnt + ']');
+ 
+                         if (metricsLsnr != null)
+                             metricsLsnr.onBytesSent(cnt);
+ 
+                         ses.bytesSent(cnt);
+                     }
+                     else {
+                         // For test purposes only (skipWrite is set to true in tests only).
+                         try {
+                             U.sleep(50);
+                         }
+                         catch (IgniteInterruptedException e) {
+                             throw new IOException("Thread has been interrupted.", e);
+                         }
+                     }
+ 
+                     ses.addMeta(NIO_OPERATION.ordinal(), req);
+ 
+                     if (buf.hasRemaining()) {
+                         ses.addMeta(BUF_META_KEY, buf);
+ 
+                         break;
+                     }
+                     else
+                         buf = ses.writeBuffer();
+                 }
+             }
+             finally {
+                 sslFilter.unlock(ses);
+             }
+         }
+ 
+         /**
+          * @param ses NIO session.
+          * @param sockCh Socket channel.
+          * @throws IOException If failed.
+          */
+         private void writeSslSystem(GridSelectorNioSessionImpl ses, WritableByteChannel sockCh)
+             throws IOException {
+             ConcurrentLinkedDeque8<ByteBuffer> queue = ses.meta(BUF_SSL_SYSTEM_META_KEY);
+ 
+             ByteBuffer buf;
+ 
+             while ((buf = queue.peek()) != null) {
+                 int cnt = sockCh.write(buf);
+ 
+                 if (metricsLsnr != null)
+                     metricsLsnr.onBytesSent(cnt);
+ 
+                 ses.bytesSent(cnt);
+ 
+                 if (!buf.hasRemaining())
+                     queue.remove(buf);
+                 else
+                     break;
+             }
+         }
+ 
+         /**
+          * Processes write-ready event on the key.
+          *
+          * @param key Key that is ready to be written.
+          * @throws IOException If write failed.
+          */
+         @SuppressWarnings("ForLoopReplaceableByForEach")
+         private void processWrite0(SelectionKey key) throws IOException {
+             WritableByteChannel sockCh = (WritableByteChannel)key.channel();
+ 
+             GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+             ByteBuffer buf = ses.writeBuffer();
+             NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal());
 -            UUID nodeId = ses.meta(DIFF_VER_NODE_ID_META_KEY);
+ 
+             List<NioOperationFuture<?>> doneFuts = null;
+ 
+             while (true) {
+                 if (req == null) {
+                     req = (NioOperationFuture<?>)ses.pollFuture();
+ 
+                     if (req == null && buf.position() == 0) {
+                         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+ 
+                         break;
+                     }
+                 }
+ 
+                 GridTcpCommunicationMessageAdapter msg;
+                 boolean finished = false;
+ 
+                 if (req != null) {
+                     msg = req.directMessage();
+ 
+                     assert msg != null;
 -                    assert msgWriter != null;
+ 
 -                    finished = msgWriter.write(nodeId, msg, buf);
++                    finished = msg.writeTo(buf);
+                 }
+ 
+                 // Fill up as many messages as possible to write buffer.
+                 while (finished) {
+                     if (doneFuts == null)
+                         doneFuts = new ArrayList<>();
+ 
+                     doneFuts.add(req);
+ 
+                     req = (NioOperationFuture<?>)ses.pollFuture();
+ 
+                     if (req == null)
+                         break;
+ 
+                     msg = req.directMessage();
+ 
+                     assert msg != null;
 -                    assert msgWriter != null;
+ 
 -                    finished = msgWriter.write(nodeId, msg, buf);
++                    finished = msg.writeTo(buf);
+                 }
+ 
+                 buf.flip();
+ 
+                 assert buf.hasRemaining();
+ 
+                 if (!skipWrite) {
+                     int cnt = sockCh.write(buf);
+ 
+                     if (!F.isEmpty(doneFuts)) {
+                         for (int i = 0; i < doneFuts.size(); i++)
+                             doneFuts.get(i).onDone();
+ 
+                         doneFuts.clear();
+                     }
+ 
+                     if (log.isTraceEnabled())
+                         log.trace("Bytes sent [sockCh=" + sockCh + ", cnt=" + cnt + ']');
+ 
+                     if (metricsLsnr != null)
+                         metricsLsnr.onBytesSent(cnt);
+ 
+                     ses.bytesSent(cnt);
+                 }
+                 else {
+                     // For test purposes only (skipWrite is set to true in tests only).
+                     try {
+                         U.sleep(50);
+                     }
+                     catch (IgniteInterruptedException e) {
+                         throw new IOException("Thread has been interrupted.", e);
+                     }
+                 }
+ 
+                 if (buf.hasRemaining()) {
+                     buf.compact();
+ 
+                     ses.addMeta(NIO_OPERATION.ordinal(), req);
+ 
+                     break;
+                 }
+                 else
+                     buf.clear();
+             }
+         }
+     }
+ 
+     /**
+      * Thread performing only read operations from the channel.
+      */
+     private abstract class AbstractNioClientWorker extends GridWorker {
+         /** Queue of change requests on this selector. */
+         private final Queue<NioOperationFuture> changeReqs = new ConcurrentLinkedDeque8<>();
+ 
+         /** Selector to select read events. */
+         private Selector selector;
+ 
+         /** Worker index. */
+         private final int idx;
+ 
+         /**
+          * @param idx Index of this worker in server's array.
+          * @param gridName Grid name.
+          * @param name Worker name.
+          * @param log Logger.
+          * @throws IgniteCheckedException If selector could not be created.
+          */
+         protected AbstractNioClientWorker(int idx, @Nullable String gridName, String name, IgniteLogger log)
+             throws IgniteCheckedException {
+             super(gridName, name, log);
+ 
+             selector = createSelector(null);
+ 
+             this.idx = idx;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override protected void body() throws InterruptedException, IgniteInterruptedException {
+             try {
+                 boolean reset = false;
+                 while (!closed) {
+                     try {
+                         if (reset)
+                             selector = createSelector(null);
+ 
+                         bodyInternal();
+                     }
+                     catch (IgniteCheckedException e) {
+                         if (!Thread.currentThread().isInterrupted()) {
+                             U.error(log, "Failed to read data from remote connection (will wait for " +
+                                 ERR_WAIT_TIME + "ms).", e);
+ 
+                             U.sleep(ERR_WAIT_TIME);
+ 
+                             reset = true;
+                         }
+                     }
+                 }
+             }
+             catch (Throwable e) {
+                 U.error(log, "Caught unhandled exception in NIO worker thread (restart the node).", e);
+             }
+         }
+ 
+         /**
+          * Adds socket channel to the registration queue and wakes up reading thread.
+          *
+          * @param req Change request.
+          */
+         private void offer(NioOperationFuture req) {
+             changeReqs.offer(req);
+ 
+             selector.wakeup();
+         }
+ 
+         /**
+          * Processes read and write events and registration requests.
+          *
+          * @throws IgniteCheckedException If IOException occurred or thread was unable to add worker to workers pool.
+          */
+         @SuppressWarnings("unchecked")
+         private void bodyInternal() throws IgniteCheckedException {
+             try {
+                 while (!closed && selector.isOpen()) {
+                     NioOperationFuture req;
+ 
+                     while ((req = changeReqs.poll()) != null) {
+                         switch (req.operation()) {
+                             case REGISTER: {
+                                 register(req);
+ 
+                                 break;
+                             }
+ 
+                             case REQUIRE_WRITE: {
+                                 //Just register write key.
+                                 SelectionKey key = req.session().key();
+ 
+                                 if (key.isValid()) {
+                                     key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ 
+                                     // Update timestamp to protected against false write timeout.
+                                     ((GridNioSessionImpl)key.attachment()).bytesSent(0);
+                                 }
+ 
+                                 break;
+                             }
+ 
+                             case CLOSE: {
+                                 if (close(req.session(), null))
+                                     req.onDone(true);
+                                 else
+                                     req.onDone(false);
+ 
+                                 break;
+                             }
+ 
+                             case PAUSE_READ: {
+                                 SelectionKey key = req.session().key();
+ 
+                                 if (key.isValid()) {
+                                     key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
+ 
+                                     GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+ 
+                                     ses.readsPaused(true);
+ 
+                                     req.onDone(true);
+                                 }
+                                 else
+                                     req.onDone(false);
+ 
+                                 break;
+                             }
+ 
+                             case RESUME_READ: {
+                                 SelectionKey key = req.session().key();
+ 
+                                 if (key.isValid()) {
+                                     key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+ 
+                                     GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+ 
+                                     ses.readsPaused(false);
+ 
+                                     req.onDone(true);
+                                 }
+                                 else
+                                     req.onDone(false);
+ 
+                                 break;
+                             }
+                         }
+                     }
+ 
+                     // Wake up every 2 seconds to check if closed.
+                     if (selector.select(2000) > 0)
+                         // Walk through the ready keys collection and process network events.
+                         processSelectedKeys(selector.selectedKeys());
+ 
+                     checkIdle(selector.keys());
+                 }
+             }
+             // Ignore this exception as thread interruption is equal to 'close' call.
+             catch (ClosedByInterruptException e) {
+                 if (log.isDebugEnabled())
+                     log.debug("Closing selector due to thread interruption: " + e.getMessage());
+             }
+             catch (ClosedSelectorException e) {
+                 throw new IgniteCheckedException("Selector got closed while active.", e);
+             }
+             catch (IOException e) {
+                 throw new IgniteCheckedException("Failed to select events on selector.", e);
+             }
+             finally {
+                 if (selector.isOpen()) {
+                     if (log.isDebugEnabled())
+                         log.debug("Closing all connected client sockets.");
+ 
+                     // Close all channels registered with selector.
+                     for (SelectionKey key : selector.keys())
+                         close((GridSelectorNioSessionImpl)key.attachment(), null);
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Closing NIO selector.");
+ 
+                     U.close(selector, log);
+                 }
+             }
+         }
+ 
+         /**
+          * Processes keys selected by a selector.
+          *
+          * @param keys Selected keys.
+          * @throws ClosedByInterruptException If this thread was interrupted while reading data.
+          */
+         private void processSelectedKeys(Set<SelectionKey> keys) throws ClosedByInterruptException {
+             if (log.isTraceEnabled())
+                 log.trace("Processing keys in client worker: " + keys.size());
+ 
+             for (Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext(); ) {
+                 SelectionKey key = iter.next();
+ 
+                 iter.remove();
+ 
+                 // Was key closed?
+                 if (!key.isValid())
+                     continue;
+ 
+                 GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+ 
+                 assert ses != null;
+ 
+                 try {
+                     if (key.isReadable())
+                         processRead(key);
+ 
+                     if (key.isValid() && key.isWritable())
+                         processWrite(key);
+                 }
+                 catch (ClosedByInterruptException e) {
+                     // This exception will be handled in bodyInternal() method.
+                     throw e;
+                 }
+                 catch (IOException e) {
+                     if (!closed)
+                         U.warn(log, "Failed to process selector key (will close): " + ses, e);
+ 
+                     close(ses, new GridNioException(e));
+                 }
+             }
+         }
+ 
+         /**
+          * Checks sessions assigned to a selector for timeouts.
+          *
+          * @param keys Keys registered to selector.
+          */
+         private void checkIdle(Iterable<SelectionKey> keys) {
+             long now = U.currentTimeMillis();
+ 
+             for (SelectionKey key : keys) {
+                 GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+ 
+                 try {
+                     long writeTimeout0 = writeTimeout;
+ 
+                     // If we are writing and timeout passed.
+                     if (key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0 &&
+                         now - ses.lastSendTime() > writeTimeout0) {
+                         filterChain.onSessionWriteTimeout(ses);
+ 
+                         // Update timestamp to avoid multiple notifications within one timeout interval.
+                         ses.bytesSent(0);
+ 
+                         continue;
+                     }
+ 
+                     long idleTimeout0 = idleTimeout;
+ 
+                     if (now - ses.lastReceiveTime() > idleTimeout0 && now - ses.lastSendScheduleTime() > idleTimeout0) {
+                         filterChain.onSessionIdleTimeout(ses);
+ 
+                         // Update timestamp to avoid multiple notifications within one timeout interval.
+                         ses.resetSendScheduleTime();
+                         ses.bytesReceived(0);
+                     }
+                 }
+                 catch (IgniteCheckedException e) {
+                     close(ses,  e);
+                 }
+             }
+         }
+ 
+         /**
+          * Registers given socket channel to the selector, creates a session and notifies the listener.
+          *
+          * @param req Registration request.
+          */
+         private void register(NioOperationFuture<GridNioSession> req) {
+             assert req != null;
+ 
+             SocketChannel sockCh = req.socketChannel();
+ 
+             assert sockCh != null;
+ 
+             Socket sock = sockCh.socket();
+ 
+             try {
+                 ByteBuffer writeBuf = null;
+                 ByteBuffer readBuf = null;
+ 
+                 if (directMode) {
+                     writeBuf = directBuf ? ByteBuffer.allocateDirect(sock.getSendBufferSize()) :
+                         ByteBuffer.allocate(sock.getSendBufferSize());
+                     readBuf = directBuf ? ByteBuffer.allocateDirect(sock.getReceiveBufferSize()) :
+                         ByteBuffer.allocate(sock.getReceiveBufferSize());
+ 
+                     writeBuf.order(order);
+                     readBuf.order(order);
+                 }
+ 
+                 final GridSelectorNioSessionImpl ses = new GridSelectorNioSessionImpl(
+                     log,
+                     idx,
+                     filterChain,
+                     (InetSocketAddress)sockCh.getLocalAddress(),
+                     (InetSocketAddress)sockCh.getRemoteAddress(),
+                     req.accepted(),
+                     sndQueueLimit,
+                     writeBuf,
+                     readBuf);
+ 
+                 Map<Integer, ?> meta = req.meta();
+ 
+                 if (meta != null) {
+                     for (Entry<Integer, ?> e : meta.entrySet())
+                         ses.addMeta(e.getKey(), e.getValue());
+                 }
+ 
+                 SelectionKey key = sockCh.register(selector, SelectionKey.OP_READ, ses);
+ 
+                 ses.key(key);
+ 
+                 if (!ses.accepted())
+                     resend(ses);
+ 
+                 sessions.add(ses);
+ 
+                 try {
+                     filterChain.onSessionOpened(ses);
+ 
+                     req.onDone(ses);
+                 }
+                 catch (IgniteCheckedException e) {
+                     close(ses, e);
+ 
+                     req.onDone(e);
+                 }
+             }
+             catch (ClosedChannelException e) {
+                 U.warn(log, "Failed to register accepted socket channel to selector (channel was closed): "
+                     + sock.getRemoteSocketAddress(), e);
+             }
+             catch (IOException e) {
+                 U.error(log, "Failed to get socket addresses.", e);
+             }
+         }
+ 
+         /**
+          * Closes the ses and all associated resources, then notifies the listener.
+          *
+          * @param ses Session to be closed.
+          * @param e Exception to be passed to the listener, if any.
+          * @return {@code True} if this call closed the ses.
+          */
+         protected boolean close(final GridSelectorNioSessionImpl ses, @Nullable final IgniteCheckedException e) {
+             if (e != null) {
+                 // Print stack trace only if has runtime exception in it's cause.
+                 if (e.hasCause(IOException.class))
+                     U.warn(log, "Closing NIO session because of unhandled exception [cls=" + e.getClass() +
+                         ", msg=" + e.getMessage() + ']');
+                 else
+                     U.error(log, "Closing NIO session because of unhandled exception.", e);
+             }
+ 
+             sessions.remove(ses);
+ 
+             SelectionKey key = ses.key();
+ 
+             // Shutdown input and output so that remote client will see correct socket close.
+             Socket sock = ((SocketChannel)key.channel()).socket();
+ 
+             if (ses.setClosed()) {
+                 if (directBuf) {
+                     if (ses.writeBuffer() != null)
+                         ((DirectBuffer)ses.writeBuffer()).cleaner().clean();
+ 
+                     if (ses.readBuffer() != null)
+                         ((DirectBuffer)ses.readBuffer()).cleaner().clean();
+                 }
+ 
+                 try {
+                     try {
+                         sock.shutdownInput();
+                     }
+                     catch (IOException ignored) {
+                         // No-op.
+                     }
+ 
+                     try {
+                         sock.shutdownOutput();
+                     }
+                     catch (IOException ignored) {
+                         // No-op.
+                     }
+                 }
+                 finally {
+                     U.close(key, log);
+                     U.close(sock, log);
+                 }
+ 
+                 if (e != null)
+                     filterChain.onExceptionCaught(ses, e);
+ 
+                 try {
+                     filterChain.onSessionClosed(ses);
+                 }
+                 catch (IgniteCheckedException e1) {
+                     filterChain.onExceptionCaught(ses, e1);
+                 }
+ 
+                 ses.removeMeta(BUF_META_KEY);
+ 
+                 // Since ses is in closed state, no write requests will be added.
+                 NioOperationFuture<?> fut = ses.removeMeta(NIO_OPERATION.ordinal());
+ 
+                 GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor();
+ 
+                 if (recovery != null) {
+                     try {
+                         // Poll will update recovery data.
+                         while ((fut = (NioOperationFuture<?>)ses.pollFuture()) != null) {
+                             if (fut.skipRecovery())
+                                 fut.connectionClosed();
+                         }
+                     }
+                     finally {
+                         recovery.release();
+                     }
+                 }
+                 else {
+                     if (fut != null)
+                         fut.connectionClosed();
+ 
+                     while ((fut = (NioOperationFuture<?>)ses.pollFuture()) != null)
+                         fut.connectionClosed();
+                 }
+ 
+                 return true;
+             }
+ 
+             return false;
+         }
+ 
+         /**
+          * Processes read-available event on the key.
+          *
+          * @param key Key that is ready to be read.
+          * @throws IOException If key read failed.
+          */
+         protected abstract void processRead(SelectionKey key) throws IOException;
+ 
+         /**
+          * Processes write-ready event on the key.
+          *
+          * @param key Key that is ready to be written.
+          * @throws IOException If write failed.
+          */
+         protected abstract void processWrite(SelectionKey key) throws IOException;
+     }
+ 
+     /**
+      * Gets outbound messages queue size.
+      *
+      * @return Write queue size.
+      */
+     public int outboundMessagesQueueSize() {
+         int res = 0;
+ 
+         for (GridSelectorNioSessionImpl ses : sessions)
+             res += ses.writeQueueSize();
+ 
+         return res;
+     }
+ 
+     /**
+      * A separate thread that will accept incoming connections and schedule read to some worker.
+      */
+     private class GridNioAcceptWorker extends GridWorker {
+         /** Selector for this thread. */
+         private Selector selector;
+ 
+         /**
+          * @param gridName Grid name.
+          * @param name Thread name.
+          * @param log Log.
+          * @param selector Which will accept incoming connections.
+          */
+         protected GridNioAcceptWorker(@Nullable String gridName, String name, IgniteLogger log, Selector selector) {
+             super(gridName, name, log);
+ 
+             this.selector = selector;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override protected void body() throws InterruptedException, IgniteInterruptedException {
+             try {
+                 boolean reset = false;
+ 
+                 while (!closed && !Thread.currentThread().isInterrupted()) {
+                     try {
+                         if (reset)
+                             selector = createSelector(locAddr);
+ 
+                         accept();
+                     }
+                     catch (IgniteCheckedException e) {
+                         if (!Thread.currentThread().isInterrupted()) {
+                             U.error(log, "Failed to accept remote connection (will wait for " + ERR_WAIT_TIME + "ms).",
+                                 e);
+ 
+                             U.sleep(ERR_WAIT_TIME);
+ 
+                             reset = true;
+                         }
+                     }
+                 }
+             }
+             finally {
+                 closeSelector(); // Safety.
+             }
+         }
+ 
+         /**
+          * Accepts connections and schedules them for processing by one of read workers.
+          *
+          * @throws IgniteCheckedException If failed.
+          */
+         private void accept() throws IgniteCheckedException {
+             try {
+                 while (!closed && selector.isOpen() && !Thread.currentThread().isInterrupted()) {
+                     // Wake up every 2 seconds to check if closed.
+                     if (selector.select(2000) > 0)
+                         // Walk through the ready keys collection and process date requests.
+                         processSelectedKeys(selector.selectedKeys());
+                 }
+             }
+             // Ignore this exception as thread interruption is equal to 'close' call.
+             catch (ClosedByInterruptException e) {
+                 if (log.isDebugEnabled())
+                     log.debug("Closing selector due to thread interruption [srvr=" + this +
+                         ", err=" + e.getMessage() + ']');
+             }
+             catch (ClosedSelectorException e) {
+                 throw new IgniteCheckedException("Selector got closed while active: " + this, e);
+             }
+             catch (IOException e) {
+                 throw new IgniteCheckedException("Failed to accept connection: " + this, e);
+             }
+             finally {
+                 closeSelector();
+             }
+         }
+ 
+         /**
+          * Close selector if needed.
+          */
+         private void closeSelector() {
+             if (selector.isOpen()) {
+                 if (log.isDebugEnabled())
+                     log.debug("Closing all listening sockets.");
+ 
+                 // Close all channels registered with selector.
+                 for (SelectionKey key : selector.keys())
+                     U.close(key.channel(), log);
+ 
+                 if (log.isDebugEnabled())
+                     log.debug("Closing NIO selector.");
+ 
+                 U.close(selector, log);
+             }
+         }
+ 
+         /**
+          * Processes selected accept requests for server socket.
+          *
+          * @param keys Selected keys from acceptor.
+          * @throws IOException If accept failed or IOException occurred while configuring channel.
+          */
+         private void processSelectedKeys(Set<SelectionKey> keys) throws IOException {
+             if (log.isDebugEnabled())
+                 log.debug("Processing keys in accept worker: " + keys.size());
+ 
+             for (Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext();) {
+                 SelectionKey key = iter.next();
+ 
+                 iter.remove();
+ 
+                 // Was key closed?
+                 if (!key.isValid())
+                     continue;
+ 
+                 if (key.isAcceptable()) {
+                     // The key indexes into the selector so we
+                     // can retrieve the socket that's ready for I/O
+                     ServerSocketChannel srvrCh = (ServerSocketChannel)key.channel();
+ 
+                     SocketChannel sockCh = srvrCh.accept();
+ 
+                     sockCh.configureBlocking(false);
+                     sockCh.socket().setTcpNoDelay(tcpNoDelay);
+                     sockCh.socket().setKeepAlive(true);
+ 
+                     if (sockSndBuf > 0)
+                         sockCh.socket().setSendBufferSize(sockSndBuf);
+ 
+                     if (sockRcvBuf > 0)
+                         sockCh.socket().setReceiveBufferSize(sockRcvBuf);
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Accepted new client connection: " + sockCh.socket().getRemoteSocketAddress());
+ 
+                     addRegistrationReq(sockCh);
+                 }
+             }
+         }
+ 
+         /**
+          * Adds registration request for a given socket channel to the next selector. Next selector
+          * is selected according to a round-robin algorithm.
+          *
+          * @param sockCh Socket channel to be registered on one of the selectors.
+          */
+         private void addRegistrationReq(SocketChannel sockCh) {
+             offerBalanced(new NioOperationFuture(sockCh));
+         }
+     }
+ 
+     /**
+      * Asynchronous operation that may be requested on selector.
+      */
+     private enum NioOperation {
+         /** Register read key selection. */
+         REGISTER,
+ 
+         /** Register write key selection. */
+         REQUIRE_WRITE,
+ 
+         /** Close key. */
+         CLOSE,
+ 
+         /** Pause read. */
+         PAUSE_READ,
+ 
+         /** Resume read. */
+         RESUME_READ
+     }
+ 
+     /**
+      * Class for requesting write and session close operations.
+      */
+     private static class NioOperationFuture<R> extends GridNioFutureImpl<R> {
+         /** */
+         private static final long serialVersionUID = 0L;
+ 
+         /** Socket channel in register request. */
+         @GridToStringExclude
+         private SocketChannel sockCh;
+ 
+         /** Session to perform operation on. */
+         private GridSelectorNioSessionImpl ses;
+ 
+         /** Is it a close request or a write request. */
+         private NioOperation op;
+ 
+         /** Message. */
+         private ByteBuffer msg;
+ 
+         /** Direct message. */
+         private GridTcpCommunicationMessageAdapter commMsg;
+ 
+         /** */
+         private boolean accepted;
+ 
+         /** */
+         private Map<Integer, ?> meta;
+ 
+         /**
+          * Creates registration request for a given socket channel.
+          *
+          * @param sockCh Socket channel to register on selector.
+          */
+         NioOperationFuture(SocketChannel sockCh) {
+             this(sockCh, true, null);
+         }
+ 
+         /**
+          * @param sockCh Socket channel.
+          * @param accepted {@code True} if socket has been accepted.
+          * @param meta Optional meta.
+          */
+         NioOperationFuture(
+             SocketChannel sockCh,
+             boolean accepted,
+             @Nullable Map<Integer, ?> meta
+         ) {
+             op = NioOperation.REGISTER;
+ 
+             this.sockCh = sockCh;
+             this.accepted = accepted;
+             this.meta = meta;
+         }
+ 
+         /**
+          * Creates change request.
+          *
+          * @param ses Session to change.
+          * @param op Requested operation.
+          */
+         NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op) {
+             assert ses != null;
+             assert op != null;
+             assert op != NioOperation.REGISTER;
+ 
+             this.ses = ses;
+             this.op = op;
+         }
+ 
+         /**
+          * Creates change request.
+          *
+          * @param ses Session to change.
+          * @param op Requested operation.
+          * @param msg Message.
+          */
+         NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op,
+             ByteBuffer msg) {
+             assert ses != null;
+             assert op != null;
+             assert op != NioOperation.REGISTER;
+             assert msg != null;
+ 
+             this.ses = ses;
+             this.op = op;
+             this.msg = msg;
+         }
+ 
+         /**
+          * Creates change request.
+          *
+          * @param ses Session to change.
+          * @param op Requested operation.
+          * @param commMsg Direct message.
+          */
+         NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op,
+             GridTcpCommunicationMessageAdapter commMsg) {
+             assert ses != null;
+             assert op != null;
+             assert op != NioOperation.REGISTER;
+             assert commMsg != null;
+ 
+             this.ses = ses;
+             this.op = op;
+             this.commMsg = commMsg;
+         }
+ 
+         /**
+          * @return Requested change operation.
+          */
+         private NioOperation operation() {
+             return op;
+         }
+ 
+         /**
+          * @return Message.
+          */
+         private ByteBuffer message() {
+             return msg;
+         }
+ 
+         /**
+          * @return Direct message.
+          */
+         private GridTcpCommunicationMessageAdapter directMessage() {
+             return commMsg;
+         }
+ 
+         /**
+          * @param ses New session instance.
+          */
+         private void resetMessage(GridSelectorNioSessionImpl ses) {
+             assert commMsg != null;
+ 
+             commMsg = commMsg.clone();
+ 
+             this.ses = ses;
+         }
+ 
+         /**
+          * @return Socket channel for register request.
+          */
+         private SocketChannel socketChannel() {
+             return sockCh;
+         }
+ 
+         /**
+          * @return Session for this change request.
+          */
+         private GridSelectorNioSessionImpl session() {
+             return ses;
+         }
+ 
+         /**
+          * @return {@code True} if connection has been accepted.
+          */
+         boolean accepted() {
+             return accepted;
+         }
+ 
+         /**
+          * @return Meta.
+          */
+         public Map<Integer, ?> meta() {
+             return meta;
+         }
+ 
+         /**
+          * Applicable to write futures only. Fails future with corresponding IOException.
+          */
+         private void connectionClosed() {
+             assert op == NioOperation.REQUIRE_WRITE;
+             assert ses != null;
+ 
+             onDone(new IOException("Failed to send message (connection was closed): " + ses));
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public boolean skipRecovery() {
+             return commMsg != null && commMsg.skipRecovery();
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public String toString() {
+             return S.toString(NioOperationFuture.class, this);
+         }
+     }
+ 
+     /**
+      * Filter forwarding messages from chain's head to this server.
+      */
+     private class HeadFilter extends GridNioFilterAdapter {
+         /**
+          * Assigns filter name.
+          */
+         protected HeadFilter() {
+             super("HeadFilter");
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void onSessionOpened(GridNioSession ses) throws IgniteCheckedException {
+             if (directMode && sslFilter != null)
+                 ses.addMeta(BUF_SSL_SYSTEM_META_KEY, new ConcurrentLinkedDeque8<>());
+ 
+             proceedSessionOpened(ses);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void onSessionClosed(GridNioSession ses) throws IgniteCheckedException {
+             proceedSessionClosed(ses);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void onExceptionCaught(GridNioSession ses, IgniteCheckedException ex) throws IgniteCheckedException {
+             proceedExceptionCaught(ses, ex);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) {
+             if (directMode) {
+                 boolean sslSys = sslFilter != null && msg instanceof ByteBuffer;
+ 
+                 if (sslSys) {
+                     ConcurrentLinkedDeque8<ByteBuffer> queue = ses.meta(BUF_SSL_SYSTEM_META_KEY);
+ 
+                     queue.offer((ByteBuffer)msg);
+ 
+                     SelectionKey key = ((GridSelectorNioSessionImpl)ses).key();
+ 
+                     if (key.isValid())
+                         key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ 
+                     return null;
+                 }
+                 else
+                     return send(ses, (GridTcpCommunicationMessageAdapter)msg);
+             }
+             else
+                 return send(ses, (ByteBuffer)msg);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException {
+             proceedMessageReceived(ses, msg);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public GridNioFuture<Boolean> onSessionClose(GridNioSession ses) {
+             return close(ses);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void onSessionIdleTimeout(GridNioSession ses) throws IgniteCheckedException {
+             proceedSessionIdleTimeout(ses);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void onSessionWriteTimeout(GridNioSession ses) throws IgniteCheckedException {
+             proceedSessionWriteTimeout(ses);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public GridNioFuture<?> onPauseReads(GridNioSession ses) throws IgniteCheckedException {
+             return pauseResumeReads(ses, NioOperation.PAUSE_READ);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public GridNioFuture<?> onResumeReads(GridNioSession ses) throws IgniteCheckedException {
+             return pauseResumeReads(ses, NioOperation.RESUME_READ);
+         }
+     }
+ 
+     /**
+      * Constructs a new instance of {@link GridNioServer}.
+      */
+     @SuppressWarnings("PublicInnerClass")
+     public static class Builder<T> {
+         /** Empty filters. */
+         private static final GridNioFilter[] EMPTY_FILTERS = new GridNioFilter[0];
+ 
+         /** Local address. */
+         private InetAddress addr;
+ 
+         /** Local port. */
+         private int port;
+ 
+         /** Logger. */
+         private IgniteLogger log;
+ 
+         /** Selector count. */
+         private int selectorCnt;
+ 
+         /** Grid name. */
+         private String gridName;
+ 
+         /** TCP_NO_DELAY flag. */
+         private boolean tcpNoDelay;
+ 
+         /** Direct buffer flag. */
+         private boolean directBuf;
+ 
+         /** Byte order. */
+         private ByteOrder byteOrder = ByteOrder.nativeOrder();
+ 
+         /** NIO server listener. */
+         private GridNioServerListener<T> lsnr;
+ 
+         /** Send buffer size. */
+         private int sockSndBufSize;
+ 
+         /** Receive buffer size. */
+         private int sockRcvBufSize;
+ 
+         /** Send queue limit. */
+         private int sndQueueLimit = DFLT_SEND_QUEUE_LIMIT;
+ 
+         /** Whether direct mode is used. */
+         private boolean directMode;
+ 
+         /** Metrics listener. */
+         private GridNioMetricsListener metricsLsnr;
+ 
 -        /** Message writer. */
 -        private GridNioMessageWriter msgWriter;
 -
+         /** NIO filters. */
+         private GridNioFilter[] filters;
+ 
+         /** Idle timeout. */
+         private long idleTimeout = -1;
+ 
+         /** Write timeout. */
+         private long writeTimeout = -1;
+ 
+         /** Daemon flag. */
+         private boolean daemon;
+ 
+         /**
+          * Finishes building the instance.
+          *
+          * @return Final instance of {@link GridNioServer}.
+          * @throws IgniteCheckedException If NIO client worker creation failed or address is already in use.
+          */
+         public GridNioServer<T> build() throws IgniteCheckedException {
+             GridNioServer<T> ret = new GridNioServer<>(
+                 addr,
+                 port,
+                 log,
+                 selectorCnt,
+                 gridName,
+                 tcpNoDelay,
+                 directBuf,
+                 byteOrder,
+                 lsnr,
+                 sockSndBufSize,
+                 sockRcvBufSize,
+                 sndQueueLimit,
+                 directMode,
+                 daemon,
+                 metricsLsnr,
 -                msgWriter,
+                 filters != null ? Arrays.copyOf(filters, filters.length) : EMPTY_FILTERS
+             );
+ 
+             if (idleTimeout >= 0)
+                 ret.idleTimeout(idleTimeout);
+ 
+             if (writeTimeout >= 0)
+                 ret.writeTimeout(writeTimeout);
+ 
+             return ret;
+         }
+ 
+         /**
+          * @param addr Local address.
+          * @return This for chaining.
+          */
+         public Builder<T> address(InetAddress addr) {
+             this.addr = addr;
+ 
+             return this;
+         }
+ 
+         /**
+          * @param port Local port. If {@code -1} passed then server will not be
+          *      accepting connections and only outgoing connections will be possible.
+          * @return This for chaining.
+          */
+         public Builder<T> port(int port) {
+             this.port = port;
+ 
+             return this;
+         }
+ 
+         /**
+          * @param log Logger.
+          * @return This for chaining.
+          */
+         public Builder<T> logger(IgniteLogger log) {
+             this.log = log;
+ 
+             return this;
+         }
+ 
+         /**
+          * @param selectorCnt Selector count.
+          * @return This for chaining.
+          */
+         public Builder<T> selectorCount(int selectorCnt) {
+             this.selectorCnt = selectorCnt;
+ 
+             return this;
+         }
+ 
+         /**
+          * @param gridName Grid name.
+          * @return This for chaining.
+          */
+         public Builder<T> gridName(@Nullable String gridName) {
+             this.gridName = gridName;
+ 
+             return this;
+         }
+ 
+         /**
+          * @param tcpNoDelay If TCP_NODELAY option should be set to accepted sockets.
+          * @return This for chaining.
+          */
+         public Builder<T> tcpNoDelay(boolean tcpNoDelay) {
+             this.tcpNoDelay = tcpNoDelay;
+ 
+             return this;
+         }
+ 
+         /**
+          * @param directBuf Whether to use direct buffer.
+          * @return This for chaining.
+          */
+         public Builder<T> directBuffer(boolean directBuf) {
+             this.directBuf = directBuf;
+ 
+             return this;
+         }
+ 
+         /**
+          * @param byteOrder Byte order to use.
+          * @return This for chaining.
+          */
+         public Builder<T> byteOrder(ByteOrder byteOrder) {
+             this.byteOrder = byteOrder;
+ 
+             return this;
+         }
+ 
+         /**
+          * @param lsnr NIO server listener.
+          * @return This for chaining.
+          */
+         public Builder<T> listener(GridNioServerListener<T> lsnr) {
+             this.lsnr = lsnr;
+ 
+             return this;
+         }
+ 
+         /**
+          * @param sockSndBufSize Socket send buffer size.
+          * @return This for chaining.
+          */
+         public Builder<T> socketSendBufferSize(int sockSndBufSize) {
+             this.sockSndBufSize = sockSndBufSize;
+ 
+             return this;
+         }
+ 
+         /**
+          * @param sockRcvBufSize Socket receive buffer size.
+          * @return This for chaining.
+          */
+         public Builder<T> socketReceiveBufferSize(int sockRcvBufSize) {
+             this.sockRcvBufSize = sockRcvBufSize;
+ 
+             return this;
+         }
+ 
+         /**
+          * @param sndQueueLimit Send queue limit.
+          * @return This for chaining.
+          */
+         public Builder<T> sendQueueLimit(int sndQueueLimit) {
+             this.sndQueueLimit = sndQueueLimit;
+ 
+             return this;
+         }
+ 
+         /**
+          * @param directMode Whether direct mode is used.
+          * @return This for chaining.
+          */
+         public Builder<T> directMode(boolean directMode) {
+             this.directMode = directMode;
+ 
+             return this;
+         }
+ 
+         /**
+          * @param metricsLsnr Metrics listener.
+          * @return This for chaining.
+          */
+         public Builder<T> metricsListener(GridNioMetricsListener metricsLsnr) {
+             this.metricsLsnr = metricsLsnr;
+ 
+             return this;
+         }
+ 
+         /**
 -         * @param msgWriter Message writer.
 -         * @return This for chaining.
 -         */
 -        public Builder<T> messageWriter(GridNioMessageWriter msgWriter) {
 -            this.msgWriter = msgWriter;
 -
 -            return this;
 -        }
 -
 -        /**
+          * @param filters NIO filters.
+          * @return This for chaining.
+          */
+         public Builder<T> filters(GridNioFilter... filters) {
+             this.filters = filters;
+ 
+             return this;
+         }
+ 
+         /**
+          * @param idleTimeout Idle timeout.
+          * @return This for chaining.
+          */
+         public Builder<T> idleTimeout(long idleTimeout) {
+             this.idleTimeout = idleTimeout;
+ 
+             return this;
+         }
+ 
+         /**
+          * @param writeTimeout Write timeout.
+          * @return This for chaining.
+          */
+         public Builder<T> writeTimeout(long writeTimeout) {
+             this.writeTimeout = writeTimeout;
+ 
+             return this;
+         }
+ 
+         /**
+          * @param daemon Daemon flag to create threads.
+          * @return This for chaining.
+          */
+         public Builder<T> daemon(boolean daemon) {
+             this.daemon = daemon;
+ 
+             return this;
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
index 0000000,5cbc506..1b07b5e
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@@ -1,0 -1,148 +1,140 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.util.nio;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.ipc.shmem.*;
+ import org.jetbrains.annotations.*;
+ import org.apache.ignite.internal.util.lang.*;
+ 
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+ 
+ /**
+  *
+  */
+ public class GridShmemCommunicationClient extends GridAbstractCommunicationClient {
+     /** */
+     private final IpcSharedMemoryClientEndpoint shmem;
+ 
+     /** */
+     private final ByteBuffer writeBuf;
+ 
 -    /** */
 -    private final GridNioMessageWriter msgWriter;
 -
+     /**
+      * @param metricsLsnr Metrics listener.
+      * @param port Shared memory IPC server port.
+      * @param connTimeout Connection timeout.
+      * @param log Logger.
 -     * @param msgWriter Message writer.
+      * @throws IgniteCheckedException If failed.
+      */
 -    public GridShmemCommunicationClient(GridNioMetricsListener metricsLsnr, int port, long connTimeout, IgniteLogger log,
 -        GridNioMessageWriter msgWriter)
 -        throws IgniteCheckedException {
++    public GridShmemCommunicationClient(GridNioMetricsListener metricsLsnr, int port, long connTimeout,
++        IgniteLogger log) throws IgniteCheckedException {
+         super(metricsLsnr);
+ 
+         assert metricsLsnr != null;
 -        assert msgWriter != null;
+         assert port > 0 && port < 0xffff;
+         assert connTimeout >= 0;
+ 
+         shmem = new IpcSharedMemoryClientEndpoint(port, (int)connTimeout, log);
+ 
 -        this.msgWriter = msgWriter;
 -
+         writeBuf = ByteBuffer.allocate(8 << 10);
+ 
+         writeBuf.order(ByteOrder.nativeOrder());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public  synchronized void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC)
+         throws IgniteCheckedException {
+         handshakeC.applyx(shmem.inputStream(), shmem.outputStream());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean close() {
+         boolean res = super.close();
+ 
+         if (res)
+             shmem.close();
+ 
+         return res;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void forceClose() {
+         super.forceClose();
+ 
+         // Do not call forceClose() here.
+         shmem.close();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public synchronized void sendMessage(byte[] data, int len) throws IgniteCheckedException {
+         if (closed())
+             throw new IgniteCheckedException("Communication client was closed: " + this);
+ 
+         try {
+             shmem.outputStream().write(data, 0, len);
+ 
+             metricsLsnr.onBytesSent(len);
+         }
+         catch (IOException e) {
+             throw new IgniteCheckedException("Failed to send message to remote node: " + shmem, e);
+         }
+ 
+         markUsed();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg)
+         throws IgniteCheckedException {
+         if (closed())
+             throw new IgniteCheckedException("Communication client was closed: " + this);
+ 
+         assert writeBuf.hasArray();
+ 
+         try {
 -            int cnt = msgWriter.writeFully(nodeId, msg, shmem.outputStream(), writeBuf);
++            int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf);
+ 
+             metricsLsnr.onBytesSent(cnt);
+         }
+         catch (IOException e) {
+             throw new IgniteCheckedException("Failed to send message to remote node: " + shmem, e);
+         }
+ 
+         markUsed();
+ 
+         return false;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void sendMessage(ByteBuffer data) throws IgniteCheckedException {
+         throw new UnsupportedOperationException();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void flushIfNeeded(long timeout) throws IOException {
+         // No-op.
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return S.toString(GridShmemCommunicationClient.class, this, super.toString());
+     }
+ }