You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/24 15:03:52 UTC

[29/50] [abbrv] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-3054

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f388e3/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index f6d19e0,50fa3bd..e743b5c
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@@ -158,14 -128,12 +158,15 @@@ import org.apache.ignite.spi.discovery.
  import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
  import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRedirectToClient;
  import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;
 +import org.apache.ignite.thread.IgniteThread;
 +import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+ import org.apache.ignite.thread.IgniteThreadPoolExecutor;
  import org.jetbrains.annotations.Nullable;
  import org.jsr166.ConcurrentHashMap8;
 +import org.jsr166.ConcurrentLinkedDeque8;
  
  import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
- import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE;
+ import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE;
  import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
  import static org.apache.ignite.IgniteSystemProperties.IGNITE_SERVICES_COMPATIBILITY_MODE;
  import static org.apache.ignite.IgniteSystemProperties.getInteger;
@@@ -205,25 -173,9 +206,24 @@@ class ServerImpl extends TcpDiscoveryIm
      private static final IgniteProductVersion CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE =
          IgniteProductVersion.fromString("1.5.0");
  
 +    /** Node ID in GridNioSession. */
 +    private static final int NODE_ID_META = GridNioSessionMetaKey.nextUniqueKey();
 +
 +    /** ClientNioMessageWorker in GridNioSession. */
 +    private static final int NIO_WORKER_META = GridNioSessionMetaKey.nextUniqueKey();
 +
 +    /**
 +     * Number of tries to reopen ServerSocketChannel on 'SocketException: Invalid argument'.
 +     * <p>This error may happen on simultaneous server nodes startup on the same JVM.</p>
 +     */
 +    private static final int REOPEN_SERVER_SOCKET_CHANNEL_TRIES = 3;
 +
      /** */
-     private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
-         new LinkedBlockingQueue<Runnable>());
+     private IgniteThreadPoolExecutor utilityPool;
  
 +    /** */
 +    private IgniteThreadPoolExecutor nioClientProcessingPool;
 +
      /** Nodes ring. */
      @GridToStringExclude
      private final TcpDiscoveryNodesRing ring = new TcpDiscoveryNodesRing();
@@@ -5356,16 -5444,19 +5677,20 @@@
  
              int lastPort = spi.locPortRange == 0 ? spi.locPort : spi.locPort + spi.locPortRange - 1;
  
 +            openServerSocketChannel();
 +
 +            int reopenTries = 0;
 +
              for (port = spi.locPort; port <= lastPort; port++) {
                  try {
 -                    if (spi.isSslEnabled())
 -                        srvrSock = spi.sslSrvSockFactory.createServerSocket(port, 0, spi.locHost);
 -                    else
 -                        srvrSock = new ServerSocket(port, 0, spi.locHost);
 +                    srvCh.bind(new InetSocketAddress(spi.locHost, port));
  
-                     if (log.isInfoEnabled())
-                         log.info("Successfully bound to TCP port [port=" + port + ", localHost=" + spi.locHost + ']');
+                     if (log.isInfoEnabled()) {
+                         log.info("Successfully bound to TCP port [port=" + port +
+                             ", localHost=" + spi.locHost +
+                             ", locNodeId=" + spi.ignite().configuration().getNodeId() +
+                             ']');
+                     }
  
                      return;
                  }
@@@ -5508,1877 -5525,606 +5833,1883 @@@
      }
  
      /**
 -     * Thread that reads messages from the socket created for incoming connections.
 +     * Encapsulates ping logic for client processors.
       */
 -    private class SocketReader extends IgniteSpiThread {
 -        /** Socket to read data from. */
 -        private final Socket sock;
 +    private class ClientMessagePinger {
 +        /** */
 +        final AtomicReference<GridFutureAdapter<Boolean>> pingFut = new AtomicReference<>();
  
          /** */
 -        private volatile UUID nodeId;
 +        private final ClientMessageProcessor proc;
  
          /**
 -         * Constructor.
 -         *
 -         * @param sock Socket to read data from.
 +         * @param proc Processor.
           */
 -        SocketReader(Socket sock) {
 -            super(spi.ignite().name(), "tcp-disco-sock-reader", log);
 +        private ClientMessagePinger(final ClientMessageProcessor proc) {
 +            this.proc = proc;
 +        }
  
 -            this.sock = sock;
 +        /**
 +         * @param timeoutHelper Time out helper.
 +         * @return {@code True} if ping sent.
 +         * @throws InterruptedException If fail.
 +         */
 +        public boolean ping(final IgniteSpiOperationTimeoutHelper timeoutHelper) throws InterruptedException {
 +            if (spi.isNodeStopping0())
 +                return false;
  
 -            setPriority(spi.threadPri);
 +            GridFutureAdapter<Boolean> fut;
  
 -            spi.stats.onSocketReaderCreated();
 -        }
 +            while (true) {
 +                fut = pingFut.get();
  
 -        /** {@inheritDoc} */
 -        @Override protected void body() throws InterruptedException {
 -            UUID locNodeId = getConfiguredNodeId();
 +                if (fut != null)
 +                    break;
  
 -            ClientMessageWorker clientMsgWrk = null;
 +                fut = new GridFutureAdapter<>();
  
 -            try {
 -                InputStream in;
 +                if (pingFut.compareAndSet(null, fut)) {
 +                    TcpDiscoveryPingRequest pingReq = new TcpDiscoveryPingRequest(getLocalNodeId(), proc.clientNodeId());
  
 -                try {
 -                    // Set socket options.
 -                    sock.setKeepAlive(true);
 -                    sock.setTcpNoDelay(true);
 +                    pingReq.verify(getLocalNodeId());
  
 -                    int timeout = sock.getSoTimeout();
 +                    proc.addMessage(pingReq);
  
 -                    sock.setSoTimeout((int)spi.netTimeout);
 +                    break;
 +                }
 +            }
  
 -                    for (IgniteInClosure<Socket> connLsnr : spi.incomeConnLsnrs)
 -                        connLsnr.apply(sock);
 +            try {
 +                return fut.get(timeoutHelper.nextTimeoutChunk(spi.getAckTimeout()),
 +                    TimeUnit.MILLISECONDS);
 +            }
 +            catch (IgniteInterruptedCheckedException ignored) {
 +                throw new InterruptedException();
 +            }
 +            catch (IgniteFutureTimeoutCheckedException ignored) {
 +                if (pingFut.compareAndSet(fut, null))
 +                    fut.onDone(false);
  
 -                    int rcvBufSize = sock.getReceiveBufferSize();
 +                return false;
 +            }
 +            catch (IgniteCheckedException e) {
 +                throw new IgniteSpiException("Internal error: ping future cannot be done with exception", e);
 +            }
 +        }
  
 -                    in = new BufferedInputStream(sock.getInputStream(), rcvBufSize > 0 ? rcvBufSize : 8192);
 +        /**
 +         * @param res Ping result.
 +         */
 +        void pingResult(final boolean res) {
 +            final GridFutureAdapter<Boolean> fut = pingFut.getAndSet(null);
  
 -                    byte[] buf = new byte[4];
 -                    int read = 0;
 +            if (fut != null)
 +                fut.onDone(res);
 +        }
 +    }
  
 -                    while (read < buf.length) {
 -                        int r = in.read(buf, read, buf.length - read);
 +    /**
 +     * NIO worker state.
 +     */
 +    private enum WorkerState {
 +        /** Initial worker state. */
 +        NOT_STARTED,
  
 -                        if (r >= 0)
 -                            read += r;
 -                        else {
 -                            if (log.isDebugEnabled())
 -                                log.debug("Failed to read magic header (too few bytes received) " +
 -                                    "[rmtAddr=" + sock.getRemoteSocketAddress() +
 -                                    ", locAddr=" + sock.getLocalSocketAddress() + ']');
 +        /** Started but all messages are added to queue. */
 +        STARTED,
  
 -                            LT.warn(log, "Failed to read magic header (too few bytes received) [rmtAddr=" +
 -                                sock.getRemoteSocketAddress() + ", locAddr=" + sock.getLocalSocketAddress() + ']');
 +        /** Started and all messages are sent to client node. */
 +        JOINED,
  
 -                            return;
 -                        }
 -                    }
 +        /** Worker was stopped. */
 +        STOPPED
 +    }
  
 -                    if (!Arrays.equals(buf, U.IGNITE_HEADER)) {
 -                        if (log.isDebugEnabled())
 -                            log.debug("Unknown connection detected (is some other software connecting to " +
 -                                "this Ignite port?" +
 -                                (!spi.isSslEnabled() ? " missed SSL configuration?" : "" ) +
 -                                ") " +
 -                                "[rmtAddr=" + sock.getRemoteSocketAddress() +
 -                                ", locAddr=" + sock.getLocalSocketAddress() + ']');
 +    /**
 +     * Non blocking client message processor.
 +     */
 +    private class ClientNioMessageWorker implements ClientMessageProcessor {
 +        /** ID of the node served by this processor. */
 +        private final UUID clientNodeId;
  
 -                        LT.warn(log, "Unknown connection detected (is some other software connecting to " +
 -                            "this Ignite port?" +
 -                            (!spi.isSslEnabled() ? " missing SSL configuration on remote node?" : "" ) +
 -                            ") [rmtAddr=" + sock.getInetAddress() + ']', true);
 +        /** Socket connected to the client. */
 +        private final Socket sock;
  
 -                        return;
 -                    }
 +        /** Current session. */
 +        private volatile GridNioSession ses;
  
 -                    // Restore timeout.
 -                    sock.setSoTimeout(timeout);
 +        /** */
 +        private volatile ClusterMetrics metrics;
  
 -                    TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, spi.netTimeout);
 +        /** */
 +        private volatile ClientMessagePinger pinger;
  
 -                    // Ping.
 -                    if (msg instanceof TcpDiscoveryPingRequest) {
 -                        if (!spi.isNodeStopping0()) {
 -                            TcpDiscoveryPingRequest req = (TcpDiscoveryPingRequest)msg;
 +        /** Client message queue. */
 +        private final Deque<T2<TcpDiscoveryAbstractMessage, byte[]>> msgQueue;
  
 -                            TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(locNodeId);
 +        /** Worker state. */
 +        private volatile WorkerState state = WorkerState.NOT_STARTED;
  
 -                            IgniteSpiOperationTimeoutHelper timeoutHelper =
 -                                new IgniteSpiOperationTimeoutHelper(spi);
 +        /** Indicates whether messages are sent to client. Only one thread can send messages to single client to keep their order. */
 +        private final AtomicBoolean sending = new AtomicBoolean(false);
  
 -                            if (req.clientNodeId() != null) {
 -                                ClientMessageWorker clientWorker = clientMsgWorkers.get(req.clientNodeId());
 +        /**
 +         * @param clientNodeId Client node ID.
 +         * @param sock Socket.
 +         */
 +        ClientNioMessageWorker(final UUID clientNodeId, final Socket sock) {
 +            this.clientNodeId = clientNodeId;
 +            this.sock = sock;
  
 -                                if (clientWorker != null)
 -                                    res.clientExists(clientWorker.ping(timeoutHelper));
 -                            }
 +            msgQueue = new ConcurrentLinkedDeque8<>();
 +        }
  
 -                            spi.writeToSocket(sock, res, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 -                        }
 -                        else if (log.isDebugEnabled())
 -                            log.debug("Ignore ping request, node is stopping.");
 +        /**
 +         * Open session and start listen for client messages.
 +         *
 +         * @throws IgniteCheckedException If fail.
 +         */
 +        @SuppressWarnings("unchecked")
 +        public synchronized void start() throws IgniteCheckedException, SSLException {
 +            if (state != WorkerState.NOT_STARTED)
 +                return;
  
 -                        return;
 -                    }
 +            final Map<Integer, Object> meta = new HashMap<>();
  
 -                    // Handshake.
 -                    TcpDiscoveryHandshakeRequest req = (TcpDiscoveryHandshakeRequest)msg;
 +            meta.put(NODE_ID_META, clientNodeId());
  
 -                    UUID nodeId = req.creatorNodeId();
 +            final SocketChannel ch = sock.getChannel();
  
 -                    this.nodeId = nodeId;
 +            if (spi.isSslEnabled()) {
 +                assert sock instanceof NioSslSocket;
  
 -                    TcpDiscoveryHandshakeResponse res =
 -                        new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder());
 +                // Put the engine to the meta map to allow nio server to use it:
 +                meta.put(GridNioSessionMetaKey.SSL_META.ordinal(), ((NioSslSocket)sock).sslEngine);
 +            }
  
 -                    if (req.client())
 -                        res.clientAck(true);
 +            ses = clientNioSrv.createSession(ch, meta).get();
  
 -                    spi.writeToSocket(sock, res, spi.failureDetectionTimeoutEnabled() ?
 -                        spi.failureDetectionTimeout() : spi.getSocketTimeout());
 +            ses.addMeta(NIO_WORKER_META, this);
  
 -                    // It can happen if a remote node is stopped and it has a loopback address in the list of addresses,
 -                    // the local node sends a handshake request message on the loopback address, so we get here.
 -                    if (locNodeId.equals(nodeId)) {
 -                        assert !req.client();
 +            state = WorkerState.STARTED;
 +        }
  
 -                        if (log.isDebugEnabled())
 -                            log.debug("Handshake request from local node: " + req);
 +        /**
 +         * Close connection to the client.
 +         *
 +         * @throws IgniteCheckedException If fail.
 +         */
 +        public void stop() throws IgniteCheckedException {
 +            nonblockingStop().get();
 +        }
  
 -                        return;
 +        /**
 +         * Close connection to the client and exit immediately - do not wait
 +         * when operation completes.
 +         *
 +         * @return Operation future.
 +         */
 +        synchronized IgniteInternalFuture nonblockingStop() {
 +            if (state == WorkerState.NOT_STARTED)
 +                state = WorkerState.STOPPED;
 +
 +            if (state == WorkerState.STOPPED)
 +                return new GridFinishedFuture<>();
 +
 +            final IgniteInternalFuture<Object> res = ses.close().chain(new C1<IgniteInternalFuture<Boolean>, Object>() {
 +                @Override public Object apply(final IgniteInternalFuture<Boolean> fut) {
 +                    try {
 +                        return fut.get();
 +                    }
 +                    catch (IgniteCheckedException e) {
 +                        throw new IgniteSpiException(e.getMessage(), e);
                      }
 +                    finally {
 +                        U.closeQuiet(sock);
 +                    }
 +                }
 +            });
  
 -                    if (req.client()) {
 -                        ClientMessageWorker clientMsgWrk0 = new ClientMessageWorker(sock, nodeId);
 +            state = WorkerState.STOPPED;
  
 -                        while (true) {
 -                            ClientMessageWorker old = clientMsgWorkers.putIfAbsent(nodeId, clientMsgWrk0);
 +            nioWorkers.remove(this);
  
 -                            if (old == null)
 -                                break;
 +            ses.removeMeta(NIO_WORKER_META);
  
 -                            if (old.isInterrupted()) {
 -                                clientMsgWorkers.remove(nodeId, old);
 +            return res;
 +        }
  
 -                                continue;
 -                            }
 +        /**
 +         * Change state to JOINED and send all pending messages.
 +         */
 +        void markJoinedAndSendPendingMessages() {
 +            if (state == WorkerState.STARTED) {
 +                state = WorkerState.JOINED;
  
 -                            old.join(500);
 +                nioWorkers.add(this);
  
 -                            old = clientMsgWorkers.putIfAbsent(nodeId, clientMsgWrk0);
 +                nioSem.release();
 +            }
 +        }
  
 -                            if (old == null)
 -                                break;
 +        /**
 +         * Add receipt to message queue.
 +         *
 +         * @param receipt Receipt.
 +         * @return Send future.
 +         */
 +        GridNioFuture<?> addReceipt(final int receipt) {
 +            try {
 +                return spi.sendMessage(ses, null, new byte[]{(byte) receipt});
 +            }
 +            catch (IgniteCheckedException e) {
 +                log.error("Failed marshal message, closing connection. [receipt=" + receipt + ", ses=" + ses + ']', e);
  
 -                            if (log.isDebugEnabled())
 -                                log.debug("Already have client message worker, closing connection " +
 -                                    "[locNodeId=" + locNodeId +
 -                                    ", rmtNodeId=" + nodeId +
 -                                    ", workerSock=" + old.sock +
 -                                    ", sock=" + sock + ']');
 +                nonblockingStop();
  
 -                            return;
 -                        }
 +                clientMsgWorkers.remove(clientNodeId, this);
  
 -                        if (log.isDebugEnabled())
 -                            log.debug("Created client message worker [locNodeId=" + locNodeId +
 -                                ", rmtNodeId=" + nodeId + ", sock=" + sock + ']');
 +                return new GridNioFinishedFuture<>(e);
 +            }
 +        }
  
 -                        assert clientMsgWrk0 == clientMsgWorkers.get(nodeId);
 +        /**
 +         * Add receipt and call closure when it will be sent.
 +         *
 +         * @param receipt Receipt.
 +         * @param clos Closure.
 +         * @return Future for chaining.
 +         */
 +        IgniteInternalFuture<?> addReceipt(final int receipt,
 +            final IgniteClosure<? super IgniteInternalFuture<?>, ?> clos) {
 +            return addReceipt(receipt).chain(clos);
 +        }
  
 -                        clientMsgWrk = clientMsgWrk0;
 -                    }
 +        /** {@inheritDoc} */
 +        @Override public void addMessage(final TcpDiscoveryAbstractMessage msg) {
 +            addMessage(msg, null);
 +        }
  
 -                    if (log.isDebugEnabled())
 -                        log.debug("Initialized connection with remote node [nodeId=" + nodeId +
 -                            ", client=" + req.client() + ']');
 +        /** {@inheritDoc} */
 +        @Override public void addMessage(final TcpDiscoveryAbstractMessage msg, @Nullable final byte[] msgBytes) {
 +            // add message to queue if client is not joined yet
 +            final WorkerState state0 = this.state;
  
 -                    if (debugMode) {
 -                        debugLog(msg, "Initialized connection with remote node [nodeId=" + nodeId +
 -                            ", client=" + req.client() + ']');
 -                    }
 -                }
 -                catch (IOException e) {
 -                    if (log.isDebugEnabled())
 -                        U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
 +            if (state0 == WorkerState.STOPPED)
 +                return;
  
 -                    if (X.hasCause(e, SSLException.class) && spi.isSslEnabled() && !spi.isNodeStopping0())
 -                        LT.warn(log, "Failed to initialize connection " +
 -                            "(missing SSL configuration on remote node?) " +
 -                            "[rmtAddr=" + sock.getInetAddress() + ']', true);
 -                    else if ((X.hasCause(e, ObjectStreamException.class) || !sock.isClosed())
 -                        && !spi.isNodeStopping0()) {
 -                        if (U.isMacInvalidArgumentError(e))
 -                            LT.error(log, e, "Failed to initialize connection [sock=" + sock + "]\n\t" +
 -                                U.MAC_INVALID_ARG_MSG);
 -                        else {
 -                            U.error(
 -                                log,
 -                                "Failed to initialize connection (this can happen due to short time " +
 -                                    "network problems and can be ignored if does not affect node discovery) " +
 -                                    "[sock=" + sock + ']',
 -                                e);
 -                        }
 -                    }
 +            if (msg.highPriority())
 +                msgQueue.addFirst(new T2<>(msg, msgBytes));
 +            else
 +                msgQueue.add(new T2<>(msg, msgBytes));
  
 -                    onException("Caught exception on handshake [err=" + e + ", sock=" + sock + ']', e);
 +            nioSem.release();
 +        }
  
 -                    return;
 -                }
 -                catch (IgniteCheckedException e) {
 -                    if (log.isDebugEnabled())
 -                        U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
 +        /**
 +         * Send message despite of worker state.
 +         *
 +         * @param msg Message to send.
 +         * @param msgBytes Message bytes to send.
 +         */
 +        public void sendMessage(final TcpDiscoveryAbstractMessage msg, @Nullable final byte[] msgBytes) {
 +            try {
 +                spi.sendMessage(ses, msg, msgBytes);
 +            }
 +            catch (IgniteCheckedException e) {
 +                log.error("Failed marshal message, closing connection. [msg=" + msg + ", ses=" + ses + ']', e);
  
 -                    onException("Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
 +                nonblockingStop();
  
 -                    if (e.hasCause(SocketTimeoutException.class))
 -                        LT.warn(log, "Socket operation timed out on handshake " +
 -                            "(consider increasing 'networkTimeout' configuration property) " +
 -                            "[netTimeout=" + spi.netTimeout + ']');
 +                clientMsgWorkers.remove(clientNodeId, this);
 +            }
 +        }
  
 -                    else if (e.hasCause(ClassNotFoundException.class))
 -                        LT.warn(log, "Failed to read message due to ClassNotFoundException " +
 -                            "(make sure same versions of all classes are available on all nodes) " +
 -                            "[rmtAddr=" + sock.getRemoteSocketAddress() +
 -                            ", err=" + X.cause(e, ClassNotFoundException.class).getMessage() + ']');
 +        /** {@inheritDoc} */
 +        @Override public boolean ping(final IgniteSpiOperationTimeoutHelper timeoutHelper) throws InterruptedException {
 +            return pinger().ping(timeoutHelper);
 +        }
  
 -                        // Always report marshalling problems.
 -                    else if (e.hasCause(ObjectStreamException.class) ||
 -                        (!sock.isClosed() && !e.hasCause(IOException.class)))
 -                        LT.error(log, e, "Failed to initialize connection [sock=" + sock + ']');
 +        /** {@inheritDoc} */
 +        @Override public void pingResult(final boolean res) {
 +            pinger().pingResult(res);
 +        }
  
 -                    return;
 -                }
 +        /** {@inheritDoc} */
 +        @Override public UUID clientNodeId() {
 +            return clientNodeId;
 +        }
  
 -                long sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
 -                    spi.getSocketTimeout();
 +        /** {@inheritDoc} */
 +        @Override public ClusterMetrics metrics() {
 +            return metrics;
 +        }
  
 -                while (!isInterrupted()) {
 -                    try {
 -                        TcpDiscoveryAbstractMessage msg = U.unmarshal(spi.marshaller(), in,
 -                            U.resolveClassLoader(spi.ignite().configuration()));
 +        /** {@inheritDoc} */
 +        @Override public void metrics(final ClusterMetrics metrics) {
 +            this.metrics = metrics;
 +        }
  
 -                        msg.senderNodeId(nodeId);
 +        /**
 +         * @return Pinger.
 +         */
 +        private ClientMessagePinger pinger() {
 +            if (pinger == null) {
 +                synchronized (this) {
 +                    if (pinger == null)
 +                        pinger = new ClientMessagePinger(this);
 +                }
 +            }
  
 -                        DebugLogger debugLog = messageLogger(msg);
 +            return pinger;
 +        }
  
 -                        if (debugLog.isDebugEnabled())
 -                            debugLog.debug("Message has been received: " + msg);
 +        /**
 +         * @return Current worker state.
 +         */
 +        public WorkerState state() {
 +            return state;
 +        }
  
 -                        spi.stats.onMessageReceived(msg);
 +        /** {@inheritDoc} */
 +        @Override public String toString() {
 +            return S.toString(ClientNioMessageWorker.class, this);
 +        }
 +    }
  
 -                        if (debugMode && recordable(msg))
 -                            debugLog(msg, "Message has been received: " + msg);
  
 -                        if (msg instanceof TcpDiscoveryConnectionCheckMessage) {
 -                            spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
  
 -                            continue;
 -                        }
 -                        else if (msg instanceof TcpDiscoveryJoinRequestMessage) {
 -                            TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg;
 +    /**
 +     * Processes incoming nio client messages.
 +     */
 +    private class ClientNioMessageProcessor {
 +        /** */
 +        private final IgniteLogger log;
  
 -                            if (!req.responded()) {
 -                                boolean ok = processJoinRequestMessage(req, clientMsgWrk);
 +        /**
 +         * @param log Logger.
 +         */
 +        private ClientNioMessageProcessor(IgniteLogger log) {
 +            this.log = log;
 +        }
  
 -                                if (clientMsgWrk != null && ok)
 -                                    continue;
 -                                else
 -                                    // Direct join request - no need to handle this socket anymore.
 -                                    break;
 -                            }
 -                        }
 -                        else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
 -                            if (clientMsgWrk != null) {
 -                                TcpDiscoverySpiState state = spiStateCopy();
 +        /**
 +         * Process client message.
 +         *
 +         * @param ses Nio session.
 +         * @param msg0 Incoming message.
 +         */
 +        void processMessage(GridNioSession ses, byte[] msg0) throws IgniteCheckedException {
 +            final TcpDiscoveryAbstractMessage msg = spi.marshaller().unmarshal(msg0,
 +                    U.resolveClassLoader(spi.ignite().configuration()));
  
 -                                if (state == CONNECTED) {
 -                                    spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 +            final UUID nodeId = getConfiguredNodeId();
 +            final UUID clientNodeId = clientNodeId(ses);
  
 -                                    if (clientMsgWrk.getState() == State.NEW)
 -                                        clientMsgWrk.start();
 +            final ClientNioMessageWorker clientMsgWrk = ses.meta(NIO_WORKER_META);
  
 -                                    msgWorker.addMessage(msg);
 +            if (clientMsgWrk == null) {
 +                if (log.isDebugEnabled())
 +                    log.debug("NIO Worker has been closed, drop message. [clientNodeId="
 +                        + clientNodeId + ", message=" + msg + "]");
  
 -                                    continue;
 -                                }
 -                                else {
 -                                    spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, sockTimeout);
 +                if (ses.closeTime() == 0)
 +                    ses.close();
  
 -                                    break;
 -                                }
 -                            }
 -                        }
 -                        else if (msg instanceof TcpDiscoveryDuplicateIdMessage) {
 -                            // Send receipt back.
 -                            spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 +                return;
 +            }
  
 -                            boolean ignored = false;
 +            msg.senderNodeId(nodeId);
  
 -                            TcpDiscoverySpiState state = null;
 +            if (log.isDebugEnabled())
 +                log.debug("Message has been received: " + msg);
  
 -                            synchronized (mux) {
 -                                if (spiState == CONNECTING) {
 -                                    joinRes.set(msg);
 +            spi.stats.onMessageReceived(msg);
  
 -                                    spiState = DUPLICATE_ID;
 +            if (debugMode && recordable(msg))
 +                debugLog(msg, "Message has been received: " + msg);
  
 -                                    mux.notifyAll();
 -                                }
 -                                else {
 -                                    ignored = true;
 +            if (msg instanceof TcpDiscoveryConnectionCheckMessage) {
 +                clientMsgWrk.addReceipt(RES_OK);
  
 -                                    state = spiState;
 -                                }
 -                            }
 +                return;
 +            }
 +            else if (msg instanceof TcpDiscoveryJoinRequestMessage) {
 +                final TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg;
  
 -                            if (ignored && log.isDebugEnabled())
 -                                log.debug("Duplicate ID message has been ignored [msg=" + msg +
 -                                    ", spiState=" + state + ']');
 +                if (!req.responded()) {
 +                    final TcpDiscoverySpiState state = spiStateCopy();
  
 -                            continue;
 -                        }
 -                        else if (msg instanceof TcpDiscoveryAuthFailedMessage) {
 -                            // Send receipt back.
 -                            spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 +                    if (state == CONNECTED) {
 +                        clientMsgWrk.addReceipt(RES_OK, new CX1<IgniteInternalFuture<?>, Object>() {
 +                            private static final long serialVersionUID = 0L;
  
 -                            boolean ignored = false;
 +                            @Override public Object applyx(
 +                                final IgniteInternalFuture<?> fut) throws IgniteCheckedException {
 +                                req.responded(true);
  
 -                            TcpDiscoverySpiState state = null;
 +                                msgWorker.addMessage(req);
  
 -                            synchronized (mux) {
 -                                if (spiState == CONNECTING) {
 -                                    joinRes.set(msg);
 +                                clientMsgWrk.markJoinedAndSendPendingMessages();
  
 -                                    spiState = AUTH_FAILED;
 +                                return null;
 +                            }
 +                        });
  
 -                                    mux.notifyAll();
 -                                }
 -                                else {
 -                                    ignored = true;
 +                        return;
 +                    }
 +                    else {
 +                        spi.stats.onMessageProcessingStarted(req);
  
 -                                    state = spiState;
 -                                }
 -                            }
 +                        final Integer res;
  
 -                            if (ignored && log.isDebugEnabled())
 -                                log.debug("Auth failed message has been ignored [msg=" + msg +
 -                                    ", spiState=" + state + ']');
 +                        final SocketAddress rmtAddr = clientMsgWrk.sock.getRemoteSocketAddress();
  
 -                            continue;
 +                        if (state == CONNECTING) {
 +                            if (noResAddrs.contains(rmtAddr) ||
 +                                getLocalNodeId().compareTo(req.creatorNodeId()) < 0)
 +                                // Remote node node has not responded to join request or loses UUID race.
 +                                res = RES_WAIT;
 +                            else
 +                                // Remote node responded to join request and wins UUID race.
 +                                res = RES_CONTINUE_JOIN;
                          }
 -                        else if (msg instanceof TcpDiscoveryCheckFailedMessage) {
 -                            // Send receipt back.
 -                            spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 +                        else
 +                            // Local node is stopping. Remote node should try next one.
 +                            res = RES_CONTINUE_JOIN;
  
 -                            boolean ignored = false;
 +                        clientMsgWrk.addReceipt(res, new CX1<IgniteInternalFuture<?>, Object>() {
 +                            private static final long serialVersionUID = 0L;
  
 -                            TcpDiscoverySpiState state = null;
 +                            @Override public Object applyx(
 +                                final IgniteInternalFuture<?> fut) throws IgniteCheckedException {
 +                                if (log.isDebugEnabled())
 +                                    log.debug("Responded to join request message [msg=" + req + ", res=" + res + ']');
  
 -                            synchronized (mux) {
 -                                if (spiState == CONNECTING) {
 -                                    joinRes.set(msg);
 +                                fromAddrs.addAll(req.node().socketAddresses());
  
 -                                    spiState = CHECK_FAILED;
 +                                spi.stats.onMessageProcessingFinished(req);
  
 -                                    mux.notifyAll();
 -                                }
 -                                else {
 -                                    ignored = true;
 +                                clientMsgWrk.nonblockingStop();
  
 -                                    state = spiState;
 -                                }
 +                                clientMsgWorkers.remove(clientMsgWrk.clientNodeId(), clientMsgWrk);
 +
 +                                return null;
                              }
 +                        });
  
 -                            if (ignored && log.isDebugEnabled())
 -                                log.debug("Check failed message has been ignored [msg=" + msg +
 -                                    ", spiState=" + state + ']');
 +                        return;
 +                    }
  
 -                            continue;
 +                }
 +            }
 +            else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
 +                final TcpDiscoverySpiState state = spiStateCopy();
 +
 +                if (state == CONNECTED) {
 +                    clientMsgWrk.addReceipt(RES_OK, new CX1<IgniteInternalFuture<?>, Object>() {
 +                        private static final long serialVersionUID = 0L;
 +
 +                        @Override public Object applyx(
 +                            final IgniteInternalFuture<?> fut) throws IgniteCheckedException {
 +                            msgWorker.addMessage(msg);
 +
 +                            clientMsgWrk.markJoinedAndSendPendingMessages();
 +
 +                            return null;
                          }
 -                        else if (msg instanceof TcpDiscoveryLoopbackProblemMessage) {
 -                            // Send receipt back.
 -                            spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 +                    });
 +                }
 +                else {
 +                    clientMsgWrk.addReceipt(RES_CONTINUE_JOIN, new CX1<IgniteInternalFuture<?>, Object>() {
 +                        private static final long serialVersionUID = 0L;
  
 -                            boolean ignored = false;
 +                        @Override public Object applyx(
 +                            final IgniteInternalFuture<?> fut) throws IgniteCheckedException {
 +                            clientMsgWrk.nonblockingStop();
  
 -                            TcpDiscoverySpiState state = null;
 +                            clientMsgWorkers.remove(clientMsgWrk.clientNodeId(), clientMsgWrk);
  
 -                            synchronized (mux) {
 -                                if (spiState == CONNECTING) {
 -                                    joinRes.set(msg);
 +                            return null;
 +                        }
 +                    });
 +                }
 +            }
 +            else if (msg instanceof TcpDiscoveryDuplicateIdMessage) {
 +                // Send receipt back.
 +                clientMsgWrk.addReceipt(RES_OK, createStateChangeClosure(msg, DUPLICATE_ID));
  
 -                                    spiState = LOOPBACK_PROBLEM;
 +                return;
 +            }
 +            else if (msg instanceof TcpDiscoveryAuthFailedMessage) {
 +                // Send receipt back.
 +                clientMsgWrk.addReceipt(RES_OK, createStateChangeClosure(msg, AUTH_FAILED));
  
 -                                    mux.notifyAll();
 -                                }
 -                                else {
 -                                    ignored = true;
 +                return;
 +            }
 +            else if (msg instanceof TcpDiscoveryCheckFailedMessage) {
 +                // Send receipt back.
 +                clientMsgWrk.addReceipt(RES_OK, createStateChangeClosure(msg, CHECK_FAILED));
  
 -                                    state = spiState;
 -                                }
 -                            }
 +                return;
 +            }
 +            else if (msg instanceof TcpDiscoveryLoopbackProblemMessage) {
 +                // Send receipt back.
 +                clientMsgWrk.addReceipt(RES_OK, createStateChangeClosure(msg, LOOPBACK_PROBLEM));
  
 -                            if (ignored && log.isDebugEnabled())
 -                                log.debug("Loopback problem message has been ignored [msg=" + msg +
 -                                    ", spiState=" + state + ']');
 +                return;
 +            }
 +            else if (msg instanceof TcpDiscoveryPingResponse) {
 +                assert msg.client() : msg;
  
 -                            continue;
 +                final ClientMessageProcessor clientWorker = clientMsgWorkers.get(msg.creatorNodeId());
 +
 +                if (clientWorker != null)
 +                    clientWorker.pingResult(true);
 +
 +                return;
 +            }
 +
 +            TcpDiscoveryClientHeartbeatMessage heartbeatMsg = null;
 +
 +            if (msg instanceof TcpDiscoveryClientHeartbeatMessage)
 +                heartbeatMsg = (TcpDiscoveryClientHeartbeatMessage)msg;
 +            else
 +                msgWorker.addMessage(msg);
 +
 +            final UUID locNodeId = getConfiguredNodeId();
 +
 +            // Send receipt back.
 +            final TcpDiscoveryClientAckResponse ack = new TcpDiscoveryClientAckResponse(locNodeId, msg.id());
 +
 +            ack.verify(locNodeId);
 +
 +            clientMsgWrk.addMessage(ack, null);
 +
 +            if (heartbeatMsg != null)
 +                clientMsgWrk.metrics(heartbeatMsg.metrics());
 +        }
 +
 +        /**
 +         * @param msg Discovery message.
 +         * @param newState New state.
 +         * @return Closure that changes state to new one.
 +         */
 +        private CX1<IgniteInternalFuture<?>, Object> createStateChangeClosure(final TcpDiscoveryAbstractMessage msg,
 +            final TcpDiscoverySpiState newState) {
 +            return new CX1<IgniteInternalFuture<?>, Object>() {
 +                private static final long serialVersionUID = 0L;
 +
 +                @Override public Object applyx(
 +                    final IgniteInternalFuture<?> fut) throws IgniteCheckedException {
 +                    boolean ignored = false;
 +
 +                    TcpDiscoverySpiState state = null;
 +
 +                    synchronized (mux) {
 +                        if (spiState == CONNECTING) {
 +                            joinRes.set(msg);
 +
 +                            spiState = newState;
 +
 +                            mux.notifyAll();
                          }
 -                        if (msg instanceof TcpDiscoveryPingResponse) {
 -                            assert msg.client() : msg;
 +                        else {
 +                            ignored = true;
  
 -                            ClientMessageWorker clientWorker = clientMsgWorkers.get(msg.creatorNodeId());
 +                            state = spiState;
 +                        }
 +                    }
  
 -                            if (clientWorker != null)
 -                                clientWorker.pingResult(true);
 +                    if (ignored && log.isDebugEnabled())
 +                        log.debug("Duplicate ID message has been ignored [msg=" + msg +
 +                            ", spiState=" + state + ']');
  
 -                            continue;
 +                    return null;
 +                }
 +            };
 +        }
 +
 +        /**
 +         * @param ses Session.
 +         * @return Client node ID.
 +         */
 +        private UUID clientNodeId(final GridNioSession ses) {
 +            return ses.meta(NODE_ID_META);
 +        }
 +    }
 +
 +    /**
 +     * Thread that reads messages from the socket created for incoming connections.
 +     */
 +    private class SocketReader extends IgniteSpiThread {
 +        /** Socket to read data from. */
 +        private final Socket sock;
 +
 +        /** */
 +        private volatile UUID nodeId;
 +
 +        /** Flag indicating that client is processed by NIO server. */
 +        private volatile boolean nioClient;
 +
 +        /**
 +         * Constructor.
 +         *
 +         * @param sock Socket to read data from.
 +         */
 +        SocketReader(Socket sock) {
 +            super(spi.ignite().name(), "tcp-disco-sock-reader", log);
 +
 +            this.sock = sock;
 +
 +            setPriority(spi.threadPri);
 +
 +            spi.stats.onSocketReaderCreated();
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override protected void body() throws InterruptedException {
 +            UUID locNodeId = getConfiguredNodeId();
 +
 +            ClientMessageProcessor clientMsgWrk = null;
 +
 +            try {
 +                InputStream in;
 +
 +                try {
 +                    // Set socket options.
 +                    sock.setKeepAlive(true);
 +                    sock.setTcpNoDelay(spi.getTcpNodelay());
 +
 +                    int timeout = sock.getSoTimeout();
 +
 +                    sock.setSoTimeout((int)spi.netTimeout);
 +
 +                    for (IgniteInClosure<Socket> connLsnr : spi.incomeConnLsnrs)
 +                        connLsnr.apply(sock);
 +
 +                    int rcvBufSize = sock.getReceiveBufferSize();
 +
 +                    in = new BufferedInputStream(sock.getInputStream(), rcvBufSize > 0 ? rcvBufSize : 8192);
 +
 +                    byte[] buf = new byte[4];
 +                    int read = 0;
 +
 +                    while (read < buf.length) {
 +                        int r = in.read(buf, read, buf.length - read);
 +
 +                        if (r >= 0)
 +                            read += r;
 +                        else {
 +                            if (log.isDebugEnabled())
 +                                log.debug("Failed to read magic header (too few bytes received) " +
 +                                    "[rmtAddr=" + sock.getRemoteSocketAddress() +
 +                                    ", locAddr=" + sock.getLocalSocketAddress() + ']');
 +
 +                            LT.warn(log, "Failed to read magic header (too few bytes received) [rmtAddr=" +
 +                                sock.getRemoteSocketAddress() + ", locAddr=" + sock.getLocalSocketAddress() + ']');
 +
 +                            return;
                          }
 +                    }
  
 -                        TcpDiscoveryClientHeartbeatMessage heartbeatMsg = null;
 +                    if (!Arrays.equals(buf, U.IGNITE_HEADER)) {
 +                        if (log.isDebugEnabled())
 +                            log.debug("Unknown connection detected (is some other software connecting to " +
 +                                "this Ignite port?" +
 +                                (!spi.isSslEnabled() ? " missed SSL configuration?" : "" ) +
 +                                ") " +
 +                                "[rmtAddr=" + sock.getRemoteSocketAddress() +
 +                                ", locAddr=" + sock.getLocalSocketAddress() + ']');
  
 -                        if (msg instanceof TcpDiscoveryClientHeartbeatMessage)
 -                            heartbeatMsg = (TcpDiscoveryClientHeartbeatMessage)msg;
 -                        else
 -                            msgWorker.addMessage(msg);
 +                        LT.warn(log, "Unknown connection detected (is some other software connecting to " +
 +                            "this Ignite port?" +
 +                            (!spi.isSslEnabled() ? " missing SSL configuration on remote node?" : "" ) +
 +                            ") [rmtAddr=" + sock.getInetAddress() + ']', true);
  
 -                        // Send receipt back.
 -                        if (clientMsgWrk != null) {
 -                            TcpDiscoveryClientAckResponse ack = new TcpDiscoveryClientAckResponse(locNodeId, msg.id());
 +                        return;
 +                    }
 +
 +                    // Restore timeout.
 +                    sock.setSoTimeout(timeout);
 +
 +                    TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, spi.netTimeout);
 +
 +                    // Ping.
 +                    if (msg instanceof TcpDiscoveryPingRequest) {
 +                        if (!spi.isNodeStopping0()) {
 +                            TcpDiscoveryPingRequest req = (TcpDiscoveryPingRequest)msg;
 +
 +                            TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(locNodeId);
 +
 +                            IgniteSpiOperationTimeoutHelper timeoutHelper =
 +                                new IgniteSpiOperationTimeoutHelper(spi);
 +
 +                            if (req.clientNodeId() != null) {
 +                                ClientMessageProcessor clientWorker = clientMsgWorkers.get(req.clientNodeId());
 +
 +                                if (clientWorker != null)
 +                                    res.clientExists(clientWorker.ping(timeoutHelper));
 +                            }
 +
 +                            spi.writeToSocket(sock, res, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 +                        }
 +                        else if (log.isDebugEnabled())
 +                            log.debug("Ignore ping request, node is stopping.");
 +
 +                        return;
 +                    }
 +
 +                    // Handshake.
 +                    TcpDiscoveryHandshakeRequest req = (TcpDiscoveryHandshakeRequest)msg;
 +
 +                    UUID nodeId = req.creatorNodeId();
 +
 +                    this.nodeId = nodeId;
 +
 +                    TcpDiscoveryHandshakeResponse res =
 +                        new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder());
 +
 +                    boolean asyncMode = false;
 +
 +                    if (req.client()) {
 +                        res.clientAck(true);
 +
 +                        // If client proposes async mode accept it.
 +                        if (req.asyncMode()) {
 +                            res.asyncMode(true);
 +
 +                            asyncMode = true;
 +                        }
 +                    }
 +
 +                    // It can happen if a remote node is stopped and it has a loopback address in the list of addresses,
 +                    // the local node sends a handshake request message on the loopback address, so we get here.
 +                    if (locNodeId.equals(nodeId)) {
 +                        assert !req.client();
 +
 +                        if (log.isDebugEnabled())
 +                            log.debug("Handshake request from local node: " + req);
 +
 +                        return;
 +                    }
 +
 +                    if (req.client()) {
 +                        ClientMessageProcessor clientProc = asyncMode
 +                            ? new ClientNioMessageWorker(nodeId, sock)
 +                            : new ClientMessageWorker(sock, nodeId);
 +
 +                        while (true) {
 +                            ClientMessageProcessor old = clientMsgWorkers.putIfAbsent(nodeId, clientProc);
 +
 +                            if (old == null)
 +                                break;
 +
 +                            if (old instanceof ClientMessageWorker) {
 +                                ClientMessageWorker oldWrk = (ClientMessageWorker)old;
 +
 +                                if (oldWrk.isInterrupted()) {
 +                                    clientMsgWorkers.remove(nodeId, old);
 +
 +                                    continue;
 +                                }
 +
 +                                oldWrk.join(500);
 +
 +                                old = clientMsgWorkers.putIfAbsent(nodeId, clientProc);
 +
 +                                if (old == null)
 +                                    break;
 +
 +                                if (log.isDebugEnabled())
 +                                    log.debug("Already have client message worker, closing connection " +
 +                                        "[locNodeId=" + locNodeId +
 +                                        ", rmtNodeId=" + nodeId +
 +                                        ", workerSock=" + oldWrk.sock +
 +                                        ", sock=" + sock + ']');
 +
 +                                return;
 +                            }
 +                            else if (old instanceof ClientNioMessageWorker) {
 +                                final ClientNioMessageWorker nioOldWrk = (ClientNioMessageWorker)old;
 +
 +                                if (nioOldWrk.state() == WorkerState.STOPPED)
 +                                    clientMsgWorkers.remove(nodeId, nioOldWrk);
 +                                else {
 +                                    // check if old worker is stopping
 +                                    for (int i = 0; i < 5; i++) {
 +                                        U.sleep(100);
 +
 +                                        if (nioOldWrk.state() == WorkerState.STOPPED)
 +                                            break;
 +                                    }
 +
 +                                    old = clientMsgWorkers.putIfAbsent(nodeId, clientProc);
 +
 +                                    if (old == null)
 +                                        break;
 +
 +                                    if (log.isDebugEnabled())
 +                                        log.debug("Already have client message worker, closing connection " +
 +                                            "[locNodeId=" + locNodeId +
 +                                            ", rmtNodeId=" + nodeId +
 +                                            ", workerSock=" + nioOldWrk.sock +
 +                                            ", sock=" + sock + ']');
 +
 +                                    return;
 +                                }
 +                            }
 +                        }
 +
 +                        if (log.isDebugEnabled())
 +                            log.debug("Created client message worker [locNodeId=" + locNodeId +
 +                                ", rmtNodeId=" + nodeId + ", sock=" + sock + ']');
 +
 +                        assert clientProc == clientMsgWorkers.get(nodeId);
 +
 +                        clientMsgWrk = clientProc;
 +
 +                        nioClient = clientMsgWrk instanceof ClientNioMessageWorker;
 +                    }
 +
 +                    if (nioClient) {
 +                        final ClientNioMessageWorker nioWrk = (ClientNioMessageWorker)clientMsgWrk;
 +
 +                        nioWrk.start();
 +
 +                        nioWrk.sendMessage(res, null);
 +
 +                        return;
 +                    }
 +
 +                    spi.writeToSocket(sock, res, spi.failureDetectionTimeoutEnabled() ?
 +                        spi.failureDetectionTimeout() : spi.getSocketTimeout());
 +
 +                    if (log.isDebugEnabled())
 +                        log.debug("Initialized connection with remote node [nodeId=" + nodeId +
 +                            ", client=" + req.client() + ']');
 +
 +                    if (debugMode) {
 +                        debugLog(msg, "Initialized connection with remote node [nodeId=" + nodeId +
 +                            ", client=" + req.client() + ']');
 +                    }
 +                }
 +                catch (IOException e) {
 +                    if (log.isDebugEnabled())
 +                        U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
 +
 +                    if (X.hasCause(e, SSLException.class) && spi.isSslEnabled() && !spi.isNodeStopping0())
 +                        LT.warn(log, "Failed to initialize connection " +
 +                            "(missing SSL configuration on remote node?) " +
 +                            "[rmtAddr=" + sock.getInetAddress() + ']', true);
 +                    else if ((X.hasCause(e, ObjectStreamException.class) || !sock.isClosed())
 +                        && !spi.isNodeStopping0()) {
 +                        if (U.isMacInvalidArgumentError(e))
 +                            LT.error(log, e, "Failed to initialize connection [sock=" + sock + "]\n\t" +
 +                                U.MAC_INVALID_ARG_MSG);
 +                        else {
 +                            U.error(
 +                                log,
 +                                "Failed to initialize connection (this can happen due to short time " +
 +                                    "network problems and can be ignored if does not affect node discovery) " +
 +                                    "[sock=" + sock + ']',
 +                                e);
 +                        }
 +                    }
 +
 +                    onException("Caught exception on handshake [err=" + e + ", sock=" + sock + ']', e);
 +
 +                    return;
 +                }
 +                catch (IgniteCheckedException e) {
 +                    if (log.isDebugEnabled())
 +                        U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
 +
 +                    onException("Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
 +
 +                    if (e.hasCause(SocketTimeoutException.class))
 +                        LT.warn(log, "Socket operation timed out on handshake " +
 +                            "(consider increasing 'networkTimeout' configuration property) " +
 +                            "[netTimeout=" + spi.netTimeout + ']');
 +
 +                    else if (e.hasCause(ClassNotFoundException.class))
 +                        LT.warn(log, "Failed to read message due to ClassNotFoundException " +
 +                            "(make sure same versions of all classes are available on all nodes) " +
 +                            "[rmtAddr=" + sock.getRemoteSocketAddress() +
 +                            ", err=" + X.cause(e, ClassNotFoundException.class).getMessage() + ']');
 +
 +                        // Always report marshalling problems.
 +                    else if (e.hasCause(ObjectStreamException.class) ||
 +                        (!sock.isClosed() && !e.hasCause(IOException.class)))
 +                        LT.error(log, e, "Failed to initialize connection [sock=" + sock + ']');
 +
 +                    return;
 +                }
 +
 +                long sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
 +                    spi.getSocketTimeout();
 +
 +                final ClientMessageWorker clientMsgWrk0 = clientMsgWrk == null ? null
 +                    : (ClientMessageWorker)clientMsgWrk;
 +
 +                while (!isInterrupted()) {
 +                    try {
 +                        TcpDiscoveryAbstractMessage msg = U.unmarshal(spi.marshaller(), in,
 +                            U.resolveClassLoader(spi.ignite().configuration()));
 +
 +                        msg.senderNodeId(nodeId);
 +
-                         if (log.isDebugEnabled())
-                             log.debug("Message has been received: " + msg);
++                        DebugLogger debugLog = messageLogger(msg);
++
++                        if (debugLog.isDebugEnabled())
++                            debugLog.debug("Message has been received: " + msg);
 +
 +                        spi.stats.onMessageReceived(msg);
 +
 +                        if (debugMode && recordable(msg))
 +                            debugLog(msg, "Message has been received: " + msg);
 +
 +                        if (msg instanceof TcpDiscoveryConnectionCheckMessage) {
 +                            spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 +
 +                            continue;
 +                        }
 +                        else if (msg instanceof TcpDiscoveryJoinRequestMessage) {
 +                            TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg;
 +
 +                            if (!req.responded()) {
 +                                boolean ok = processJoinRequestMessage(req, clientMsgWrk0);
 +
 +                                if (clientMsgWrk0 != null && ok)
 +                                    continue;
 +                                else
 +                                    // Direct join request - no need to handle this socket anymore.
 +                                    break;
 +                            }
 +                        }
 +                        else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
 +                            if (clientMsgWrk0 != null) {
 +                                TcpDiscoverySpiState state = spiStateCopy();
 +
 +                                if (state == CONNECTED) {
 +                                    spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 +
 +                                    if (clientMsgWrk0.getState() == State.NEW)
 +                                        clientMsgWrk0.start();
 +
 +                                    msgWorker.addMessage(msg);
 +
 +                                    continue;
 +                                }
 +                                else {
 +                                    spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, sockTimeout);
 +
 +                                    break;
 +                                }
 +                            }
 +                        }
 +                        else if (msg instanceof TcpDiscoveryDuplicateIdMessage) {
 +                            // Send receipt back.
 +                            spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 +
 +                            boolean ignored = false;
 +
 +                            TcpDiscoverySpiState state = null;
 +
 +                            synchronized (mux) {
 +                                if (spiState == CONNECTING) {
 +                                    joinRes.set(msg);
 +
 +                                    spiState = DUPLICATE_ID;
 +
 +                                    mux.notifyAll();
 +                                }
 +                                else {
 +                                    ignored = true;
 +
 +                                    state = spiState;
 +                                }
 +                            }
 +
 +                            if (ignored && log.isDebugEnabled())
 +                                log.debug("Duplicate ID message has been ignored [msg=" + msg +
 +                                    ", spiState=" + state + ']');
 +
 +                            continue;
 +                        }
 +                        else if (msg instanceof TcpDiscoveryAuthFailedMessage) {
 +                            // Send receipt back.
 +                            spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 +
 +                            boolean ignored = false;
 +
 +                            TcpDiscoverySpiState state = null;
 +
 +                            synchronized (mux) {
 +                                if (spiState == CONNECTING) {
 +                                    joinRes.set(msg);
 +
 +                                    spiState = AUTH_FAILED;
 +
 +                                    mux.notifyAll();
 +                                }
 +                                else {
 +                                    ignored = true;
 +
 +                                    state = spiState;
 +                                }
 +                            }
 +
 +                            if (ignored && log.isDebugEnabled())
 +                                log.debug("Auth failed message has been ignored [msg=" + msg +
 +                                    ", spiState=" + state + ']');
 +
 +                            continue;
 +                        }
 +                        else if (msg instanceof TcpDiscoveryCheckFailedMessage) {
 +                            // Send receipt back.
 +                            spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 +
 +                            boolean ignored = false;
 +
 +                            TcpDiscoverySpiState state = null;
 +
 +                            synchronized (mux) {
 +                                if (spiState == CONNECTING) {
 +                                    joinRes.set(msg);
 +
 +                                    spiState = CHECK_FAILED;
 +
 +                                    mux.notifyAll();
 +                                }
 +                                else {
 +                                    ignored = true;
 +
 +                                    state = spiState;
 +                                }
 +                            }
 +
 +                            if (ignored && log.isDebugEnabled())
 +                                log.debug("Check failed message has been ignored [msg=" + msg +
 +                                    ", spiState=" + state + ']');
 +
 +                            continue;
 +                        }
 +                        else if (msg instanceof TcpDiscoveryLoopbackProblemMessage) {
 +                            // Send receipt back.
 +                            spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 +
 +                            boolean ignored = false;
 +
 +                            TcpDiscoverySpiState state = null;
 +
 +                            synchronized (mux) {
 +                                if (spiState == CONNECTING) {
 +                                    joinRes.set(msg);
 +
 +                                    spiState = LOOPBACK_PROBLEM;
 +
 +                                    mux.notifyAll();
 +                                }
 +                                else {
 +                                    ignored = true;
 +
 +                                    state = spiState;
 +                                }
 +                            }
 +
 +                            if (ignored && log.isDebugEnabled())
 +                                log.debug("Loopback problem message has been ignored [msg=" + msg +
 +                                    ", spiState=" + state + ']');
 +
 +                            continue;
 +                        }
 +                        if (msg instanceof TcpDiscoveryPingResponse) {
 +                            assert msg.client() : msg;
 +
 +                            ClientMessageProcessor clientWorker = clientMsgWorkers.get(msg.creatorNodeId());
 +
 +                            if (clientWorker != null)
 +                                clientWorker.pingResult(true);
 +
 +                            continue;
 +                        }
 +
 +                        TcpDiscoveryClientHeartbeatMessage heartbeatMsg = null;
 +
 +                        if (msg instanceof TcpDiscoveryClientHeartbeatMessage)
 +                            heartbeatMsg = (TcpDiscoveryClientHeartbeatMessage)msg;
 +                        else
 +                            msgWorker.addMessage(msg);
 +
 +                        // Send receipt back.
 +                        if (clientMsgWrk != null) {
 +                            TcpDiscoveryClientAckResponse ack = new TcpDiscoveryClientAckResponse(locNodeId, msg.id());
 +
 +                            ack.verify(locNodeId);
 +
 +                            clientMsgWrk.addMessage(ack);
 +                        }
 +                        else
 +                            spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 +
 +                        if (heartbeatMsg != null)
 +                            processClientHeartbeatMessage(heartbeatMsg);
 +                    }
 +                    catch (IgniteCheckedException e) {
 +                        if (log.isDebugEnabled())
 +                            U.error(log, "Caught exception on message read [sock=" + sock +
 +                                ", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']', e);
 +
 +                        onException("Caught exception on message read [sock=" + sock +
 +                            ", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']', e);
 +
 +                        if (isInterrupted() || sock.isClosed())
 +                            return;
 +
 +                        if (e.hasCause(ClassNotFoundException.class))
 +                            LT.warn(log, "Failed to read message due to ClassNotFoundException " +
 +                                "(make sure same versions of all classes are available on all nodes) " +
 +                                "[rmtNodeId=" + nodeId +
 +                                ", err=" + X.cause(e, ClassNotFoundException.class).getMessage() + ']');
 +
 +                        // Always report marshalling errors.
 +                        boolean err = e.hasCause(ObjectStreamException.class) ||
 +                            (nodeAlive(nodeId) && spiStateCopy() == CONNECTED && !X.hasCause(e, IOException.class));
 +
 +                        if (err)
 +                            LT.error(log, e, "Failed to read message [sock=" + sock + ", locNodeId=" + locNodeId +
 +                                ", rmtNodeId=" + nodeId + ']');
 +
 +                        return;
 +                    }
 +                    catch (IOException e) {
 +                        if (log.isDebugEnabled())
 +                            U.error(log, "Caught exception on message read [sock=" + sock + ", locNodeId=" + locNodeId +
 +                                ", rmtNodeId=" + nodeId + ']', e);
 +
 +                        if (isInterrupted() || sock.isClosed())
 +                            return;
 +
 +                        // Always report marshalling errors (although it is strange here).
 +                        boolean err = X.hasCause(e, ObjectStreamException.class) ||
 +                            (nodeAlive(nodeId) && spiStateCopy() == CONNECTED);
 +
 +                        if (err)
 +                            LT.error(log, e, "Failed to send receipt on message [sock=" + sock +
 +                                ", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']');
 +
 +                        onException("Caught exception on message read [sock=" + sock + ", locNodeId=" + locNodeId +
 +                            ", rmtNodeId=" + nodeId + ']', e);
 +
 +                        return;
 +                    }
 +                }
 +            }
 +            finally {
 +                if (clientMsgWrk != null) {
 +                    if (log.isDebugEnabled())
 +                        log.debug("Client connection failed [sock=" + sock + ", locNodeId=" + locNodeId +
 +                            ", rmtNodeId=" + nodeId + ']');
 +
 +                    if (!nioClient) {
 +                        clientMsgWorkers.remove(nodeId, clientMsgWrk);
 +
 +                        U.interrupt((ClientMessageWorker)clientMsgWrk);
 +                    }
 +                }
 +
 +                if (!nioClient)
 +                    U.closeQuiet(sock);
 +            }
 +        }
 +
 +        /**
 +         * Processes client heartbeat message.
 +         *
 +         * @param msg Heartbeat message.
 +         */
 +        private void processClientHeartbeatMessage(TcpDiscoveryClientHeartbeatMessage msg) {
 +            assert msg.client();
 +
 +            ClientMessageProcessor wrk = clientMsgWorkers.get(msg.creatorNodeId());
 +
 +            if (wrk != null)
 +                wrk.metrics(msg.metrics());
 +            else if (log.isDebugEnabled())
 +                log.debug("Received heartbeat message from unknown client node: " + msg);
 +        }
 +
 +        /**
 +         * @param msg Join request message.
 +         * @param clientMsgWrk Client message worker to start.
 +         * @return Whether connection was successful.
 +         * @throws IOException If IO failed.
 +         */
 +        @SuppressWarnings({"IfMayBeConditional"})
 +        private boolean processJoinRequestMessage(TcpDiscoveryJoinRequestMessage msg,
 +            @Nullable ClientMessageWorker clientMsgWrk) throws IOException, IgniteCheckedException {
 +            assert msg != null;
 +            assert !msg.responded();
 +
 +            TcpDiscoverySpiState state = spiStateCopy();
 +
 +            long sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
 +                spi.getSocketTimeout();
 +
 +            if (state == CONNECTED) {
 +                spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 +
 +                if (log.isDebugEnabled())
 +                    log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']');
 +
 +                msg.responded(true);
 +
 +                if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW) {
 +                    clientMsgWrk.clientVersion(U.productVersion(msg.node()));
 +
 +                    clientMsgWrk.start();
 +                }
 +
 +                msgWorker.addMessage(msg);
 +
 +                return true;
 +            }
 +            else {
 +                spi.stats.onMessageProcessingStarted(msg);
 +
 +                Integer res;
 +
 +                SocketAddress rmtAddr = sock.getRemoteSocketAddress();
 +
 +                if (state == CONNECTING) {
 +                    if (noResAddrs.contains(rmtAddr) ||
 +                        getLocalNodeId().compareTo(msg.creatorNodeId()) < 0)
 +                        // Remote node node has not responded to join request or loses UUID race.
 +                        res = RES_WAIT;
 +                    else
 +                        // Remote node responded to join request and wins UUID race.
 +                        res = RES_CONTINUE_JOIN;
 +                }
 +                else
 +                    // Local node is stopping. Remote node should try next one.
 +                    res = RES_CONTINUE_JOIN;
 +
 +                spi.writeToSocket(msg, sock, res, sockTimeout);
 +
 +                if (log.isDebugEnabled())
 +                    log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']');
 +
 +                fromAddrs.addAll(msg.node().socketAddresses());
 +
 +                spi.stats.onMessageProcessingFinished(msg);
 +
 +                return false;
 +            }
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void interrupt() {
 +            super.interrupt();
 +
 +            U.closeQuiet(sock);
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override protected void cleanup() {
 +            super.cleanup();
 +
 +            if (!nioClient)
 +                U.closeQuiet(sock);
 +
 +            synchronized (mux) {
 +                readers.remove(this);
 +            }
 +
 +            spi.stats.onSocketReaderRemoved();
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public String toString() {
 +            return "Socket reader [id=" + getId() + ", name=" + getName() + ", nodeId=" + nodeId + ']';
 +        }
 +    }
 +
 +    /**
 +     * SPI Statistics printer.
 +     */
 +    private class StatisticsPrinter extends IgniteSpiThread {
 +        /**
 +         * Constructor.
 +         */
 +        StatisticsPrinter() {
 +            super(spi.ignite().name(), "tcp-disco-stats-printer", log);
 +
 +            assert spi.statsPrintFreq > 0;
 +
 +            assert log.isInfoEnabled();
 +
 +            setPriority(spi.threadPri);
 +        }
 +
 +        /** {@inheritDoc} */
 +        @SuppressWarnings({"BusyWait"})
 +        @Override protected void body() throws InterruptedException {
 +            if (log.isDebugEnabled())
 +                log.debug("Statistics printer has been started.");
 +
 +            while (!isInterrupted()) {
 +                Thread.sleep(spi.statsPrintFreq);
 +
 +                printStatistics();
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Provides communication with client node.
 +     */
 +    private interface ClientMessageProcessor {
 +        /**
 +         * @param msg Message.
 +         */
 +        void addMessage(TcpDiscoveryAbstractMessage msg);
 +
 +        /**
 +         * @param msg Message.
 +         * @param msgBytes Marshalled message.
 +         */
 +        void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] msgBytes);
 +
 +        /**
 +         * @param timeoutHelper Timeout helper.
 +         * @return Success flag.
 +         * @throws InterruptedException
 +         */
 +        boolean ping(IgniteSpiOperationTimeoutHelper timeoutHelper) throws InterruptedException;
 +
 +        /**
 +         * @param res Ping result.
 +         */
 +        void pingResult(boolean res);
 +
 +        /**
 +         * @return Client ID.
 +         */
 +        UUID clientNodeId();
 +
 +        /**
 +         * @return Cluster metrics.
 +         */
 +        ClusterMetrics metrics();
 +
 +        /**
 +         * @param metrics Cluster metrics.
 +         */
 +        void metrics(ClusterMetrics metrics);
 +    }
 +
 +    /**
 +     *
 +     */
 +    private class ClientMessageWorker extends MessageWorkerAdapter<T2<TcpDiscoveryAbstractMessage, byte[]>>
 +        implements ClientMessageProcessor {
 +        /** Node ID. */
 +        private final UUID clientNodeId;
 +
 +        /** Socket. */
 +        private final Socket sock;
 +
 +        /** Current client metrics. */
 +        private volatile ClusterMetrics metrics;
 +
 +        /** */
 +        private IgniteProductVersion clientVer;
 +
 +        /** */
 +        private volatile ClientMessagePinger pinger;
 +
 +        /**
 +         * @param sock Socket.
 +         * @param clientNodeId Node ID.
 +         */
 +        protected ClientMessageWorker(Socket sock, UUID clientNodeId) throws IOException {
 +            super("tcp-disco-client-message-worker", 2000);
 +
 +            this.sock = sock;
 +            this.clientNodeId = clientNodeId;
 +        }
 +
 +        /**
 +         * @param clientVer Client version.
 +         */
 +        void clientVersion(IgniteProductVersion clientVer) {
 +            this.clientVer = clientVer;
 +        }
 +
 +        /**
 +         * @return Current client metrics.
 +         */
 +        @Override public ClusterMetrics metrics() {
 +            return metrics;
 +        }
 +
 +        /**
 +         * @param metrics New current client metrics.
 +         */
 +        @Override public void metrics(ClusterMetrics metrics) {
 +            this.metrics = metrics;
 +        }
 +
 +        /**
 +         * @param msg Message.
 +         */
 +        @Override public void addMessage(TcpDiscoveryAbstractMessage msg) {
 +            addMessage(msg, null);
 +        }
 +
 +        /**
 +         * @param msg Message.
 +         * @param msgBytes Optional message bytes.
 +         */
 +        @Override public void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] msgBytes) {
 +            T2 t = new T2<>(msg, msgBytes);
 +
 +            if (msg.highPriority())
 +                queue.addFirst(t);
 +            else
 +                queue.add(t);
 +
++            DebugLogger log = messageLogger(msg);
++
 +            if (log.isDebugEnabled())
 +                log.debug("Message has been added to client queue: " + msg);
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public UUID clientNodeId() {
 +            return clientNodeId;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override protected void processMessage(T2<TcpDiscoveryAbstractMessage, byte[]> msgT) {
 +            boolean success = false;
 +
 +            TcpDiscoveryAbstractMessage msg = msgT.get1();
 +
 +            try {
 +                assert msg.verified() : msg;
 +
 +                byte[] msgBytes = msgT.get2();
 +
 +                if (msgBytes == null)
 +                    msgBytes = U.marshal(spi.marshaller(), msg);
 +
++                DebugLogger msgLog = messageLogger(msg);
++
 +                if (msg instanceof TcpDiscoveryClientAckResponse) {
 +                    if (clientVer == null) {
 +                        ClusterNode node = spi.getNode(clientNodeId);
 +
 +                        if (node != null)
 +                            clientVer = IgniteUtils.productVersion(node);
-                         else if (log.isDebugEnabled())
-                             log.debug("Skip sending message ack to client, fail to get client node " +
++                        else if (msgLog.isDebugEnabled())
++                            msgLog.debug("Skip sending message ack to client, fail to get client node " +
 +                                "[sock=" + sock + ", locNodeId=" + getLocalNodeId() +
 +                                ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
 +                    }
 +
 +                    if (clientVer != null &&
 +                        clientVer.compareTo(TcpDiscoveryClientAckResponse.CLIENT_ACK_SINCE_VERSION) >= 0) {
-                         if (log.isDebugEnabled())
-                             log.debug("Sending message ack to client [sock=" + sock + ", locNodeId="
++                        if (msgLog.isDebugEnabled())
++                            msgLog.debug("Sending message ack to client [sock=" + sock + ", locNodeId="
 +                                + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
 +
 +                        spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ?
 +                            spi.failureDetectionTimeout() : spi.getSocketTimeout());
 +                    }
 +                }
 +                else {
-                     if (log.isDebugEnabled())
-                         log.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
++                    if (msgLog.isDebugEnabled())
++                        msgLog.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
 +                            + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
 +
 +                    assert topologyInitialized(msg) : msg;
 +
 +                    spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ?
 +                        spi.failureDetectionTimeout() : spi.getSocketTimeout());
 +                }
 +
 +                success = true;
 +            }
 +            catch (IgniteCheckedException | IOException e) {
 +                if (log.isDebugEnabled())
 +                    U.error(log, "Client connection failed [sock=" + sock + ", locNodeId="
 +                        + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']', e);
 +
 +                onException("Client connection failed [sock=" + sock + ", locNodeId="
 +                    + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']', e);
 +            }
 +            finally {
 +                if (!success) {
 +                    clientMsgWorkers.remove(clientNodeId, this);
 +
 +                    U.interrupt(this);
 +
 +                    U.closeQuiet(sock);
 +                }
 +            }
 +        }
 +
 +        /**
 +         * @param msg Message.
 +         * @return {@code True} if topology initialized.
 +         */
 +        private boolean topologyInitialized(TcpDiscoveryAbstractMessage msg) {
 +            if (msg instanceof TcpDiscoveryNodeAddedMessage) {
 +                TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg;
 +
 +                if (clientNodeId.equals(addedMsg.node().id()))
 +                    return addedMsg.topology() != null;
 +            }
 +
 +            return true;
 +        }
 +
 +        /**
 +         * @param res Ping result.
 +         */
 +        @Override public void pingResult(boolean res) {
 +            pinger().pingResult(res);
 +        }
 +
 +        /**
 +         * @param timeoutHelper Timeout controller.
 +         * @return Ping result.
 +         * @throws InterruptedException If interrupted.
 +         */
 +        @Override public boolean ping(IgniteSpiOperationTimeoutHelper timeoutHelper) throws InterruptedException {
 +            return pinger().ping(timeoutHelper);
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override protected void cleanup() {
 +            super.cleanup();
 +
 +            pingResult(false);
 +
 +            U.closeQuiet(sock);
 +        }
 +
 +        /**
 +         * @return Pinger.
 +         */
 +        private ClientMessagePinger pinger() {
 +            if (pinger == null) {
 +                synchronized (this) {
 +                    if (pinger == null)
 +                        pinger = new ClientMessagePinger(this);
 +                }
 +            }
 +
 +            return pinger;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public String toString() {
 +            return S.toString(ClientMessageWorker.class, this);
 +        }
 +    }
 +
 +    /**
 +     * Base class for message workers.
 +     */
 +    protected abstract class MessageWorkerAdapter<T> extends IgniteSpiThread {
 +        /** Message queue. */
 +        protected final BlockingDeque<T> queue = new LinkedBlockingDeque<>();
 +
 +        /** Backed interrupted flag. */
 +        private volatile boolean interrupted;
 +
 +        /** Polling timeout. */
 +        private final long pollingTimeout;
 +
 +        /**
 +         * @param name Thread name.
 +         * @param pollingTimeout Messages polling timeout.
 +         */
 +        MessageWorkerAdapter(String name, long pollingTimeout) {
 +            super(spi.ignite().name(), name, log);
 +
 +            this.pollingTimeout = pollingTimeout;
 +
 +            setPriority(spi.threadPri);
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override protected void body() throws InterruptedException {
 +            if (log.isDebugEnabled())
 +                log.debug("Message worker started [locNodeId=" + getConfiguredNodeId() + ']');
 +
 +            while (!isInterrupted()) {
 +                T msg = queue.poll(pollingTimeout, TimeUnit.MILLISECONDS);
 +
 +                if (msg == null)
 +                    noMessageLoop();
 +                else
 +                    processMessage(msg);
 +            }
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void interrupt() {
 +            interrupted = true;
 +
 +            super.interrupt();
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public boolean isInterrupted() {
 +            return interrupted || super.isInterrupted();
 +        }
 +
 +        /**
 +         * @return Current queue size.
 +         */
 +        int queueSize() {
 +            return queue.size();
 +        }
 +
 +        /**
 +         * @param msg Message.
 +         */
 +        protected abstract void processMessage(T msg);
 +
 +        /**
 +         * Called when there is no message to process giving ability to perform other activity.
 +         */
 +        protected void noMessageLoop() {
 +            // No-op.
 +        }
 +    }
 +
 +    /**
 +     *
 +     */
 +    private static class GridPingFutureAdapter<R> extends GridFutureAdapter<R> {
 +        /** */
 +        private static final long serialVersionUID = 0L;
 +
 +        /** Socket. */
 +        private volatile Socket sock;
 +
 +        /**
 +         * Returns socket associated with this ping future.
 +         *
 +         * @return Socket or {@code null} if no socket associated.
 +         */
 +        public Socket sock() {
 +            return sock;
 +        }
 +
 +        /**
 +         * Associates socket with this ping future.
 +         *
 +         * @param sock Socket.
 +         */
 +        public void sock(Socket sock) {
 +            this.sock = sock;
 +        }
 +    }
  
 -                            ack.verify(locNodeId);
 +    /**
 +     * Socket decorator that overrides {@link Socket#getInputStream()}
 +     * and {@link Socket#getOutputStream()} that return custom implementations which
 +     * use passed {@link SSLEngine}.
 +     */
 +    static class NioSslSocket extends Socket {
 +        /** Genuine socket. */
 +        private final Socket delegate;
  
 -                            clientMsgWrk.addMessage(ack);
 -                        }
 -                        else
 -                            spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 +        /** */
 +        private final SocketChannel ch;
  
 -                        if (heartbeatMsg != null)
 -  

<TRUNCATED>
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f388e3/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f388e3/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------