You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/05/27 15:19:36 UTC

[4/5] incubator-ignite git commit: # IGNITE-943 Merge TcpDiscoverySpi and TcpClientDiscoverySpi

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
new file mode 100644
index 0000000..7e1f592
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -0,0 +1,4792 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.events.*;
+import org.apache.ignite.internal.processors.security.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.io.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.security.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
+
+import java.io.*;
+import java.net.*;
+import java.text.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.IgniteNodeAttributes.*;
+import static org.apache.ignite.spi.IgnitePortProtocol.*;
+import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.*;
+import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage.*;
+
+/**
+ *
+ */
+class ServerImpl extends TcpDiscoveryImpl {
+    /** */
+    private final Executor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<Runnable>());
+
+    /** Nodes ring. */
+    @GridToStringExclude
+    private final TcpDiscoveryNodesRing ring = new TcpDiscoveryNodesRing();
+
+    /** Topology snapshots history. */
+    private final SortedMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>();
+
+    /** Socket readers. */
+    private final Collection<SocketReader> readers = new LinkedList<>();
+
+    /** TCP server for discovery SPI. */
+    private TcpServer tcpSrvr;
+
+    /** Message worker. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private RingMessageWorker msgWorker;
+
+    /** Client message workers. */
+    private ConcurrentMap<UUID, ClientMessageWorker> clientMsgWorkers = new ConcurrentHashMap8<>();
+
+    /** Metrics sender. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private HeartbeatsSender hbsSnd;
+
+    /** Status checker. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private CheckStatusSender chkStatusSnd;
+
+    /** IP finder cleaner. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private IpFinderCleaner ipFinderCleaner;
+
+    /** Statistics printer thread. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private StatisticsPrinter statsPrinter;
+
+    /** Failed nodes (but still in topology). */
+    private Collection<TcpDiscoveryNode> failedNodes = new HashSet<>();
+
+    /** Leaving nodes (but still in topology). */
+    private Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>();
+
+    /** If non-shared IP finder is used this flag shows whether IP finder contains local address. */
+    private boolean ipFinderHasLocAddr;
+
+    /** Addresses that do not respond during join requests send (for resolving concurrent start). */
+    private final Collection<SocketAddress> noResAddrs = new GridConcurrentHashSet<>();
+
+    /** Addresses that incoming join requests send were send from (for resolving concurrent start). */
+    private final Collection<SocketAddress> fromAddrs = new GridConcurrentHashSet<>();
+
+    /** Response on join request from coordinator (in case of duplicate ID or auth failure). */
+    private final GridTuple<TcpDiscoveryAbstractMessage> joinRes = F.t1();
+
+    /** Node authenticator. */
+    private DiscoverySpiNodeAuthenticator nodeAuth;
+
+    /** Mutex. */
+    private final Object mux = new Object();
+
+    /** Discovery state. */
+    protected TcpDiscoverySpiState spiState = DISCONNECTED;
+
+    /** Map with proceeding ping requests. */
+    private final ConcurrentMap<InetSocketAddress, IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>>> pingMap =
+        new ConcurrentHashMap8<>();
+
+    /** Debug mode. */
+    private boolean debugMode;
+
+    /** Debug messages history. */
+    private int debugMsgHist = 512;
+
+    /** Received messages. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private ConcurrentLinkedDeque<String> debugLog;
+
+    /**
+     * @param adapter Adapter.
+     */
+    ServerImpl(TcpDiscoverySpi adapter) {
+        super(adapter);
+    }
+
+    /**
+     * This method is intended for troubleshooting purposes only.
+     *
+     * @param debugMode {code True} to start SPI in debug mode.
+     */
+    public void setDebugMode(boolean debugMode) {
+        this.debugMode = debugMode;
+    }
+
+    /**
+     * This method is intended for troubleshooting purposes only.
+     *
+     * @param debugMsgHist Message history log size.
+     */
+    public void setDebugMessageHistory(int debugMsgHist) {
+        this.debugMsgHist = debugMsgHist;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getSpiState() {
+        synchronized (mux) {
+            return spiState.name();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getMessageWorkerQueueSize() {
+        return msgWorker.queueSize();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public UUID getCoordinator() {
+        TcpDiscoveryNode crd = resolveCoordinator();
+
+        return crd != null ? crd.id() : null;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public ClusterNode getNode(UUID nodeId) {
+        assert nodeId != null;
+
+        UUID locNodeId0 = getLocalNodeId();
+
+        if (locNodeId0 != null && locNodeId0.equals(nodeId))
+            // Return local node directly.
+            return locNode;
+
+        TcpDiscoveryNode node = ring.node(nodeId);
+
+        if (node != null && !node.visible())
+            return null;
+
+        return node;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<ClusterNode> getRemoteNodes() {
+        return F.upcast(ring.visibleRemoteNodes());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void spiStart(String gridName) throws IgniteSpiException {
+        synchronized (mux) {
+            spiState = DISCONNECTED;
+        }
+
+        if (debugMode) {
+            if (!log.isInfoEnabled())
+                throw new IgniteSpiException("Info log level should be enabled for TCP discovery to work " +
+                    "in debug mode.");
+
+            debugLog = new ConcurrentLinkedDeque<>();
+
+            U.quietAndWarn(log, "TCP discovery SPI is configured in debug mode.");
+        }
+
+        // Clear addresses collections.
+        fromAddrs.clear();
+        noResAddrs.clear();
+
+        msgWorker = new RingMessageWorker();
+        msgWorker.start();
+
+        tcpSrvr = new TcpServer();
+
+        adapter.initLocalNode(tcpSrvr.port, true);
+
+        locNode = adapter.locNode;
+
+        // Start TCP server thread after local node is initialized.
+        tcpSrvr.start();
+
+        ring.localNode(locNode);
+
+        if (adapter.ipFinder.isShared())
+            registerLocalNodeAddress();
+        else {
+            if (F.isEmpty(adapter.ipFinder.getRegisteredAddresses()))
+                throw new IgniteSpiException("Non-shared IP finder must have IP addresses specified in " +
+                    "GridTcpDiscoveryIpFinder.getRegisteredAddresses() configuration property " +
+                    "(specify list of IP addresses in configuration).");
+
+            ipFinderHasLocAddr = adapter.ipFinderHasLocalAddress();
+        }
+
+        if (adapter.getStatisticsPrintFrequency() > 0 && log.isInfoEnabled()) {
+            statsPrinter = new StatisticsPrinter();
+            statsPrinter.start();
+        }
+
+        adapter.stats.onJoinStarted();
+
+        joinTopology();
+
+        adapter.stats.onJoinFinished();
+
+        hbsSnd = new HeartbeatsSender();
+        hbsSnd.start();
+
+        chkStatusSnd = new CheckStatusSender();
+        chkStatusSnd.start();
+
+        if (adapter.ipFinder.isShared()) {
+            ipFinderCleaner = new IpFinderCleaner();
+            ipFinderCleaner.start();
+        }
+
+        adapter.printStartInfo();
+    }
+
+    /**
+     * @throws IgniteSpiException If failed.
+     */
+    @SuppressWarnings("BusyWait")
+    private void registerLocalNodeAddress() throws IgniteSpiException {
+        // Make sure address registration succeeded.
+        while (true) {
+            try {
+                adapter.ipFinder.initializeLocalAddresses(locNode.socketAddresses());
+
+                // Success.
+                break;
+            }
+            catch (IllegalStateException e) {
+                throw new IgniteSpiException("Failed to register local node address with IP finder: " +
+                    locNode.socketAddresses(), e);
+            }
+            catch (IgniteSpiException e) {
+                LT.error(log, e, "Failed to register local node address in IP finder on start " +
+                    "(retrying every 2000 ms).");
+            }
+
+            try {
+                U.sleep(2000);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                throw new IgniteSpiException("Thread has been interrupted.", e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
+        spiCtx.registerPort(tcpSrvr.port, TCP);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void spiStop() throws IgniteSpiException {
+        spiStop0(false);
+    }
+
+    /**
+     * Stops SPI finally or stops SPI for restart.
+     *
+     * @param disconnect {@code True} if SPI is being disconnected.
+     * @throws IgniteSpiException If failed.
+     */
+    private void spiStop0(boolean disconnect) throws IgniteSpiException {
+        if (log.isDebugEnabled()) {
+            if (disconnect)
+                log.debug("Disconnecting SPI.");
+            else
+                log.debug("Preparing to start local node stop procedure.");
+        }
+
+        if (disconnect) {
+            synchronized (mux) {
+                spiState = DISCONNECTING;
+            }
+        }
+
+        if (msgWorker != null && msgWorker.isAlive() && !disconnect) {
+            // Send node left message only if it is final stop.
+            msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(locNode.id()));
+
+            synchronized (mux) {
+                long threshold = U.currentTimeMillis() + adapter.netTimeout;
+
+                long timeout = adapter.netTimeout;
+
+                while (spiState != LEFT && timeout > 0) {
+                    try {
+                        mux.wait(timeout);
+
+                        timeout = threshold - U.currentTimeMillis();
+                    }
+                    catch (InterruptedException ignored) {
+                        Thread.currentThread().interrupt();
+
+                        break;
+                    }
+                }
+
+                if (spiState == LEFT) {
+                    if (log.isDebugEnabled())
+                        log.debug("Verification for local node leave has been received from coordinator" +
+                            " (continuing stop procedure).");
+                }
+                else if (log.isInfoEnabled()) {
+                    log.info("No verification for local node leave has been received from coordinator" +
+                        " (will stop node anyway).");
+                }
+            }
+        }
+
+        U.interrupt(tcpSrvr);
+        U.join(tcpSrvr, log);
+
+        Collection<SocketReader> tmp;
+
+        synchronized (mux) {
+            tmp = U.arrayList(readers);
+        }
+
+        U.interrupt(tmp);
+        U.joinThreads(tmp, log);
+
+        U.interrupt(hbsSnd);
+        U.join(hbsSnd, log);
+
+        U.interrupt(chkStatusSnd);
+        U.join(chkStatusSnd, log);
+
+        U.interrupt(ipFinderCleaner);
+        U.join(ipFinderCleaner, log);
+
+        U.interrupt(msgWorker);
+        U.join(msgWorker, log);
+
+        U.interrupt(statsPrinter);
+        U.join(statsPrinter, log);
+
+        Collection<TcpDiscoveryNode> rmts = null;
+
+        if (!disconnect)
+            adapter.printStopInfo();
+        else {
+            adapter.getSpiContext().deregisterPorts();
+
+            rmts = ring.visibleRemoteNodes();
+        }
+
+        long topVer = ring.topologyVersion();
+
+        ring.clear();
+
+        if (rmts != null && !rmts.isEmpty()) {
+            // This is restart/disconnection and remote nodes are not empty.
+            // We need to fire FAIL event for each.
+            DiscoverySpiListener lsnr = adapter.lsnr;
+
+            if (lsnr != null) {
+                Set<ClusterNode> processed = new HashSet<>();
+
+                for (TcpDiscoveryNode n : rmts) {
+                    assert n.visible();
+
+                    processed.add(n);
+
+                    List<ClusterNode> top = U.arrayList(rmts, F.notIn(processed));
+
+                    topVer++;
+
+                    Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer,
+                        Collections.unmodifiableList(top));
+
+                    lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, hist, null);
+                }
+            }
+        }
+
+        printStatistics();
+
+        adapter.stats.clear();
+
+        synchronized (mux) {
+            // Clear stored data.
+            leavingNodes.clear();
+            failedNodes.clear();
+
+            spiState = DISCONNECTED;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean pingNode(UUID nodeId) {
+        assert nodeId != null;
+
+        if (log.isDebugEnabled())
+            log.debug("Pinging node: " + nodeId + "].");
+
+        if (nodeId == getLocalNodeId())
+            return true;
+
+        TcpDiscoveryNode node = ring.node(nodeId);
+
+        if (node == null || !node.visible())
+            return false;
+
+        boolean res = pingNode(node);
+
+        if (!res && !node.isClient()) {
+            LT.warn(log, null, "Failed to ping node (status check will be initiated): " + nodeId);
+
+            msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, node.id()));
+        }
+
+        return res;
+    }
+
+    /**
+     * Pings the remote node to see if it's alive.
+     *
+     * @param node Node.
+     * @return {@code True} if ping succeeds.
+     */
+    private boolean pingNode(TcpDiscoveryNode node) {
+        assert node != null;
+
+        if (node.id().equals(getLocalNodeId()))
+            return true;
+
+        UUID clientNodeId = null;
+
+        if (node.isClient()) {
+            clientNodeId = node.id();
+
+            node = ring.node(node.clientRouterNodeId());
+
+            if (node == null || !node.visible())
+                return false;
+        }
+
+        for (InetSocketAddress addr : adapter.getNodeAddresses(node, U.sameMacs(locNode, node))) {
+            try {
+                // ID returned by the node should be the same as ID of the parameter for ping to succeed.
+                IgniteBiTuple<UUID, Boolean> t = pingNode(addr, clientNodeId);
+
+                return node.id().equals(t.get1()) && (clientNodeId == null || t.get2());
+            }
+            catch (IgniteCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']');
+
+                onException("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']', e);
+                // continue;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Pings the node by its address to see if it's alive.
+     *
+     * @param addr Address of the node.
+     * @return ID of the remote node and "client exists" flag if node alive.
+     * @throws IgniteSpiException If an error occurs.
+     */
+    private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, @Nullable UUID clientNodeId)
+        throws IgniteCheckedException {
+        assert addr != null;
+
+        UUID locNodeId = getLocalNodeId();
+
+        if (F.contains(adapter.locNodeAddrs, addr)) {
+            if (clientNodeId == null)
+                return F.t(getLocalNodeId(), false);
+
+            ClientMessageWorker clientWorker = clientMsgWorkers.get(clientNodeId);
+
+            if (clientWorker == null)
+                return F.t(getLocalNodeId(), false);
+
+            boolean clientPingRes;
+
+            try {
+                clientPingRes = clientWorker.ping();
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+
+                throw new IgniteInterruptedCheckedException(e);
+            }
+
+            return F.t(getLocalNodeId(), clientPingRes);
+        }
+
+        GridFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new GridFutureAdapter<>();
+
+        IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>> oldFut = pingMap.putIfAbsent(addr, fut);
+
+        if (oldFut != null)
+            return oldFut.get();
+        else {
+            Collection<Throwable> errs = null;
+
+            try {
+                Socket sock = null;
+
+                for (int i = 0; i < adapter.reconCnt; i++) {
+                    try {
+                        if (addr.isUnresolved())
+                            addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort());
+
+                        long tstamp = U.currentTimeMillis();
+
+                        sock = adapter.openSocket(addr);
+
+                        adapter.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId));
+
+                        TcpDiscoveryPingResponse res = adapter.readMessage(sock, null, adapter.netTimeout);
+
+                        if (locNodeId.equals(res.creatorNodeId())) {
+                            if (log.isDebugEnabled())
+                                log.debug("Ping response from local node: " + res);
+
+                            break;
+                        }
+
+                        adapter.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
+
+                        IgniteBiTuple<UUID, Boolean> t = F.t(res.creatorNodeId(), res.clientExists());
+
+                        fut.onDone(t);
+
+                        return t;
+                    }
+                    catch (IOException | IgniteCheckedException e) {
+                        if (errs == null)
+                            errs = new ArrayList<>();
+
+                        errs.add(e);
+                    }
+                    finally {
+                        U.closeQuiet(sock);
+                    }
+                }
+            }
+            catch (Throwable t) {
+                fut.onDone(t);
+
+                if (t instanceof Error)
+                    throw t;
+
+                throw U.cast(t);
+            }
+            finally {
+                if (!fut.isDone())
+                    fut.onDone(U.exceptionWithSuppressed("Failed to ping node by address: " + addr, errs));
+
+                boolean b = pingMap.remove(addr, fut);
+
+                assert b;
+            }
+
+            return fut.get();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void disconnect() throws IgniteSpiException {
+        spiStop0(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator nodeAuth) {
+        this.nodeAuth = nodeAuth;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
+        try {
+            msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, adapter.marsh.marshal(evt)));
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void failNode(UUID nodeId) {
+        ClusterNode node = ring.node(nodeId);
+
+        if (node != null) {
+            TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(),
+                node.id(), node.order());
+
+            msgWorker.addMessage(msg);
+        }
+    }
+
+    /**
+     * Tries to join this node to topology.
+     *
+     * @throws IgniteSpiException If any error occurs.
+     */
+    private void joinTopology() throws IgniteSpiException {
+        synchronized (mux) {
+            assert spiState == CONNECTING || spiState == DISCONNECTED;
+
+            spiState = CONNECTING;
+        }
+
+        SecurityCredentials locCred = (SecurityCredentials)locNode.getAttributes()
+            .get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
+
+        // Marshal credentials for backward compatibility and security.
+        marshalCredentials(locNode);
+
+        while (true) {
+            if (!sendJoinRequestMessage()) {
+                if (log.isDebugEnabled())
+                    log.debug("Join request message has not been sent (local node is the first in the topology).");
+
+                if (nodeAuth != null) {
+                    // Authenticate local node.
+                    try {
+                        SecurityContext subj = nodeAuth.authenticateNode(locNode, locCred);
+
+                        if (subj == null)
+                            throw new IgniteSpiException("Authentication failed for local node: " + locNode.id());
+
+                        Map<String, Object> attrs = new HashMap<>(locNode.attributes());
+
+                        attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT,
+                            adapter.ignite().configuration().getMarshaller().marshal(subj));
+                        attrs.remove(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
+
+                        locNode.setAttributes(attrs);
+                    }
+                    catch (IgniteException | IgniteCheckedException e) {
+                        throw new IgniteSpiException("Failed to authenticate local node (will shutdown local node).", e);
+                    }
+                }
+
+                locNode.order(1);
+                locNode.internalOrder(1);
+
+                adapter.gridStartTime = U.currentTimeMillis();
+
+                locNode.visible(true);
+
+                ring.clear();
+
+                ring.topologyVersion(1);
+
+                synchronized (mux) {
+                    topHist.clear();
+
+                    spiState = CONNECTED;
+
+                    mux.notifyAll();
+                }
+
+                notifyDiscovery(EVT_NODE_JOINED, 1, locNode);
+
+                break;
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Join request message has been sent (waiting for coordinator response).");
+
+            synchronized (mux) {
+                long threshold = U.currentTimeMillis() + adapter.netTimeout;
+
+                long timeout = adapter.netTimeout;
+
+                while (spiState == CONNECTING && timeout > 0) {
+                    try {
+                        mux.wait(timeout);
+
+                        timeout = threshold - U.currentTimeMillis();
+                    }
+                    catch (InterruptedException ignored) {
+                        Thread.currentThread().interrupt();
+
+                        throw new IgniteSpiException("Thread has been interrupted.");
+                    }
+                }
+
+                if (spiState == CONNECTED)
+                    break;
+                else if (spiState == DUPLICATE_ID)
+                    throw adapter.duplicateIdError((TcpDiscoveryDuplicateIdMessage)joinRes.get());
+                else if (spiState == AUTH_FAILED)
+                    throw adapter.authenticationFailedError((TcpDiscoveryAuthFailedMessage)joinRes.get());
+                else if (spiState == CHECK_FAILED)
+                    throw adapter.checkFailedError((TcpDiscoveryCheckFailedMessage)joinRes.get());
+                else if (spiState == LOOPBACK_PROBLEM) {
+                    TcpDiscoveryLoopbackProblemMessage msg = (TcpDiscoveryLoopbackProblemMessage)joinRes.get();
+
+                    boolean locHostLoopback = adapter.locHost.isLoopbackAddress();
+
+                    String firstNode = locHostLoopback ? "local" : "remote";
+
+                    String secondNode = locHostLoopback ? "remote" : "local";
+
+                    throw new IgniteSpiException("Failed to add node to topology because " + firstNode +
+                        " node is configured to use loopback address, but " + secondNode + " node is not " +
+                        "(consider changing 'localAddress' configuration parameter) " +
+                        "[locNodeAddrs=" + U.addressesAsString(locNode) + ", rmtNodeAddrs=" +
+                        U.addressesAsString(msg.addresses(), msg.hostNames()) + ']');
+                }
+                else
+                    LT.warn(log, null, "Node has not been connected to topology and will repeat join process. " +
+                        "Check remote nodes logs for possible error messages. " +
+                        "Note that large topology may require significant time to start. " +
+                        "Increase 'TcpDiscoverySpi.networkTimeout' configuration property " +
+                        "if getting this message on the starting nodes [networkTimeout=" + adapter.netTimeout + ']');
+            }
+        }
+
+        assert locNode.order() != 0;
+        assert locNode.internalOrder() != 0;
+
+        if (log.isDebugEnabled())
+            log.debug("Discovery SPI has been connected to topology with order: " + locNode.internalOrder());
+    }
+
+    /**
+     * Tries to send join request message to a random node presenting in topology.
+     * Address is provided by {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} and message is
+     * sent to first node connection succeeded to.
+     *
+     * @return {@code true} if send succeeded.
+     * @throws IgniteSpiException If any error occurs.
+     */
+    @SuppressWarnings({"BusyWait"})
+    private boolean sendJoinRequestMessage() throws IgniteSpiException {
+        TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode,
+            adapter.collectExchangeData(getLocalNodeId()));
+
+        // Time when it has been detected, that addresses from IP finder do not respond.
+        long noResStart = 0;
+
+        while (true) {
+            Collection<InetSocketAddress> addrs = adapter.resolvedAddresses();
+
+            if (F.isEmpty(addrs))
+                return false;
+
+            boolean retry = false;
+            Collection<Exception> errs = new ArrayList<>();
+
+            try (SocketMultiConnector multiConnector = new SocketMultiConnector(adapter, addrs, 2)) {
+                GridTuple3<InetSocketAddress, Socket, Exception> tuple;
+
+                while ((tuple = multiConnector.next()) != null) {
+                    InetSocketAddress addr = tuple.get1();
+                    Socket sock = tuple.get2();
+                    Exception ex = tuple.get3();
+
+                    if (ex == null) {
+                        assert sock != null;
+
+                        try {
+                            Integer res = sendMessageDirectly(joinReq, addr, sock);
+
+                            assert res != null;
+
+                            noResAddrs.remove(addr);
+
+                            // Address is responsive, reset period start.
+                            noResStart = 0;
+
+                            switch (res) {
+                                case RES_WAIT:
+                                    // Concurrent startup, try sending join request again or wait if no success.
+                                    retry = true;
+
+                                    break;
+                                case RES_OK:
+                                    if (log.isDebugEnabled())
+                                        log.debug("Join request message has been sent to address [addr=" + addr +
+                                            ", req=" + joinReq + ']');
+
+                                    // Join request sending succeeded, wait for response from topology.
+                                    return true;
+
+                                default:
+                                    // Concurrent startup, try next node.
+                                    if (res == RES_CONTINUE_JOIN) {
+                                        if (!fromAddrs.contains(addr))
+                                            retry = true;
+                                    }
+                                    else {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Unexpected response to join request: " + res);
+
+                                        retry = true;
+                                    }
+
+                                    break;
+                            }
+                        }
+                        catch (IgniteSpiException e) {
+                            e.printStackTrace();
+
+                            ex = e;
+                        }
+                    }
+
+                    if (ex != null) {
+                        errs.add(ex);
+
+                        if (log.isDebugEnabled()) {
+                            IOException ioe = X.cause(ex, IOException.class);
+
+                            log.debug("Failed to send join request message [addr=" + addr +
+                                ", msg=" + ioe != null ? ioe.getMessage() : ex.getMessage() + ']');
+
+                            onException("Failed to send join request message [addr=" + addr +
+                                ", msg=" + ioe != null ? ioe.getMessage() : ex.getMessage() + ']', ioe);
+                        }
+
+                        noResAddrs.add(addr);
+                    }
+                }
+            }
+
+            if (retry) {
+                if (log.isDebugEnabled())
+                    log.debug("Concurrent discovery SPI start has been detected (local node should wait).");
+
+                try {
+                    U.sleep(2000);
+                }
+                catch (IgniteInterruptedCheckedException e) {
+                    throw new IgniteSpiException("Thread has been interrupted.", e);
+                }
+            }
+            else if (!adapter.ipFinder.isShared() && !ipFinderHasLocAddr) {
+                IgniteCheckedException e = null;
+
+                if (!errs.isEmpty()) {
+                    e = new IgniteCheckedException("Multiple connection attempts failed.");
+
+                    for (Exception err : errs)
+                        e.addSuppressed(err);
+                }
+
+                if (e != null && X.hasCause(e, ConnectException.class))
+                    LT.warn(log, null, "Failed to connect to any address from IP finder " +
+                        "(make sure IP finder addresses are correct and firewalls are disabled on all host machines): " +
+                        addrs);
+
+                if (adapter.joinTimeout > 0) {
+                    if (noResStart == 0)
+                        noResStart = U.currentTimeMillis();
+                    else if (U.currentTimeMillis() - noResStart > adapter.joinTimeout)
+                        throw new IgniteSpiException(
+                            "Failed to connect to any address from IP finder within join timeout " +
+                                "(make sure IP finder addresses are correct, and operating system firewalls are disabled " +
+                                "on all host machines, or consider increasing 'joinTimeout' configuration property): " +
+                                addrs, e);
+                }
+
+                try {
+                    U.sleep(2000);
+                }
+                catch (IgniteInterruptedCheckedException ex) {
+                    throw new IgniteSpiException("Thread has been interrupted.", ex);
+                }
+            }
+            else
+                break;
+        }
+
+        return false;
+    }
+
+    /**
+     * Establishes connection to an address, sends message and returns the response (if any).
+     *
+     * @param msg Message to send.
+     * @param addr Address to send message to.
+     * @return Response read from the recipient or {@code null} if no response is supposed.
+     * @throws IgniteSpiException If an error occurs.
+     */
+    @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr, Socket sock)
+        throws IgniteSpiException {
+        assert msg != null;
+        assert addr != null;
+
+        Collection<Throwable> errs = null;
+
+        long ackTimeout0 = adapter.ackTimeout;
+
+        int connectAttempts = 1;
+
+        boolean joinReqSent = false;
+
+        UUID locNodeId = getLocalNodeId();
+
+        for (int i = 0; i < adapter.reconCnt; i++) {
+            // Need to set to false on each new iteration,
+            // since remote node may leave in the middle of the first iteration.
+            joinReqSent = false;
+
+            boolean openSock = false;
+
+            try {
+                long tstamp = U.currentTimeMillis();
+
+                if (sock == null)
+                    sock = adapter.openSocket(addr);
+
+                openSock = true;
+
+                // Handshake.
+                adapter.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId));
+
+                TcpDiscoveryHandshakeResponse res = adapter.readMessage(sock, null, ackTimeout0);
+
+                if (locNodeId.equals(res.creatorNodeId())) {
+                    if (log.isDebugEnabled())
+                        log.debug("Handshake response from local node: " + res);
+
+                    break;
+                }
+
+                adapter.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
+
+                // Send message.
+                tstamp = U.currentTimeMillis();
+
+                adapter.writeToSocket(sock, msg);
+
+                adapter.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
+
+                if (debugMode)
+                    debugLog("Message has been sent directly to address [msg=" + msg + ", addr=" + addr +
+                        ", rmtNodeId=" + res.creatorNodeId() + ']');
+
+                if (log.isDebugEnabled())
+                    log.debug("Message has been sent directly to address [msg=" + msg + ", addr=" + addr +
+                        ", rmtNodeId=" + res.creatorNodeId() + ']');
+
+                // Connection has been established, but
+                // join request may not be unmarshalled on remote host.
+                // E.g. due to class not found issue.
+                joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage;
+
+                return adapter.readReceipt(sock, ackTimeout0);
+            }
+            catch (ClassCastException e) {
+                // This issue is rarely reproducible on AmazonEC2, but never
+                // on dedicated machines.
+                if (log.isDebugEnabled())
+                    U.error(log, "Class cast exception on direct send: " + addr, e);
+
+                onException("Class cast exception on direct send: " + addr, e);
+
+                if (errs == null)
+                    errs = new ArrayList<>();
+
+                errs.add(e);
+            }
+            catch (IOException | IgniteCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.error("Exception on direct send: " + e.getMessage(), e);
+
+                onException("Exception on direct send: " + e.getMessage(), e);
+
+                if (errs == null)
+                    errs = new ArrayList<>();
+
+                errs.add(e);
+
+                if (!openSock) {
+                    // Reconnect for the second time, if connection is not established.
+                    if (connectAttempts < 2) {
+                        connectAttempts++;
+
+                        continue;
+                    }
+
+                    break; // Don't retry if we can not establish connection.
+                }
+
+                if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
+                    ackTimeout0 *= 2;
+
+                    if (!checkAckTimeout(ackTimeout0))
+                        break;
+                }
+            }
+            finally {
+                U.closeQuiet(sock);
+
+                sock = null;
+            }
+        }
+
+        if (joinReqSent) {
+            if (log.isDebugEnabled())
+                log.debug("Join request has been sent, but receipt has not been read (returning RES_WAIT).");
+
+            // Topology will not include this node,
+            // however, warning on timed out join will be output.
+            return RES_OK;
+        }
+
+        throw new IgniteSpiException(
+            "Failed to send message to address [addr=" + addr + ", msg=" + msg + ']',
+            U.exceptionWithSuppressed("Failed to send message to address " +
+                "[addr=" + addr + ", msg=" + msg + ']', errs));
+    }
+
+    /**
+     * Marshalls credentials with discovery SPI marshaller (will replace attribute value).
+     *
+     * @param node Node to marshall credentials for.
+     * @throws IgniteSpiException If marshalling failed.
+     */
+    private void marshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException {
+        try {
+            // Use security-unsafe getter.
+            Map<String, Object> attrs = new HashMap<>(node.getAttributes());
+
+            attrs.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS,
+                adapter.marsh.marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS)));
+
+            node.setAttributes(attrs);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteSpiException("Failed to marshal node security credentials: " + node.id(), e);
+        }
+    }
+
+    /**
+     * Unmarshalls credentials with discovery SPI marshaller (will not replace attribute value).
+     *
+     * @param node Node to unmarshall credentials for.
+     * @return Security credentials.
+     * @throws IgniteSpiException If unmarshal fails.
+     */
+    private SecurityCredentials unmarshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException {
+        try {
+            byte[] credBytes = (byte[])node.getAttributes().get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
+
+            if (credBytes == null)
+                return null;
+
+            return adapter.marsh.unmarshal(credBytes, null);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteSpiException("Failed to unmarshal node security credentials: " + node.id(), e);
+        }
+    }
+
+    /**
+     * @param ackTimeout Acknowledgement timeout.
+     * @return {@code True} if acknowledgement timeout is less or equal to
+     * maximum acknowledgement timeout, {@code false} otherwise.
+     */
+    private boolean checkAckTimeout(long ackTimeout) {
+        if (ackTimeout > adapter.maxAckTimeout) {
+            LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " +
+                "(consider increasing 'maxAckTimeout' configuration property) " +
+                "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + adapter.maxAckTimeout + ']');
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Notify external listener on discovery event.
+     *
+     * @param type Discovery event type. See {@link org.apache.ignite.events.DiscoveryEvent} for more details.
+     * @param topVer Topology version.
+     * @param node Remote node this event is connected with.
+     */
+    private void notifyDiscovery(int type, long topVer, TcpDiscoveryNode node) {
+        assert type > 0;
+        assert node != null;
+
+        DiscoverySpiListener lsnr = adapter.lsnr;
+
+        TcpDiscoverySpiState spiState = spiStateCopy();
+
+        if (lsnr != null && node.visible() && (spiState == CONNECTED || spiState == DISCONNECTING)) {
+            if (log.isDebugEnabled())
+                log.debug("Discovery notification [node=" + node + ", spiState=" + spiState +
+                    ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']');
+
+            Collection<ClusterNode> top = F.upcast(ring.visibleNodes());
+
+            Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer, top);
+
+            lsnr.onDiscovery(type, topVer, node, top, hist, null);
+        }
+        else if (log.isDebugEnabled())
+            log.debug("Skipped discovery notification [node=" + node + ", spiState=" + spiState +
+                ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']');
+    }
+
+    /**
+     * Update topology history with new topology snapshots.
+     *
+     * @param topVer Topology version.
+     * @param top Topology snapshot.
+     * @return Copy of updated topology history.
+     */
+    @Nullable private Map<Long, Collection<ClusterNode>> updateTopologyHistory(long topVer, Collection<ClusterNode> top) {
+        synchronized (mux) {
+            if (topHist.containsKey(topVer))
+                return null;
+
+            topHist.put(topVer, top);
+
+            while (topHist.size() > adapter.topHistSize)
+                topHist.remove(topHist.firstKey());
+
+            if (log.isDebugEnabled())
+                log.debug("Added topology snapshot to history, topVer=" + topVer + ", historySize=" + topHist.size());
+
+            return new TreeMap<>(topHist);
+        }
+    }
+
+    /**
+     * Checks whether local node is coordinator. Nodes that are leaving or failed
+     * (but are still in topology) are removed from search.
+     *
+     * @return {@code true} if local node is coordinator.
+     */
+    private boolean isLocalNodeCoordinator() {
+        synchronized (mux) {
+            boolean crd = spiState == CONNECTED && locNode.equals(resolveCoordinator());
+
+            if (crd)
+                adapter.stats.onBecomingCoordinator();
+
+            return crd;
+        }
+    }
+
+    /**
+     * @return Spi state copy.
+     */
+    private TcpDiscoverySpiState spiStateCopy() {
+        TcpDiscoverySpiState state;
+
+        synchronized (mux) {
+            state = spiState;
+        }
+
+        return state;
+    }
+
+    /**
+     * Resolves coordinator. Nodes that are leaving or failed (but are still in
+     * topology) are removed from search.
+     *
+     * @return Coordinator node or {@code null} if there are no coordinator
+     * (i.e. local node is the last one and is currently stopping).
+     */
+    @Nullable private TcpDiscoveryNode resolveCoordinator() {
+        return resolveCoordinator(null);
+    }
+
+    /**
+     * Resolves coordinator. Nodes that are leaving or failed (but are still in
+     * topology) are removed from search as well as provided filter.
+     *
+     * @param filter Nodes to exclude when resolving coordinator (optional).
+     * @return Coordinator node or {@code null} if there are no coordinator
+     * (i.e. local node is the last one and is currently stopping).
+     */
+    @Nullable private TcpDiscoveryNode resolveCoordinator(
+        @Nullable Collection<TcpDiscoveryNode> filter) {
+        synchronized (mux) {
+            Collection<TcpDiscoveryNode> excluded = F.concat(false, failedNodes, leavingNodes);
+
+            if (!F.isEmpty(filter))
+                excluded = F.concat(false, excluded, filter);
+
+            return ring.coordinator(excluded);
+        }
+    }
+
+    /**
+     * Prints SPI statistics.
+     */
+    private void printStatistics() {
+        if (log.isInfoEnabled() && adapter.statsPrintFreq > 0) {
+            int failedNodesSize;
+            int leavingNodesSize;
+
+            synchronized (mux) {
+                failedNodesSize = failedNodes.size();
+                leavingNodesSize = leavingNodes.size();
+            }
+
+            Runtime runtime = Runtime.getRuntime();
+
+            TcpDiscoveryNode coord = resolveCoordinator();
+
+            log.info("Discovery SPI statistics [statistics=" + adapter.stats + ", spiState=" + spiStateCopy() +
+                ", coord=" + coord +
+                ", topSize=" + ring.allNodes().size() +
+                ", leavingNodesSize=" + leavingNodesSize + ", failedNodesSize=" + failedNodesSize +
+                ", msgWorker.queue.size=" + (msgWorker != null ? msgWorker.queueSize() : "N/A") +
+                ", lastUpdate=" + (locNode != null ? U.format(locNode.lastUpdateTime()) : "N/A") +
+                ", heapFree=" + runtime.freeMemory() / (1024 * 1024) +
+                "M, heapTotal=" + runtime.maxMemory() / (1024 * 1024) + "M]");
+        }
+    }
+
+    /**
+     * @param msg Message to prepare.
+     * @param destNodeId Destination node ID.
+     * @param msgs Messages to include.
+     * @param discardMsgId Discarded message ID.
+     */
+    private void prepareNodeAddedMessage(TcpDiscoveryAbstractMessage msg, UUID destNodeId,
+        @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) {
+        assert destNodeId != null;
+
+        if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+            TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg;
+
+            TcpDiscoveryNode node = nodeAddedMsg.node();
+
+            if (node.id().equals(destNodeId)) {
+                Collection<TcpDiscoveryNode> allNodes = ring.allNodes();
+                Collection<TcpDiscoveryNode> topToSend = new ArrayList<>(allNodes.size());
+
+                for (TcpDiscoveryNode n0 : allNodes) {
+                    assert n0.internalOrder() != 0 : n0;
+
+                    // Skip next node and nodes added after next
+                    // in case this message is resent due to failures/leaves.
+                    // There will be separate messages for nodes with greater
+                    // internal order.
+                    if (n0.internalOrder() < nodeAddedMsg.node().internalOrder())
+                        topToSend.add(n0);
+                }
+
+                nodeAddedMsg.topology(topToSend);
+                nodeAddedMsg.messages(msgs, discardMsgId);
+
+                Map<Long, Collection<ClusterNode>> hist;
+
+                synchronized (mux) {
+                    hist = new TreeMap<>(topHist);
+                }
+
+                nodeAddedMsg.topologyHistory(hist);
+            }
+        }
+    }
+
+    /**
+     * @param msg Message to clear.
+     */
+    private void clearNodeAddedMessage(TcpDiscoveryAbstractMessage msg) {
+        if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+            // Nullify topology before registration.
+            TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg;
+
+            nodeAddedMsg.topology(null);
+            nodeAddedMsg.topologyHistory(null);
+            nodeAddedMsg.messages(null, null);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override void simulateNodeFailure() {
+        U.warn(log, "Simulating node failure: " + getLocalNodeId());
+
+        U.interrupt(tcpSrvr);
+        U.join(tcpSrvr, log);
+
+        U.interrupt(hbsSnd);
+        U.join(hbsSnd, log);
+
+        U.interrupt(chkStatusSnd);
+        U.join(chkStatusSnd, log);
+
+        U.interrupt(ipFinderCleaner);
+        U.join(ipFinderCleaner, log);
+
+        Collection<SocketReader> tmp;
+
+        synchronized (mux) {
+            tmp = U.arrayList(readers);
+        }
+
+        U.interrupt(tmp);
+        U.joinThreads(tmp, log);
+
+        U.interrupt(msgWorker);
+        U.join(msgWorker, log);
+
+        U.interrupt(statsPrinter);
+        U.join(statsPrinter, log);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void brakeConnection() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteSpiThread workerThread() {
+        return msgWorker;
+    }
+
+    /**
+     * <strong>FOR TEST ONLY!!!</strong>
+     * <p>
+     * Simulates situation when next node is still alive but is bypassed
+     * since it has been excluded from the ring, possibly, due to short time
+     * network problems.
+     * <p>
+     * This method is intended for test purposes only.
+     */
+    void forceNextNodeFailure() {
+        U.warn(log, "Next node will be forcibly failed (if any).");
+
+        TcpDiscoveryNode next;
+
+        synchronized (mux) {
+            next = ring.nextNode(failedNodes);
+        }
+
+        if (next != null)
+            msgWorker.addMessage(new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), next.id(),
+                next.internalOrder()));
+    }
+
+    /**
+     * <strong>FOR TEST ONLY!!!</strong>
+     * <p>
+     * This method is intended for test purposes only.
+     *
+     * @return Nodes ring.
+     */
+    TcpDiscoveryNodesRing ring() {
+        return ring;
+    }
+
+    /** {@inheritDoc} */
+    public void dumpDebugInfo(IgniteLogger log) {
+        if (!debugMode) {
+            U.quietAndWarn(log, "Failed to dump debug info (discovery SPI was not configured " +
+                "in debug mode, consider setting 'debugMode' configuration property to 'true').");
+
+            return;
+        }
+
+        assert log.isInfoEnabled();
+
+        synchronized (mux) {
+            StringBuilder b = new StringBuilder(U.nl());
+
+            b.append(">>>").append(U.nl());
+            b.append(">>>").append("Dumping discovery SPI debug info.").append(U.nl());
+            b.append(">>>").append(U.nl());
+
+            b.append("Local node ID: ").append(getLocalNodeId()).append(U.nl()).append(U.nl());
+            b.append("Local node: ").append(locNode).append(U.nl()).append(U.nl());
+            b.append("SPI state: ").append(spiState).append(U.nl()).append(U.nl());
+
+            b.append("Internal threads: ").append(U.nl());
+
+            b.append("    Message worker: ").append(threadStatus(msgWorker)).append(U.nl());
+            b.append("    Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl());
+            b.append("    HB sender: ").append(threadStatus(hbsSnd)).append(U.nl());
+            b.append("    Socket timeout worker: ").append(threadStatus(adapter.sockTimeoutWorker)).append(U.nl());
+            b.append("    IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl());
+            b.append("    Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl());
+
+            b.append(U.nl());
+
+            b.append("Socket readers: ").append(U.nl());
+
+            for (SocketReader rdr : readers)
+                b.append("    ").append(rdr).append(U.nl());
+
+            b.append(U.nl());
+
+            b.append("In-memory log messages: ").append(U.nl());
+
+            for (String msg : debugLog)
+                b.append("    ").append(msg).append(U.nl());
+
+            b.append(U.nl());
+
+            b.append("Leaving nodes: ").append(U.nl());
+
+            for (TcpDiscoveryNode node : leavingNodes)
+                b.append("    ").append(node.id()).append(U.nl());
+
+            b.append(U.nl());
+
+            b.append("Failed nodes: ").append(U.nl());
+
+            for (TcpDiscoveryNode node : failedNodes)
+                b.append("    ").append(node.id()).append(U.nl());
+
+            b.append(U.nl());
+
+            b.append("Stats: ").append(adapter.stats).append(U.nl());
+
+            U.quietAndInfo(log, b.toString());
+        }
+    }
+
+    /**
+     * @param msg Message.
+     */
+    private void debugLog(String msg) {
+        assert debugMode;
+
+        String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis())) +
+            '[' + Thread.currentThread().getName() + "][" + getLocalNodeId() +
+            "-" + locNode.internalOrder() + "] " +
+            msg;
+
+        debugLog.add(msg0);
+
+        int delta = debugLog.size() - debugMsgHist;
+
+        for (int i = 0; i < delta && debugLog.size() > debugMsgHist; i++)
+            debugLog.poll();
+    }
+
+    /**
+     * @param msg Message.
+     * @return {@code True} if recordable in debug mode.
+     */
+    private boolean recordable(TcpDiscoveryAbstractMessage msg) {
+        return !(msg instanceof TcpDiscoveryHeartbeatMessage) &&
+            !(msg instanceof TcpDiscoveryStatusCheckMessage) &&
+            !(msg instanceof TcpDiscoveryDiscardMessage);
+    }
+
+    /**
+     * Checks if two given {@link SecurityPermissionSet} objects contain the same permissions.
+     * Each permission belongs to one of three groups : cache, task or system.
+     *
+     * @param locPerms The first set of permissions.
+     * @param rmtPerms The second set of permissions.
+     * @return {@code True} if given parameters contain the same permissions, {@code False} otherwise.
+     */
+    private boolean permissionsEqual(SecurityPermissionSet locPerms, SecurityPermissionSet rmtPerms) {
+        boolean dfltAllowMatch = !(locPerms.defaultAllowAll() ^ rmtPerms.defaultAllowAll());
+
+        boolean bothHaveSamePerms = F.eqNotOrdered(rmtPerms.systemPermissions(), locPerms.systemPermissions()) &&
+            F.eqNotOrdered(rmtPerms.cachePermissions(), locPerms.cachePermissions()) &&
+            F.eqNotOrdered(rmtPerms.taskPermissions(), locPerms.taskPermissions());
+
+        return dfltAllowMatch && bothHaveSamePerms;
+    }
+
+    /**
+     * @param msg Message.
+     * @param nodeId Node ID.
+     */
+    private static void removeMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) {
+        msg.removeMetrics(nodeId);
+        msg.removeCacheMetrics(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ServerImpl.class, this);
+    }
+
+    /**
+     * Thread that sends heartbeats.
+     */
+    private class HeartbeatsSender extends IgniteSpiThread {
+        /**
+         * Constructor.
+         */
+        private HeartbeatsSender() {
+            super(adapter.ignite().name(), "tcp-disco-hb-sender", log);
+
+            setPriority(adapter.threadPri);
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("BusyWait")
+        @Override protected void body() throws InterruptedException {
+            while (!isLocalNodeCoordinator())
+                Thread.sleep(1000);
+
+            if (log.isDebugEnabled())
+                log.debug("Heartbeats sender has been started.");
+
+            while (!isInterrupted()) {
+                if (spiStateCopy() != CONNECTED) {
+                    if (log.isDebugEnabled())
+                        log.debug("Stopping heartbeats sender (SPI is not connected to topology).");
+
+                    return;
+                }
+
+                TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getLocalNodeId());
+
+                msg.verify(getLocalNodeId());
+
+                msgWorker.addMessage(msg);
+
+                Thread.sleep(adapter.hbFreq);
+            }
+        }
+    }
+
+    /**
+     * Thread that sends status check messages to next node if local node has not
+     * been receiving heartbeats ({@link TcpDiscoveryHeartbeatMessage})
+     * for {@link TcpDiscoverySpi#getMaxMissedHeartbeats()} *
+     * {@link TcpDiscoverySpi#getHeartbeatFrequency()}.
+     */
+    private class CheckStatusSender extends IgniteSpiThread {
+        /**
+         * Constructor.
+         */
+        private CheckStatusSender() {
+            super(adapter.ignite().name(), "tcp-disco-status-check-sender", log);
+
+            setPriority(adapter.threadPri);
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("BusyWait")
+        @Override protected void body() throws InterruptedException {
+            if (log.isDebugEnabled())
+                log.debug("Status check sender has been started.");
+
+            // Only 1 heartbeat missing is acceptable. Add 50 ms to avoid false alarm.
+            long checkTimeout = (long)adapter.maxMissedHbs * adapter.hbFreq + 50;
+
+            long lastSent = 0;
+
+            while (!isInterrupted()) {
+                // 1. Determine timeout.
+                if (lastSent < locNode.lastUpdateTime())
+                    lastSent = locNode.lastUpdateTime();
+
+                long timeout = (lastSent + checkTimeout) - U.currentTimeMillis();
+
+                if (timeout > 0)
+                    Thread.sleep(timeout);
+
+                // 2. Check if SPI is still connected.
+                if (spiStateCopy() != CONNECTED) {
+                    if (log.isDebugEnabled())
+                        log.debug("Stopping status check sender (SPI is not connected to topology).");
+
+                    return;
+                }
+
+                // 3. Was there an update?
+                if (locNode.lastUpdateTime() > lastSent || !ring.hasRemoteNodes()) {
+                    if (log.isDebugEnabled())
+                        log.debug("Skipping status check send " +
+                            "[locNodeLastUpdate=" + U.format(locNode.lastUpdateTime()) +
+                            ", hasRmts=" + ring.hasRemoteNodes() + ']');
+
+                    continue;
+                }
+
+                // 4. Send status check message.
+                lastSent = U.currentTimeMillis();
+
+                msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null));
+            }
+        }
+    }
+
+    /**
+     * Thread that cleans IP finder and keeps it in the correct state, unregistering
+     * addresses of the nodes that has left the topology.
+     * <p>
+     * This thread should run only on coordinator node and will clean IP finder
+     * if and only if {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()} is {@code true}.
+     */
+    private class IpFinderCleaner extends IgniteSpiThread {
+        /**
+         * Constructor.
+         */
+        private IpFinderCleaner() {
+            super(adapter.ignite().name(), "tcp-disco-ip-finder-cleaner", log);
+
+            setPriority(adapter.threadPri);
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("BusyWait")
+        @Override protected void body() throws InterruptedException {
+            if (log.isDebugEnabled())
+                log.debug("IP finder cleaner has been started.");
+
+            while (!isInterrupted()) {
+                Thread.sleep(adapter.ipFinderCleanFreq);
+
+                if (!isLocalNodeCoordinator())
+                    continue;
+
+                if (spiStateCopy() != CONNECTED) {
+                    if (log.isDebugEnabled())
+                        log.debug("Stopping IP finder cleaner (SPI is not connected to topology).");
+
+                    return;
+                }
+
+                if (adapter.ipFinder.isShared())
+                    cleanIpFinder();
+            }
+        }
+
+        /**
+         * Cleans IP finder.
+         */
+        private void cleanIpFinder() {
+            assert adapter.ipFinder.isShared();
+
+            try {
+                // Addresses that belongs to nodes in topology.
+                Collection<InetSocketAddress> currAddrs = F.flatCollections(
+                    F.viewReadOnly(
+                        ring.allNodes(),
+                        new C1<TcpDiscoveryNode, Collection<InetSocketAddress>>() {
+                            @Override public Collection<InetSocketAddress> apply(TcpDiscoveryNode node) {
+                                return !node.isClient() ? adapter.getNodeAddresses(node) :
+                                    Collections.<InetSocketAddress>emptyList();
+                            }
+                        }
+                    )
+                );
+
+                // Addresses registered in IP finder.
+                Collection<InetSocketAddress> regAddrs = adapter.registeredAddresses();
+
+                // Remove all addresses that belong to alive nodes, leave dead-node addresses.
+                Collection<InetSocketAddress> rmvAddrs = F.view(
+                    regAddrs,
+                    F.notContains(currAddrs),
+                    new P1<InetSocketAddress>() {
+                        private final Map<InetSocketAddress, Boolean> pingResMap =
+                            new HashMap<>();
+
+                        @Override public boolean apply(InetSocketAddress addr) {
+                            Boolean res = pingResMap.get(addr);
+
+                            if (res == null) {
+                                try {
+                                    res = pingNode(addr, null).get1() != null;
+                                }
+                                catch (IgniteCheckedException e) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to ping node [addr=" + addr +
+                                            ", err=" + e.getMessage() + ']');
+
+                                    res = false;
+                                }
+                                finally {
+                                    pingResMap.put(addr, res);
+                                }
+                            }
+
+                            return !res;
+                        }
+                    }
+                );
+
+                // Unregister dead-nodes addresses.
+                if (!rmvAddrs.isEmpty()) {
+                    adapter.ipFinder.unregisterAddresses(rmvAddrs);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Unregistered addresses from IP finder: " + rmvAddrs);
+                }
+
+                // Addresses that were removed by mistake (e.g. on segmentation).
+                Collection<InetSocketAddress> missingAddrs = F.view(
+                    currAddrs,
+                    F.notContains(regAddrs)
+                );
+
+                // Re-register missing addresses.
+                if (!missingAddrs.isEmpty()) {
+                    adapter.ipFinder.registerAddresses(missingAddrs);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Registered missing addresses in IP finder: " + missingAddrs);
+                }
+            }
+            catch (IgniteSpiException e) {
+                LT.error(log, e, "Failed to clean IP finder up.");
+            }
+        }
+    }
+
+    /**
+     * Pending messages container.
+     */
+    private static class PendingMessages {
+        /** */
+        private static final int MAX = 1024;
+
+        /** Pending messages. */
+        private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
+
+        /** Discarded message ID. */
+        private IgniteUuid discardId;
+
+        /**
+         * Adds pending message and shrinks queue if it exceeds limit
+         * (messages that were not discarded yet are never removed).
+         *
+         * @param msg Message to add.
+         */
+        void add(TcpDiscoveryAbstractMessage msg) {
+            msgs.add(msg);
+
+            while (msgs.size() > MAX) {
+                TcpDiscoveryAbstractMessage polled = msgs.poll();
+
+                assert polled != null;
+
+                if (polled.id().equals(discardId))
+                    break;
+            }
+        }
+
+        /**
+         * Gets messages starting from provided ID (exclusive). If such
+         * message is not found, {@code null} is returned (this indicates
+         * a failure condition when it was already removed from queue).
+         *
+         * @param lastMsgId Last message ID.
+         * @return Collection of messages.
+         */
+        @Nullable Collection<TcpDiscoveryAbstractMessage> messages(IgniteUuid lastMsgId) {
+            assert lastMsgId != null;
+
+            Collection<TcpDiscoveryAbstractMessage> copy = new ArrayList<>(msgs.size());
+
+            boolean skip = true;
+
+            for (TcpDiscoveryAbstractMessage msg : msgs) {
+                if (skip) {
+                    if (msg.id().equals(lastMsgId))
+                        skip = false;
+                }
+                else
+                    copy.add(msg);
+            }
+
+            return !skip ? copy : null;
+        }
+
+        /**
+         * Resets pending messages.
+         *
+         * @param msgs Message.
+         * @param discardId Discarded message ID.
+         */
+        void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId) {
+            this.msgs.clear();
+
+            if (msgs != null)
+                this.msgs.addAll(msgs);
+
+            this.discardId = discardId;
+        }
+
+        /**
+         * Clears pending messages.
+         */
+        void clear() {
+            msgs.clear();
+
+            discardId = null;
+        }
+
+        /**
+         * Discards message with provided ID and all before it.
+         *
+         * @param id Discarded message ID.
+         */
+        void discard(IgniteUuid id) {
+            discardId = id;
+        }
+    }
+
+    /**
+     * Message worker thread for messages processing.
+     */
+    private class RingMessageWorker extends MessageWorkerAdapter {
+        /** Next node. */
+        @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+        private TcpDiscoveryNode next;
+
+        /** Pending messages. */
+        private final PendingMessages pendingMsgs = new PendingMessages();
+
+        /** Last message that updated topology. */
+        private TcpDiscoveryAbstractMessage lastMsg;
+
+        /** Force pending messages send. */
+        private boolean forceSndPending;
+
+        /** Socket. */
+        private Socket sock;
+
+        /**
+         */
+        protected RingMessageWorker() {
+            super("tcp-disco-msg-worker");
+        }
+
+        /**
+         * @param msg Message to process.
+         */
+        @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
+            if (log.isDebugEnabled())
+                log.debug("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
+
+            if (debugMode)
+                debugLog("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
+
+            adapter.stats.onMessageProcessingStarted(msg);
+
+            if (msg instanceof TcpDiscoveryJoinRequestMessage)
+                processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg);
+
+            else if (msg instanceof TcpDiscoveryClientReconnectMessage)
+                processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
+
+            else if (msg instanceof TcpDiscoveryNodeAddedMessage)
+                processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
+
+            else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
+                processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg);
+
+            else if (msg instanceof TcpDiscoveryNodeLeftMessage)
+                processNodeLeftMessage((TcpDiscoveryNodeLeftMessage)msg);
+
+            else if (msg instanceof TcpDiscoveryNodeFailedMessage)
+                processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg);
+
+            else if (msg instanceof TcpDiscoveryClientHeartbeatMessage)
+                processClientHeartbeatMessage((TcpDiscoveryClientHeartbeatMessage)msg);
+
+            else if (msg instanceof TcpDiscoveryHeartbeatMessage)
+                processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg);
+
+            else if (msg instanceof TcpDiscoveryStatusCheckMessage)
+                processStatusCheckMessage((TcpDiscoveryStatusCheckMessage)msg);
+
+            else if (msg instanceof TcpDiscoveryDiscardMessage)
+                processDiscardMessage((TcpDiscoveryDiscardMessage)msg);
+
+            else if (msg instanceof TcpDiscoveryCustomEventMessage)
+                processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
+
+            else if (msg instanceof TcpDiscoveryClientPingRequest)
+                processClientPingRequest((TcpDiscoveryClientPingRequest)msg);
+
+            else
+                assert false : "Unknown message type: " + msg.getClass().getSimpleName();
+
+            adapter.stats.onMessageProcessingFinished(msg);
+        }
+
+        /**
+         * Sends message across the ring.
+         *
+         * @param msg Message to send
+         */
+        @SuppressWarnings({"BreakStatementWithLabel", "LabeledStatement", "ContinueStatementWithLabel"})
+        private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) {
+            assert msg != null;
+
+            assert ring.hasRemoteNodes();
+
+            for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : adapter.sendMsgLsnrs)
+                msgLsnr.apply(msg);
+
+            if (redirectToClients(msg)) {
+                byte[] marshalledMsg = null;
+
+                for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
+                    // Send a clone to client to avoid ConcurrentModificationException
+                    TcpDiscoveryAbstractMessage msgClone;
+
+                    try {
+                        if (marshalledMsg == null)
+                            marshalledMsg = adapter.marsh.marshal(msg);
+
+                        msgClone = adapter.marsh.unmarshal(marshalledMsg, null);
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to marshal message: " + msg, e);
+
+                        msgClone = msg;
+                    }
+
+                    clientMsgWorker.addMessage(msgClone);
+                }
+            }
+
+            Collection<TcpDiscoveryNode> failedNodes;
+
+            TcpDiscoverySpiState state;
+
+            synchronized (mux) {
+                failedNodes = U.arrayList(ServerImpl.this.failedNodes);
+
+                state = spiState;
+            }
+
+            Collection<Throwable> errs = null;
+
+            boolean sent = false;
+
+            boolean searchNext = true;
+
+            UUID locNodeId = getLocalNodeId();
+
+            while (true) {
+                if (searchNext) {
+                    TcpDiscoveryNode newNext = ring.nextNode(failedNodes);
+
+                    if (newNext == null) {
+                        if (log.isDebugEnabled())
+                            log.debug("No next node in topology.");
+
+                        if (debugMode)
+                            debugLog("No next node in topology.");
+
+                        if (ring.hasRemoteNodes()) {
+                            msg.senderNodeId(locNodeId);
+
+                            addMessage(msg);
+                        }
+
+                        break;
+                    }
+
+                    if (!newNext.equals(next)) {
+                        if (log.isDebugEnabled())
+                            log.debug("New next node [newNext=" + newNext + ", formerNext=" + next +
+                                ", ring=" + ring + ", failedNodes=" + failedNodes + ']');
+
+                        if (debugMode)
+                            debugLog("New next node [newNext=" + newNext + ", formerNext=" + next +
+                                ", ring=" + ring + ", failedNodes=" + failedNodes + ']');
+
+                        U.closeQuiet(sock);
+
+                        sock = null;
+
+                        next = newNext;
+                    }
+                    else if (log.isDebugEnabled())
+                        log.debug("Next node remains the same [nextId=" + next.id() +
+                            ", nextOrder=" + next.internalOrder() + ']');
+                }
+
+                // Flag that shows whether next node exists and accepts incoming connections.
+                boolean nextNodeExists = sock != null;
+
+                final boolean sameHost = U.sameMacs(locNode, next);
+
+                List<InetSocketAddress> localNodeAddresses = U.arrayList(locNode.socketAddresses());
+
+                addr: for (InetSocketAddress addr : adapter.getNodeAddresses(next, sameHost)) {
+                    long ackTimeout0 = adapter.ackTimeout;
+
+                    if (localNodeAddresses.contains(addr)){
+                        if (log.isDebugEnabled())
+                            log.debug("Skip to send message to the local node (probably remote node has the same " +
+                                "loopback address that local node): " + addr);
+
+                        continue;
+                    }
+
+                    for (int i = 0; i < adapter.reconCnt; i++) {
+                        if (sock == null) {
+                            nextNodeExists = false;
+
+                            boolean success = false;
+
+                            boolean openSock = false;
+
+                            // Restore ring.
+                            try {
+                                long tstamp = U.currentTimeMillis();
+
+                                sock = adapter.openSocket(addr);
+
+                                openSock = true;
+
+                                // Handshake.
+                                writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId));
+
+                                TcpDiscoveryHandshakeResponse res = adapter.readMessage(sock, null, ackTimeout0);
+
+                                if (locNodeId.equals(res.creatorNodeId())) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Handshake response from local node: " + res);
+
+                                    U.closeQuiet(sock);
+
+                                    sock = null;
+
+                                    break;
+                                }
+
+                                adapter.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
+
+                                UUID nextId = res.creatorNodeId();
+
+                                long nextOrder = res.order();
+
+                                if (!next.id().equals(nextId)) {
+                                    // Node with different ID has bounded to the same port.
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to restore ring because next node ID received is not as " +
+                                            "expected [expectedId=" + next.id() + ", rcvdId=" + nextId + ']');
+
+                                    if (debugMode)
+                                        debugLog("Failed to restore ring because next node ID received is not as " +
+                                            "expected [expectedId=" + next.id() + ", rcvdId=" + nextId + ']');
+
+                                    break;
+                                }
+                                else {
+                                    // ID is as expected. Check node order.
+                                    if (nextOrder != next.internalOrder()) {
+                                        // Is next currently being added?
+                                        boolean nextNew = (msg instanceof TcpDiscoveryNodeAddedMessage &&
+                                            ((TcpDiscoveryNodeAddedMessage)msg).node().id().equals(nextId));
+
+                                        if (!nextNew) {
+                                            if (log.isDebugEnabled())
+                                                log.debug("Failed to restore ring because next node order received " +
+                                                    "is not as expected [expected=" + next.internalOrder() +
+                                                    ", rcvd=" + nextOrder + ", id=" + next.id() + ']');
+
+                                            if (debugMode)
+                                                debugLog("Failed to restore ring because next node order received " +
+                                                    "is not as expected [expected=" + next.internalOrder() +
+                                                    ", rcvd=" + nextOrder + ", id=" + next.id() + ']');
+
+                                            break;
+                                        }
+                                    }
+
+                                    if (log.isDebugEnabled())
+                                        log.debug("Initialized connection with next node: " + next.id());
+
+                                    if (debugMode)
+                                        debugLog("Initialized connection with next node: " + next.id());
+
+                                    errs = null;
+
+                                    success = true;
+                                }
+                            }
+                            catch (IOException | IgniteCheckedException e) {
+                                if (errs == null)
+                                    errs = new ArrayList<>();
+
+                                errs.add(e);
+
+                                if (log.isDebugEnabled())
+                                    U.error(log, "Failed to connect to next node [msg=" + msg
+                                        + ", err=" + e.getMessage() + ']', e);
+
+                                onException("Failed to connect to next node [msg=" + msg + ", err=" + e + ']', e);
+
+                                if (!openSock)
+                                    break; // Don't retry if we can not establish connection.
+
+                                if (e instanceof SocketTimeoutException ||
+                                    X.hasCause(e, SocketTimeoutException.class)) {
+                                    ackTimeout0 *= 2;
+
+                                    if (!checkAckTimeout(ackTimeout0))
+                                        break;
+                                }
+
+                                continue;
+                            }
+                            finally {
+                                if (!success) {
+                                    U.closeQuiet(sock);
+
+                                    sock = null;
+                                }
+                                else
+                                    // Next node exists and accepts incoming messages.
+                                    nextNodeExists = true;
+                            }
+                        }
+
+                        try {
+                            boolean failure;
+
+                            synchronized (mux) {
+                                failure = ServerImpl.this.failedNodes.size() < failedNodes.size();
+                            }
+
+                            assert !forceSndPending || msg instanceof TcpDiscoveryNodeLeftMessage;
+
+                            if (failure || forceSndPending) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Pending messages will be sent [failure=" + failure +
+                                        ", forceSndPending=" + forceSndPending + ']');
+
+                                if (debugMode)
+                                    debugLog("Pending messages will be sent [failure=" + failure +
+                                        ", forceSndPending=" + forceSndPending + ']');
+
+                                boolean skip = pendingMsgs.discardId != null;
+
+                                for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
+                                    if (skip) {
+                                        if (pendingMsg.id().equals(pendingMsgs.discardId))
+                                            skip = false;
+
+                                        continue;
+                                    }
+
+                                    long tstamp = U.currentTimeMillis();
+
+                                    prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
+                                        pendingMsgs.discardId);
+
+                                    try {
+                                        writeToSocket(sock, pendingMsg);
+                                    }
+                                    finally {
+                                        clearNodeAddedMessage(pendingMsg);
+                                    }
+
+                                    adapter.stats.onMessageSent(pendingMsg, U.currentTimeMillis() - tstamp);
+
+                                    int res = adapter.readReceipt(sock, ackTimeout0);
+
+                                    if (log.isDebugEnabled())
+                                        log.debug("Pending message has been sent to next node [msg=" + msg.id() +
+                                            ", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
+                                            ", res=" + res + ']');
+
+                                    if (debugMode)
+                                        debugLog("Pending message has been sent to next node [msg=" + msg.id() +
+                                            ", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
+                                            ", res=" + res + ']');
+                                }
+                            }
+
+                            prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId);
+
+                            try {
+                                long tstamp = U.currentTimeMillis();
+
+                                writeToSocket(sock, msg);
+
+                                adapter.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
+
+                                int res = adapter.readReceipt(sock, ackTimeout0);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Message has been sent to next node [msg=" + msg +
+                                        ", next=" + next.id() +
+                                        ", res=" + res + ']');
+
+                                if (debugMode)
+                                    debugLog("Message has been sent to next node [msg=" + msg +
+                                        ", next=" + next.id() +
+                                        ", res=" + res + ']');
+                            }
+                            finally {
+                                clearNodeAddedMessage(msg);
+                            }
+
+                            registerPendingMessage(msg);
+
+                            sent = true;
+
+                            break addr;
+                        }
+                        catch (IOException | IgniteCheckedException e) {
+                            if (errs == null)
+                                errs = new ArrayList<>();
+
+                            errs.add(e);
+
+                            if (log.isDebugEnabled())
+                                U.error(log, "Failed to send message to next node [next=" + next.id() + ", msg=" + msg +
+                                    ", err=" + e + ']', e);
+
+                            onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']',
+                                e);
+
+                            if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
+                                ackTimeout0 *= 2;
+
+                                if (!checkAckTimeout(ackTimeout0))
+                                    break;
+                            }
+                        }
+                        finally {
+                            forceSndPending = false;
+
+                            if (!sent) {
+                                U.closeQuiet(sock);
+
+                                sock = null;
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Message has not been sent [next=" + next.id() + ", msg=" + msg +
+                                        ", i=" + i + ']');
+                            }
+                        }
+                    } // Try to reconnect.
+                } // Iterating node's addresses.
+
+                if (!sent) {
+                    if (!failedNodes.contains(next)) {
+                        failedNodes.add(next);
+
+                        if (state == CONNECTED) {
+                            Exception err = errs != null ?
+                                U.exceptionWithSuppressed("Failed to send message to next node [msg=" + msg +
+                                    ", next=" + U.toShortString(next) + ']', errs) :
+                                null;
+
+                            // If node existed on connection initialization we should check
+                            // whether it has not gone yet.
+                            if (nextNodeExists && pingNode(next))
+                                U.error(log, "Failed to send message to next node [msg=" + msg +
+                                    ", next=" + next + ']', err);
+                            else if (log.isDebugEnabled())
+                                log.debug("Failed to send message to next node [msg=" + msg + ", next=" + next +
+                                    ", errMsg=" + (err != null ? err.getMessage() : "N/A") + ']');
+                        }
+                    }
+
+                    if (msg instanceof TcpDiscoveryStatusCheckMessage) {
+                        TcpDiscoveryStatusCheckMessage msg0 = (TcpDiscoveryStatusCheckMessage)msg;
+
+                        if (next.id().equals(msg0.failedNodeId())) {
+                            next = null;
+
+                            if (log.isDebugEnabled())
+                                log.debug("Discarding status check since next node has indeed failed [next=" + next +
+                                    ", msg=" + msg + ']');
+
+                            // Discard status check message by exiting loop and handle failure.
+                            break;
+                        }
+                    }
+
+                    next = null;
+
+                    searchNext = true;
+
+                    errs = null;
+                }
+                else
+                    break;
+            }
+
+            synchronized (mux) {
+                failedNodes.removeAll(ServerImpl.this.failedNodes);
+            }
+
+            if (!failedNodes.isEmpty()) {
+                if (state == CONNECTED) {
+                    if (!sent && log.isDebugEnabled())
+                        // Message has not been sent due to some problems.
+                        log.debug("Message has not been sent: " + m

<TRUNCATED>