You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/01 13:44:54 UTC

[02/53] [abbrv] incubator-ignite git commit: # IGNITE-943 Merge TcpDiscoverySpi and TcpClientDiscoverySpi

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 8ceac1c..2b2c691 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -18,22 +18,17 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.events.*;
-import org.apache.ignite.internal.processors.security.*;
 import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.io.*;
-import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.plugin.security.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.marshaller.jdk.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.discovery.*;
@@ -45,22 +40,13 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.apache.ignite.spi.discovery.tcp.messages.*;
 import org.jetbrains.annotations.*;
-import org.jsr166.*;
 
 import java.io.*;
 import java.net.*;
-import java.text.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
-import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.internal.IgniteNodeAttributes.*;
-import static org.apache.ignite.spi.IgnitePortProtocol.*;
-import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.*;
-import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage.*;
-import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage.*;
-
 /**
  * Discovery SPI implementation that uses TCP/IP for node discovery.
  * <p>
@@ -150,10 +136,43 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe
 @IgniteSpiMultipleInstancesSupport(true)
 @DiscoverySpiOrderSupport(true)
 @DiscoverySpiHistorySupport(true)
-public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscoverySpiMBean {
+public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, TcpDiscoverySpiMBean {
+    /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */
+    public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs";
+
     /** Default local port range (value is <tt>100</tt>). */
     public static final int DFLT_PORT_RANGE = 100;
 
+    /** Default port to listen (value is <tt>47500</tt>). */
+    public static final int DFLT_PORT = 47500;
+
+    /** Default timeout for joining topology (value is <tt>0</tt>). */
+    public static final long DFLT_JOIN_TIMEOUT = 0;
+
+    /** Default network timeout in milliseconds (value is <tt>5000ms</tt>). */
+    public static final long DFLT_NETWORK_TIMEOUT = 5000;
+
+    /** Default value for thread priority (value is <tt>10</tt>). */
+    public static final int DFLT_THREAD_PRI = 10;
+
+    /** Default heartbeat messages issuing frequency (value is <tt>100ms</tt>). */
+    public static final long DFLT_HEARTBEAT_FREQ = 100;
+
+    /** Default size of topology snapshots history. */
+    public static final int DFLT_TOP_HISTORY_SIZE = 1000;
+
+    /** Default socket operations timeout in milliseconds (value is <tt>200ms</tt>). */
+    public static final long DFLT_SOCK_TIMEOUT = 200;
+
+    /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>50ms</tt>). */
+    public static final long DFLT_ACK_TIMEOUT = 50;
+
+    /** Default socket operations timeout in milliseconds (value is <tt>700ms</tt>). */
+    public static final long DFLT_SOCK_TIMEOUT_CLIENT = 700;
+
+    /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>700ms</tt>). */
+    public static final long DFLT_ACK_TIMEOUT_CLIENT = 700;
+
     /** Default reconnect attempts count (value is <tt>10</tt>). */
     public static final int DFLT_RECONNECT_CNT = 10;
 
@@ -172,154 +191,236 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
     /** Maximum ack timeout value for receiving message acknowledgement in milliseconds (value is <tt>600,000ms</tt>). */
     public static final long DFLT_MAX_ACK_TIMEOUT = 10 * 60 * 1000;
 
-    /** Default socket operations timeout in milliseconds (value is <tt>200ms</tt>). */
-    public static final long DFLT_SOCK_TIMEOUT = 200;
-
-    /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>50ms</tt>). */
-    public static final long DFLT_ACK_TIMEOUT = 50;
-
-    /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */
-    public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs";
+    /** Local address. */
+    protected String locAddr;
 
     /** Address resolver. */
     private AddressResolver addrRslvr;
 
-    /** Local port which node uses. */
-    private int locPort = DFLT_PORT;
+    /** IP finder. */
+    protected TcpDiscoveryIpFinder ipFinder;
 
-    /** Local port range. */
-    private int locPortRange = DFLT_PORT_RANGE;
+    /** Socket operations timeout. */
+    protected long sockTimeout; // Must be initialized in the constructor of child class.
 
-    /** Statistics print frequency. */
-    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized", "RedundantFieldInitialization"})
-    private long statsPrintFreq = DFLT_STATS_PRINT_FREQ;
+    /** Message acknowledgement timeout. */
+    protected long ackTimeout; // Must be initialized in the constructor of child class.
 
-    /** Maximum message acknowledgement timeout. */
-    private long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT;
+    /** Network timeout. */
+    protected long netTimeout = DFLT_NETWORK_TIMEOUT;
 
-    /** Max heartbeats count node can miss without initiating status check. */
-    private int maxMissedHbs = DFLT_MAX_MISSED_HEARTBEATS;
+    /** Join timeout. */
+    @SuppressWarnings("RedundantFieldInitialization")
+    protected long joinTimeout = DFLT_JOIN_TIMEOUT;
 
-    /** Max heartbeats count node can miss without failing client node. */
-    private int maxMissedClientHbs = DFLT_MAX_MISSED_CLIENT_HEARTBEATS;
+    /** Thread priority for all threads started by SPI. */
+    protected int threadPri = DFLT_THREAD_PRI;
 
-    /** IP finder clean frequency. */
-    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
-    private long ipFinderCleanFreq = DFLT_IP_FINDER_CLEAN_FREQ;
+    /** Heartbeat messages issuing frequency. */
+    protected long hbFreq = DFLT_HEARTBEAT_FREQ;
 
-    /** Reconnect attempts count. */
-    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
-    private int reconCnt = DFLT_RECONNECT_CNT;
+    /** Size of topology snapshots history. */
+    protected int topHistSize = DFLT_TOP_HISTORY_SIZE;
 
-    /** */
-    private final Executor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
-        new LinkedBlockingQueue<Runnable>());
+    /** Grid discovery listener. */
+    protected volatile DiscoverySpiListener lsnr;
 
-    /** Nodes ring. */
-    @GridToStringExclude
-    private final TcpDiscoveryNodesRing ring = new TcpDiscoveryNodesRing();
+    /** Data exchange. */
+    protected DiscoverySpiDataExchange exchange;
+
+    /** Metrics provider. */
+    protected DiscoveryMetricsProvider metricsProvider;
 
-    /** Topology snapshots history. */
-    private final SortedMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>();
+    /** Local node attributes. */
+    protected Map<String, Object> locNodeAttrs;
 
-    /** Socket readers. */
-    private final Collection<SocketReader> readers = new LinkedList<>();
+    /** Local node version. */
+    protected IgniteProductVersion locNodeVer;
 
-    /** TCP server for discovery SPI. */
-    private TcpServer tcpSrvr;
+    /** Local node. */
+    protected TcpDiscoveryNode locNode;
 
-    /** Message worker. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    private RingMessageWorker msgWorker;
+    /** Local host. */
+    protected InetAddress locHost;
 
-    /** Client message workers. */
-    private ConcurrentMap<UUID, ClientMessageWorker> clientMsgWorkers = new ConcurrentHashMap8<>();
+    /** Internal and external addresses of local node. */
+    protected Collection<InetSocketAddress> locNodeAddrs;
 
-    /** Metrics sender. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    private HeartbeatsSender hbsSnd;
+    /** Socket timeout worker. */
+    protected SocketTimeoutWorker sockTimeoutWorker;
 
-    /** Status checker. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    private CheckStatusSender chkStatusSnd;
+    /** Start time of the very first grid node. */
+    protected volatile long gridStartTime;
 
-    /** IP finder cleaner. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    private IpFinderCleaner ipFinderCleaner;
+    /** Marshaller. */
+    protected final Marshaller marsh = new JdkMarshaller();
 
-    /** Statistics printer thread. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    private StatisticsPrinter statsPrinter;
+    /** Statistics. */
+    protected final TcpDiscoveryStatistics stats = new TcpDiscoveryStatistics();
 
-    /** Failed nodes (but still in topology). */
-    private Collection<TcpDiscoveryNode> failedNodes = new HashSet<>();
+    /** Local port which node uses. */
+    protected int locPort = DFLT_PORT;
+
+    /** Local port range. */
+    protected int locPortRange = DFLT_PORT_RANGE;
+
+    /** Reconnect attempts count. */
+    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+    protected int reconCnt = DFLT_RECONNECT_CNT;
 
-    /** Leaving nodes (but still in topology). */
-    private Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>();
+    /** Statistics print frequency. */
+    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized", "RedundantFieldInitialization"})
+    protected long statsPrintFreq = DFLT_STATS_PRINT_FREQ;
 
-    /** If non-shared IP finder is used this flag shows whether IP finder contains local address. */
-    private boolean ipFinderHasLocAddr;
+    /** Maximum message acknowledgement timeout. */
+    protected long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT;
 
-    /** Addresses that do not respond during join requests send (for resolving concurrent start). */
-    private final Collection<SocketAddress> noResAddrs = new GridConcurrentHashSet<>();
+    /** Max heartbeats count node can miss without initiating status check. */
+    protected int maxMissedHbs = DFLT_MAX_MISSED_HEARTBEATS;
 
-    /** Addresses that incoming join requests send were send from (for resolving concurrent start). */
-    private final Collection<SocketAddress> fromAddrs = new GridConcurrentHashSet<>();
+    /** Max heartbeats count node can miss without failing client node. */
+    protected int maxMissedClientHbs = DFLT_MAX_MISSED_CLIENT_HEARTBEATS;
 
-    /** Response on join request from coordinator (in case of duplicate ID or auth failure). */
-    private final GridTuple<TcpDiscoveryAbstractMessage> joinRes = F.t1();
+    /** IP finder clean frequency. */
+    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+    protected long ipFinderCleanFreq = DFLT_IP_FINDER_CLEAN_FREQ;
 
     /** Context initialization latch. */
     @GridToStringExclude
     private final CountDownLatch ctxInitLatch = new CountDownLatch(1);
 
-    /** Node authenticator. */
-    private DiscoverySpiNodeAuthenticator nodeAuth;
+    /** */
+    protected final CopyOnWriteArrayList<IgniteInClosure<TcpDiscoveryAbstractMessage>> sendMsgLsnrs =
+        new CopyOnWriteArrayList<>();
+
+    /** */
+    protected final CopyOnWriteArrayList<IgniteInClosure<Socket>> incomeConnLsnrs =
+        new CopyOnWriteArrayList<>();
+
+    /** Logger. */
+    @LoggerResource
+    protected IgniteLogger log;
+
+    /** */
+    protected TcpDiscoveryImpl impl;
+
+    /** */
+    private boolean clientMode;
+
+    /** {@inheritDoc} */
+    @Override public String getSpiState() {
+        return impl.getSpiState();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getMessageWorkerQueueSize() {
+        return impl.getMessageWorkerQueueSize();
+    }
 
-    /** Mutex. */
-    private final Object mux = new Object();
+    /** {@inheritDoc} */
+    @Nullable @Override public UUID getCoordinator() {
+        return impl.getCoordinator();
+    }
 
-    /** Discovery state. */
-    protected TcpDiscoverySpiState spiState = DISCONNECTED;
+    /** {@inheritDoc} */
+    @Override public Collection<ClusterNode> getRemoteNodes() {
+        return impl.getRemoteNodes();
+    }
 
-    /** Map with proceeding ping requests. */
-    private final ConcurrentMap<InetSocketAddress, IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>>> pingMap =
-        new ConcurrentHashMap8<>();
+    /** {@inheritDoc} */
+    @Nullable @Override public ClusterNode getNode(UUID nodeId) {
+        return impl.getNode(nodeId);
+    }
 
-    /** Debug mode. */
-    private boolean debugMode;
+    /** {@inheritDoc} */
+    @Override public boolean pingNode(UUID nodeId) {
+        return impl.pingNode(nodeId);
+    }
 
-    /** Debug messages history. */
-    private int debugMsgHist = 512;
+    /** {@inheritDoc} */
+    @Override public void disconnect() throws IgniteSpiException {
+        impl.disconnect();
+    }
 
-    /** Received messages. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    private ConcurrentLinkedDeque<String> debugLog;
+    /** {@inheritDoc} */
+    @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) {
+        impl.setAuthenticator(auth);
+    }
 
-    /** */
-    private final CopyOnWriteArrayList<IgniteInClosure<TcpDiscoveryAbstractMessage>> sendMsgLsnrs =
-        new CopyOnWriteArrayList<>();
+    /** {@inheritDoc} */
+    @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
+        impl.sendCustomEvent(msg);
+    }
 
-    /** */
-    private final CopyOnWriteArrayList<IgniteInClosure<Socket>> incomeConnLsnrs =
-        new CopyOnWriteArrayList<>();
+    /** {@inheritDoc} */
+    @Override public void failNode(UUID nodeId) {
+        impl.failNode(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void dumpDebugInfo() {
+        impl.dumpDebugInfo(log);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isClientMode() {
+        return clientMode;
+    }
 
     /**
-     * Default constructor.
+     * @param clientMode New client mode.
      */
-    public TcpDiscoverySpi() {
-        ackTimeout = DFLT_ACK_TIMEOUT;
-        sockTimeout = DFLT_SOCK_TIMEOUT;
+    @IgniteSpiConfiguration(optional = true)
+    public TcpDiscoverySpi setClientMode(boolean clientMode) {
+        if (impl != null)
+            throw new IllegalStateException("You cannot change mode, TcpDiscoverySpi already started.");
+
+        this.clientMode = clientMode;
+
+        return this;
     }
 
-    /** {@inheritDoc} */
+    /**
+     * Inject resources
+     *
+     * @param ignite Ignite.
+     */
     @IgniteInstanceResource
-    @Override public void injectResources(Ignite ignite) {
+    @Override protected void injectResources(Ignite ignite) {
         super.injectResources(ignite);
 
         // Inject resource.
-        if (ignite != null)
+        if (ignite != null) {
+            setLocalAddress(ignite.configuration().getLocalHost());
             setAddressResolver(ignite.configuration().getAddressResolver());
+        }
+    }
+
+    /**
+     * Sets local host IP address that discovery SPI uses.
+     * <p>
+     * If not provided, by default a first found non-loopback address
+     * will be used. If there is no non-loopback address available,
+     * then {@link InetAddress#getLocalHost()} will be used.
+     *
+     * @param locAddr IP address.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public TcpDiscoverySpi setLocalAddress(String locAddr) {
+        // Injection should not override value already set by Spring or user.
+        if (this.locAddr == null)
+            this.locAddr = locAddr;
+
+        return this;
+    }
+
+    /**
+     * Gets local address that was set to SPI with {@link #setLocalAddress(String)} method.
+     *
+     * @return local address.
+     */
+    public String getLocalAddress() {
+        return locAddr;
     }
 
     /**
@@ -360,8 +461,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
      * @see #setAckTimeout(long)
      */
     @IgniteSpiConfiguration(optional = true)
-    public void setReconnectCount(int reconCnt) {
+    public TcpDiscoverySpi setReconnectCount(int reconCnt) {
         this.reconCnt = reconCnt;
+
+        return this;
     }
 
     /** {@inheritDoc} */
@@ -382,8 +485,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
      * @param maxAckTimeout Maximum acknowledgement timeout.
      */
     @IgniteSpiConfiguration(optional = true)
-    public void setMaxAckTimeout(long maxAckTimeout) {
+    public TcpDiscoverySpi setMaxAckTimeout(long maxAckTimeout) {
         this.maxAckTimeout = maxAckTimeout;
+
+        return this;
     }
 
     /** {@inheritDoc} */
@@ -401,8 +506,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
      * @param locPort Local port to bind.
      */
     @IgniteSpiConfiguration(optional = true)
-    public void setLocalPort(int locPort) {
+    public TcpDiscoverySpi setLocalPort(int locPort) {
         this.locPort = locPort;
+
+        return this;
     }
 
     /** {@inheritDoc} */
@@ -420,8 +527,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
      * @param locPortRange Local port range to bind.
      */
     @IgniteSpiConfiguration(optional = true)
-    public void setLocalPortRange(int locPortRange) {
+    public TcpDiscoverySpi setLocalPortRange(int locPortRange) {
         this.locPortRange = locPortRange;
+
+        return this;
     }
 
     /** {@inheritDoc} */
@@ -437,8 +546,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
      * @param maxMissedHbs Max missed heartbeats.
      */
     @IgniteSpiConfiguration(optional = true)
-    public void setMaxMissedHeartbeats(int maxMissedHbs) {
+    public TcpDiscoverySpi setMaxMissedHeartbeats(int maxMissedHbs) {
         this.maxMissedHbs = maxMissedHbs;
+
+        return this;
     }
 
     /** {@inheritDoc} */
@@ -454,8 +565,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
      * @param maxMissedClientHbs Max missed client heartbeats.
      */
     @IgniteSpiConfiguration(optional = true)
-    public void setMaxMissedClientHeartbeats(int maxMissedClientHbs) {
+    public TcpDiscoverySpi setMaxMissedClientHeartbeats(int maxMissedClientHbs) {
         this.maxMissedClientHbs = maxMissedClientHbs;
+
+        return this;
     }
 
     /** {@inheritDoc} */
@@ -475,8 +588,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
      * @param statsPrintFreq Statistics print frequency in milliseconds.
      */
     @IgniteSpiConfiguration(optional = true)
-    public void setStatisticsPrintFrequency(long statsPrintFreq) {
+    public TcpDiscoverySpi setStatisticsPrintFrequency(long statsPrintFreq) {
         this.statsPrintFreq = statsPrintFreq;
+
+        return this;
     }
 
     /** {@inheritDoc} */
@@ -492,112 +607,187 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
      * @param ipFinderCleanFreq IP finder clean frequency.
      */
     @IgniteSpiConfiguration(optional = true)
-    public void setIpFinderCleanFrequency(long ipFinderCleanFreq) {
+    public TcpDiscoverySpi setIpFinderCleanFrequency(long ipFinderCleanFreq) {
         this.ipFinderCleanFreq = ipFinderCleanFreq;
+
+        return this;
     }
 
     /**
-     * This method is intended for troubleshooting purposes only.
+     * Gets IP finder for IP addresses sharing and storing.
      *
-     * @param debugMode {code True} to start SPI in debug mode.
+     * @return IP finder for IP addresses sharing and storing.
      */
-    public void setDebugMode(boolean debugMode) {
-        this.debugMode = debugMode;
+    public TcpDiscoveryIpFinder getIpFinder() {
+        return ipFinder;
     }
 
     /**
-     * This method is intended for troubleshooting purposes only.
+     * Sets IP finder for IP addresses sharing and storing.
+     * <p>
+     * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will be used by default.
      *
-     * @param debugMsgHist Message history log size.
+     * @param ipFinder IP finder.
      */
-    public void setDebugMessageHistory(int debugMsgHist) {
-        this.debugMsgHist = debugMsgHist;
+    @IgniteSpiConfiguration(optional = true)
+    public TcpDiscoverySpi setIpFinder(TcpDiscoveryIpFinder ipFinder) {
+        this.ipFinder = ipFinder;
+
+        return this;
     }
 
-    /** {@inheritDoc} */
-    @Override public String getSpiState() {
-        synchronized (mux) {
-            return spiState.name();
-        }
+    /**
+     * Sets socket operations timeout. This timeout is used to limit connection time and
+     * write-to-socket time.
+     * <p>
+     * Note that when running Ignite on Amazon EC2, socket timeout must be set to a value
+     * significantly greater than the default (e.g. to {@code 30000}).
+     * <p>
+     * If not specified, default is {@link #DFLT_SOCK_TIMEOUT} or {@link #DFLT_SOCK_TIMEOUT_CLIENT}.
+     *
+     * @param sockTimeout Socket connection timeout.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public TcpDiscoverySpi setSocketTimeout(long sockTimeout) {
+        this.sockTimeout = sockTimeout;
+
+        return this;
     }
 
-    /** {@inheritDoc} */
-    @Override public int getMessageWorkerQueueSize() {
-        return msgWorker.queueSize();
+    /**
+     * Sets timeout for receiving acknowledgement for sent message.
+     * <p>
+     * If acknowledgement is not received within this timeout, sending is considered as failed
+     * and SPI tries to repeat message sending.
+     * <p>
+     * If not specified, default is {@link #DFLT_ACK_TIMEOUT} or {@link #DFLT_ACK_TIMEOUT_CLIENT}.
+     *
+     * @param ackTimeout Acknowledgement timeout.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public TcpDiscoverySpi setAckTimeout(long ackTimeout) {
+        this.ackTimeout = ackTimeout;
+
+        return this;
     }
 
-    /** {@inheritDoc} */
-    @Nullable @Override public UUID getCoordinator() {
-        TcpDiscoveryNode crd = resolveCoordinator();
+    /**
+     * Sets maximum network timeout to use for network operations.
+     * <p>
+     * If not specified, default is {@link #DFLT_NETWORK_TIMEOUT}.
+     *
+     * @param netTimeout Network timeout.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public TcpDiscoverySpi setNetworkTimeout(long netTimeout) {
+        this.netTimeout = netTimeout;
 
-        return crd != null ? crd.id() : null;
+        return this;
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public ClusterNode getNode(UUID nodeId) {
-        assert nodeId != null;
-
-        UUID locNodeId0 = getLocalNodeId();
+    @Override public long getJoinTimeout() {
+        return joinTimeout;
+    }
 
-        if (locNodeId0 != null && locNodeId0.equals(nodeId))
-            // Return local node directly.
-            return locNode;
+    /**
+     * Sets join timeout.
+     * <p>
+     * If non-shared IP finder is used and node fails to connect to
+     * any address from IP finder, node keeps trying to join within this
+     * timeout. If all addresses are still unresponsive, exception is thrown
+     * and node startup fails.
+     * <p>
+     * If not specified, default is {@link #DFLT_JOIN_TIMEOUT}.
+     *
+     * @param joinTimeout Join timeout ({@code 0} means wait forever).
+     *
+     * @see TcpDiscoveryIpFinder#isShared()
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public TcpDiscoverySpi setJoinTimeout(long joinTimeout) {
+        this.joinTimeout = joinTimeout;
 
-        TcpDiscoveryNode node = ring.node(nodeId);
+        return this;
+    }
 
-        if (node != null && !node.visible())
-            return null;
+    /**
+     * Sets thread priority. All threads within SPI will be started with it.
+     * <p>
+     * If not provided, default value is {@link #DFLT_THREAD_PRI}
+     *
+     * @param threadPri Thread priority.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public TcpDiscoverySpi setThreadPriority(int threadPri) {
+        this.threadPri = threadPri;
 
-        return node;
+        return this;
     }
 
-    /** {@inheritDoc} */
-    @Override public Collection<ClusterNode> getRemoteNodes() {
-        return F.upcast(ring.visibleRemoteNodes());
+    /**
+     * Sets delay between issuing of heartbeat messages. SPI sends heartbeat messages
+     * in configurable time interval to other nodes to notify them about its state.
+     * <p>
+     * If not provided, default value is {@link #DFLT_HEARTBEAT_FREQ}.
+     *
+     * @param hbFreq Heartbeat frequency in milliseconds.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public TcpDiscoverySpi setHeartbeatFrequency(long hbFreq) {
+        this.hbFreq = hbFreq;
+
+        return this;
     }
 
-    /** {@inheritDoc} */
-    @Override public void spiStart(String gridName) throws IgniteSpiException {
-        spiStart0(false);
+    /**
+     * @return Size of topology snapshots history.
+     */
+    public long getTopHistorySize() {
+        return topHistSize;
     }
 
     /**
-     * Starts or restarts SPI after stop (to reconnect).
+     * Sets size of topology snapshots history. Specified size should be greater than or equal to default size
+     * {@link #DFLT_TOP_HISTORY_SIZE}.
      *
-     * @param restart {@code True} if SPI is restarted after stop.
-     * @throws IgniteSpiException If failed.
+     * @param topHistSize Size of topology snapshots history.
      */
-    private void spiStart0(boolean restart) throws IgniteSpiException {
-        if (!restart)
-            // It is initial start.
-            onSpiStart();
+    @IgniteSpiConfiguration(optional = true)
+    public TcpDiscoverySpi setTopHistorySize(int topHistSize) {
+        if (topHistSize < DFLT_TOP_HISTORY_SIZE) {
+            U.warn(log, "Topology history size should be greater than or equal to default size. " +
+                "Specified size will not be set [curSize=" + this.topHistSize + ", specifiedSize=" + topHistSize +
+                ", defaultSize=" + DFLT_TOP_HISTORY_SIZE + ']');
 
-        synchronized (mux) {
-            spiState = DISCONNECTED;
+            return this;
         }
 
-        if (debugMode) {
-            if (!log.isInfoEnabled())
-                throw new IgniteSpiException("Info log level should be enabled for TCP discovery to work " +
-                    "in debug mode.");
+        this.topHistSize = topHistSize;
 
-            debugLog = new ConcurrentLinkedDeque<>();
-
-            U.quietAndWarn(log, "TCP discovery SPI is configured in debug mode.");
-        }
+        return this;
+    }
 
-        // Clear addresses collections.
-        fromAddrs.clear();
-        noResAddrs.clear();
+    /** {@inheritDoc} */
+    @Override public TcpDiscoverySpi setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) {
+        assert locNodeAttrs == null;
+        assert locNodeVer == null;
 
-        sockTimeoutWorker = new SocketTimeoutWorker();
-        sockTimeoutWorker.start();
+        if (log.isDebugEnabled()) {
+            log.debug("Node attributes to set: " + attrs);
+            log.debug("Node version to set: " + ver);
+        }
 
-        msgWorker = new RingMessageWorker();
-        msgWorker.start();
+        locNodeAttrs = attrs;
+        locNodeVer = ver;
 
-        tcpSrvr = new TcpServer();
+        return this;
+    }
 
+    /**
+     * @param srvPort Server port.
+     */
+    void initLocalNode(int srvPort, boolean addExtAddrAttr) {
         // Init local node.
         IgniteBiTuple<Collection<String>, Collection<String>> addrs;
 
@@ -612,166 +802,190 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
             getLocalNodeId(),
             addrs.get1(),
             addrs.get2(),
-            tcpSrvr.port,
+            srvPort,
             metricsProvider,
             locNodeVer);
 
-        Collection<InetSocketAddress> extAddrs = addrRslvr == null ? null :
-            U.resolveAddresses(addrRslvr, F.flat(Arrays.asList(addrs.get1(), addrs.get2())),
-                locNode.discoveryPort());
+        if (addExtAddrAttr) {
+            Collection<InetSocketAddress> extAddrs = addrRslvr == null ? null :
+                U.resolveAddresses(addrRslvr, F.flat(Arrays.asList(addrs.get1(), addrs.get2())),
+                    locNode.discoveryPort());
 
-        if (extAddrs != null)
-            locNodeAttrs.put(createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
+            locNodeAddrs = new LinkedHashSet<>();
+            locNodeAddrs.addAll(locNode.socketAddresses());
 
-        locNode.setAttributes(locNodeAttrs);
+            if (extAddrs != null) {
+                locNodeAttrs.put(createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
 
-        locNode.local(true);
+                locNodeAddrs.addAll(extAddrs);
+            }
+        }
 
-        locNodeAddrs = getNodeAddresses(locNode);
+        locNode.setAttributes(locNodeAttrs);
+        locNode.local(true);
 
         if (log.isDebugEnabled())
             log.debug("Local node initialized: " + locNode);
+    }
 
-        // Start TCP server thread after local node is initialized.
-        tcpSrvr.start();
-
-        ring.localNode(locNode);
+    /**
+     * @param node Node.
+     * @return {@link LinkedHashSet} of internal and external addresses of provided node.
+     *      Internal addresses placed before external addresses.
+     */
+    @SuppressWarnings("TypeMayBeWeakened")
+    LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode node) {
+        LinkedHashSet<InetSocketAddress> res = new LinkedHashSet<>(node.socketAddresses());
 
-        if (ipFinder.isShared())
-            registerLocalNodeAddress();
-        else {
-            if (F.isEmpty(ipFinder.getRegisteredAddresses()))
-                throw new IgniteSpiException("Non-shared IP finder must have IP addresses specified in " +
-                    "GridTcpDiscoveryIpFinder.getRegisteredAddresses() configuration property " +
-                    "(specify list of IP addresses in configuration).");
+        Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));
 
-            ipFinderHasLocAddr = ipFinderHasLocalAddress();
-        }
+        if (extAddrs != null)
+            res.addAll(extAddrs);
 
-        if (statsPrintFreq > 0 && log.isInfoEnabled()) {
-            statsPrinter = new StatisticsPrinter();
-            statsPrinter.start();
-        }
+        return res;
+    }
 
-        stats.onJoinStarted();
+    /**
+     * @param node Node.
+     * @param sameHost Same host flag.
+     * @return {@link LinkedHashSet} of internal and external addresses of provided node.
+     *      Internal addresses placed before external addresses.
+     *      Internal addresses will be sorted with {@code inetAddressesComparator(sameHost)}.
+     */
+    @SuppressWarnings("TypeMayBeWeakened")
+    LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode node, boolean sameHost) {
+        List<InetSocketAddress> addrs = U.arrayList(node.socketAddresses());
 
-        joinTopology();
+        Collections.sort(addrs, U.inetAddressesComparator(sameHost));
 
-        stats.onJoinFinished();
+        LinkedHashSet<InetSocketAddress> res = new LinkedHashSet<>(addrs);
 
-        hbsSnd = new HeartbeatsSender();
-        hbsSnd.start();
+        Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));
 
-        chkStatusSnd = new CheckStatusSender();
-        chkStatusSnd.start();
+        if (extAddrs != null)
+            res.addAll(extAddrs);
 
-        if (ipFinder.isShared()) {
-            ipFinderCleaner = new IpFinderCleaner();
-            ipFinderCleaner.start();
-        }
+        return res;
+    }
 
-        if (log.isDebugEnabled() && !restart)
-            log.debug(startInfo());
+    /** {@inheritDoc} */
+    @Override public Collection<Object> injectables() {
+        return F.<Object>asList(ipFinder);
+    }
 
-        if (restart)
-            getSpiContext().registerPort(tcpSrvr.port, TCP);
+    /** {@inheritDoc} */
+    @Override public long getSocketTimeout() {
+        return sockTimeout;
     }
 
-    /**
-     * @throws IgniteSpiException If failed.
-     */
-    @SuppressWarnings("BusyWait")
-    private void registerLocalNodeAddress() throws IgniteSpiException {
-        // Make sure address registration succeeded.
-        while (true) {
-            try {
-                ipFinder.initializeLocalAddresses(locNode.socketAddresses());
+    /** {@inheritDoc} */
+    @Override public long getAckTimeout() {
+        return ackTimeout;
+    }
 
-                // Success.
-                break;
-            }
-            catch (IllegalStateException e) {
-                throw new IgniteSpiException("Failed to register local node address with IP finder: " +
-                    locNode.socketAddresses(), e);
-            }
-            catch (IgniteSpiException e) {
-                LT.error(log, e, "Failed to register local node address in IP finder on start " +
-                    "(retrying every 2000 ms).");
-            }
+    /** {@inheritDoc} */
+    @Override public long getNetworkTimeout() {
+        return netTimeout;
+    }
 
-            try {
-                U.sleep(2000);
-            }
-            catch (IgniteInterruptedCheckedException e) {
-                throw new IgniteSpiException("Thread has been interrupted.", e);
-            }
-        }
+    /** {@inheritDoc} */
+    @Override public int getThreadPriority() {
+        return threadPri;
     }
 
-    /**
-     * @throws IgniteSpiException If failed.
-     */
-    private void onSpiStart() throws IgniteSpiException {
-        startStopwatch();
+    /** {@inheritDoc} */
+    @Override public long getHeartbeatFrequency() {
+        return hbFreq;
+    }
 
-        checkParameters();
+    /** {@inheritDoc} */
+    @Override public String getIpFinderFormatted() {
+        return ipFinder.toString();
+    }
 
-        assertParameter(ipFinderCleanFreq > 0, "ipFinderCleanFreq > 0");
-        assertParameter(locPort > 1023, "localPort > 1023");
-        assertParameter(locPortRange >= 0, "localPortRange >= 0");
-        assertParameter(locPort + locPortRange <= 0xffff, "locPort + locPortRange <= 0xffff");
-        assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout");
-        assertParameter(reconCnt > 0, "reconnectCnt > 0");
-        assertParameter(maxMissedHbs > 0, "maxMissedHeartbeats > 0");
-        assertParameter(maxMissedClientHbs > 0, "maxMissedClientHeartbeats > 0");
-        assertParameter(threadPri > 0, "threadPri > 0");
-        assertParameter(statsPrintFreq >= 0, "statsPrintFreq >= 0");
+    /** {@inheritDoc} */
+    @Override public long getNodesJoined() {
+        return stats.joinedNodesCount();
+    }
 
-        try {
-            locHost = U.resolveLocalHost(locAddr);
-        }
-        catch (IOException e) {
-            throw new IgniteSpiException("Unknown local address: " + locAddr, e);
-        }
+    /** {@inheritDoc} */
+    @Override public long getNodesLeft() {
+        return stats.leftNodesCount();
+    }
 
-        if (log.isDebugEnabled()) {
-            log.debug(configInfo("localHost", locHost.getHostAddress()));
-            log.debug(configInfo("localPort", locPort));
-            log.debug(configInfo("localPortRange", locPortRange));
-            log.debug(configInfo("threadPri", threadPri));
-            log.debug(configInfo("networkTimeout", netTimeout));
-            log.debug(configInfo("sockTimeout", sockTimeout));
-            log.debug(configInfo("ackTimeout", ackTimeout));
-            log.debug(configInfo("maxAckTimeout", maxAckTimeout));
-            log.debug(configInfo("reconnectCount", reconCnt));
-            log.debug(configInfo("ipFinder", ipFinder));
-            log.debug(configInfo("ipFinderCleanFreq", ipFinderCleanFreq));
-            log.debug(configInfo("heartbeatFreq", hbFreq));
-            log.debug(configInfo("maxMissedHeartbeats", maxMissedHbs));
-            log.debug(configInfo("statsPrintFreq", statsPrintFreq));
-        }
+    /** {@inheritDoc} */
+    @Override public long getNodesFailed() {
+        return stats.failedNodesCount();
+    }
 
-        // Warn on odd network timeout.
-        if (netTimeout < 3000)
-            U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout);
+    /** {@inheritDoc} */
+    @Override public long getPendingMessagesRegistered() {
+        return stats.pendingMessagesRegistered();
+    }
 
-        registerMBean(gridName, this, TcpDiscoverySpiMBean.class);
+    /** {@inheritDoc} */
+    @Override public long getPendingMessagesDiscarded() {
+        return stats.pendingMessagesDiscarded();
+    }
 
-        if (ipFinder instanceof TcpDiscoveryMulticastIpFinder) {
-            TcpDiscoveryMulticastIpFinder mcastIpFinder = ((TcpDiscoveryMulticastIpFinder)ipFinder);
+    /** {@inheritDoc} */
+    @Override public long getAvgMessageProcessingTime() {
+        return stats.avgMessageProcessingTime();
+    }
 
-            if (mcastIpFinder.getLocalAddress() == null)
-                mcastIpFinder.setLocalAddress(locAddr);
-        }
+    /** {@inheritDoc} */
+    @Override public long getMaxMessageProcessingTime() {
+        return stats.maxMessageProcessingTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getTotalReceivedMessages() {
+        return stats.totalReceivedMessages();
     }
 
     /** {@inheritDoc} */
-    @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
+    @Override public Map<String, Integer> getReceivedMessages() {
+        return stats.receivedMessages();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getTotalProcessedMessages() {
+        return stats.totalProcessedMessages();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<String, Integer> getProcessedMessages() {
+        return stats.processedMessages();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getCoordinatorSinceTimestamp() {
+        return stats.coordinatorSinceTimestamp();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
         super.onContextInitialized0(spiCtx);
 
         ctxInitLatch.countDown();
 
-        spiCtx.registerPort(tcpSrvr.port, TCP);
+        ipFinder.onSpiContextInitialized(spiCtx);
+
+        impl.onContextInitialized0(spiCtx);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onContextDestroyed0() {
+        super.onContextDestroyed0();
+
+        if (ctxInitLatch.getCount() > 0)
+            // Safety.
+            ctxInitLatch.countDown();
+
+        if (ipFinder != null)
+            ipFinder.onSpiContextDestroyed();
+
+        getSpiContext().deregisterPorts();
     }
 
     /** {@inheritDoc} */
@@ -795,4621 +1009,858 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
     }
 
     /** {@inheritDoc} */
-    @Override public void spiStop() throws IgniteSpiException {
-        spiStop0(false);
+    @Override public ClusterNode getLocalNode() {
+        return locNode;
     }
 
-    /**
-     * Stops SPI finally or stops SPI for restart.
-     *
-     * @param disconnect {@code True} if SPI is being disconnected.
-     * @throws IgniteSpiException If failed.
-     */
-    private void spiStop0(boolean disconnect) throws IgniteSpiException {
-        if (ctxInitLatch.getCount() > 0)
-            // Safety.
-            ctxInitLatch.countDown();
+    /** {@inheritDoc} */
+    @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
+        this.lsnr = lsnr;
+    }
 
-        if (log.isDebugEnabled()) {
-            if (disconnect)
-                log.debug("Disconnecting SPI.");
-            else
-                log.debug("Preparing to start local node stop procedure.");
-        }
+    /** {@inheritDoc} */
+    @Override public TcpDiscoverySpi setDataExchange(DiscoverySpiDataExchange exchange) {
+        this.exchange = exchange;
 
-        if (disconnect) {
-            synchronized (mux) {
-                spiState = DISCONNECTING;
-            }
-        }
+        return this;
+    }
 
-        if (msgWorker != null && msgWorker.isAlive() && !disconnect) {
-            // Send node left message only if it is final stop.
-            msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(getLocalNodeId()));
+    /** {@inheritDoc} */
+    @Override public TcpDiscoverySpi setMetricsProvider(DiscoveryMetricsProvider metricsProvider) {
+        this.metricsProvider = metricsProvider;
 
-            synchronized (mux) {
-                long threshold = U.currentTimeMillis() + netTimeout;
+        return this;
+    }
 
-                long timeout = netTimeout;
+    /** {@inheritDoc} */
+    @Override public long getGridStartTime() {
+        assert gridStartTime != 0;
 
-                while (spiState != LEFT && timeout > 0) {
-                    try {
-                        mux.wait(timeout);
+        return gridStartTime;
+    }
 
-                        timeout = threshold - U.currentTimeMillis();
-                    }
-                    catch (InterruptedException ignored) {
-                        Thread.currentThread().interrupt();
+    /**
+     * @param sockAddr Remote address.
+     * @return Opened socket.
+     * @throws IOException If failed.
+     */
+    protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
+        assert sockAddr != null;
 
-                        break;
-                    }
-                }
+        InetSocketAddress resolved = sockAddr.isUnresolved() ?
+            new InetSocketAddress(InetAddress.getByName(sockAddr.getHostName()), sockAddr.getPort()) : sockAddr;
 
-                if (spiState == LEFT) {
-                    if (log.isDebugEnabled())
-                        log.debug("Verification for local node leave has been received from coordinator" +
-                            " (continuing stop procedure).");
-                }
-                else if (log.isInfoEnabled()) {
-                    log.info("No verification for local node leave has been received from coordinator" +
-                        " (will stop node anyway).");
-                }
-            }
-        }
+        InetAddress addr = resolved.getAddress();
 
-        U.interrupt(tcpSrvr);
-        U.join(tcpSrvr, log);
+        assert addr != null;
 
-        Collection<SocketReader> tmp;
+        Socket sock = new Socket();
 
-        synchronized (mux) {
-            tmp = U.arrayList(readers);
-        }
+        sock.bind(new InetSocketAddress(locHost, 0));
 
-        U.interrupt(tmp);
-        U.joinThreads(tmp, log);
+        sock.setTcpNoDelay(true);
 
-        U.interrupt(hbsSnd);
-        U.join(hbsSnd, log);
+        sock.connect(resolved, (int)sockTimeout);
 
-        U.interrupt(chkStatusSnd);
-        U.join(chkStatusSnd, log);
+        writeToSocket(sock, U.IGNITE_HEADER);
 
-        U.interrupt(ipFinderCleaner);
-        U.join(ipFinderCleaner, log);
+        return sock;
+    }
 
-        U.interrupt(msgWorker);
-        U.join(msgWorker, log);
+    /**
+     * Writes message to the socket.
+     *
+     * @param sock Socket.
+     * @param data Raw data to write.
+     * @throws IOException If IO failed or write timed out.
+     */
+    @SuppressWarnings("ThrowFromFinallyBlock")
+    protected void writeToSocket(Socket sock, byte[] data) throws IOException {
+        assert sock != null;
+        assert data != null;
 
-        U.interrupt(sockTimeoutWorker);
-        U.join(sockTimeoutWorker, log);
+        SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
 
-        U.interrupt(statsPrinter);
-        U.join(statsPrinter, log);
+        sockTimeoutWorker.addTimeoutObject(obj);
 
-        if (ipFinder != null)
-            ipFinder.close();
+        IOException err = null;
 
-        Collection<TcpDiscoveryNode> rmts = null;
+        try {
+            OutputStream out = sock.getOutputStream();
 
-        if (!disconnect) {
-            // This is final stop.
-            unregisterMBean();
+            out.write(data);
 
-            if (log.isDebugEnabled())
-                log.debug(stopInfo());
+            out.flush();
         }
-        else {
-            getSpiContext().deregisterPorts();
-
-            rmts = ring.visibleRemoteNodes();
+        catch (IOException e) {
+            err = e;
         }
+        finally {
+            boolean cancelled = obj.cancel();
 
-        long topVer = ring.topologyVersion();
-
-        ring.clear();
-
-        if (rmts != null && !rmts.isEmpty()) {
-            // This is restart/disconnection and remote nodes are not empty.
-            // We need to fire FAIL event for each.
-            DiscoverySpiListener lsnr = this.lsnr;
+            if (cancelled)
+                sockTimeoutWorker.removeTimeoutObject(obj);
 
-            if (lsnr != null) {
-                Set<ClusterNode> processed = new HashSet<>();
+            // Throw original exception.
+            if (err != null)
+                throw err;
 
-                for (TcpDiscoveryNode n : rmts) {
-                    assert n.visible();
+            if (!cancelled)
+                throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
+        }
+    }
 
-                    processed.add(n);
+    /**
+     * Writes message to the socket.
+     *
+     * @param sock Socket.
+     * @param msg Message.
+     * @throws IOException If IO failed or write timed out.
+     * @throws IgniteCheckedException If marshalling failed.
+     */
+    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) throws IOException, IgniteCheckedException {
+        writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024)); // 8K.
+    }
 
-                    List<ClusterNode> top = U.arrayList(rmts, F.notIn(processed));
+    /**
+     * Writes message to the socket.
+     *
+     * @param sock Socket.
+     * @param msg Message.
+     * @param bout Byte array output stream.
+     * @throws IOException If IO failed or write timed out.
+     * @throws IgniteCheckedException If marshalling failed.
+     */
+    @SuppressWarnings("ThrowFromFinallyBlock")
+    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout)
+        throws IOException, IgniteCheckedException {
+        assert sock != null;
+        assert msg != null;
+        assert bout != null;
 
-                    topVer++;
+        // Marshall message first to perform only write after.
+        marsh.marshal(msg, bout);
 
-                    Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer,
-                        Collections.unmodifiableList(top));
+        SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
 
-                    lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, hist, null);
-                }
-            }
-        }
+        sockTimeoutWorker.addTimeoutObject(obj);
 
-        printStatistics();
+        IOException err = null;
 
-        stats.clear();
+        try {
+            OutputStream out = sock.getOutputStream();
 
-        synchronized (mux) {
-            // Clear stored data.
-            leavingNodes.clear();
-            failedNodes.clear();
+            bout.writeTo(out);
 
-            spiState = DISCONNECTED;
+            out.flush();
         }
-    }
+        catch (IOException e) {
+            err = e;
+        }
+        finally {
+            boolean cancelled = obj.cancel();
 
-    /** {@inheritDoc} */
-    @Override protected void onContextDestroyed0() {
-        super.onContextDestroyed0();
+            if (cancelled)
+                sockTimeoutWorker.removeTimeoutObject(obj);
 
-        if (ctxInitLatch.getCount() > 0)
-            // Safety.
-            ctxInitLatch.countDown();
+            // Throw original exception.
+            if (err != null)
+                throw err;
 
-        getSpiContext().deregisterPorts();
+            if (!cancelled)
+                throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
+        }
     }
 
     /**
-     * @throws IgniteSpiException If any error occurs.
-     * @return {@code true} if IP finder contains local address.
+     * Writes response to the socket.
+     *
+     * @param sock Socket.
+     * @param res Integer response.
+     * @throws IOException If IO failed or write timed out.
      */
-    private boolean ipFinderHasLocalAddress() throws IgniteSpiException {
-        for (InetSocketAddress locAddr : locNodeAddrs) {
-            for (InetSocketAddress addr : registeredAddresses())
-                try {
-                    int port = addr.getPort();
-
-                    InetSocketAddress resolved = addr.isUnresolved() ?
-                        new InetSocketAddress(InetAddress.getByName(addr.getHostName()), port) :
-                        new InetSocketAddress(addr.getAddress(), port);
-
-                    if (resolved.equals(locAddr))
-                        return true;
-                }
-                catch (UnknownHostException e) {
-                    onException(e.getMessage(), e);
-                }
-        }
+    @SuppressWarnings("ThrowFromFinallyBlock")
+    protected void writeToSocket(Socket sock, int res) throws IOException {
+        assert sock != null;
 
-        return false;
-    }
+        SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
 
-    /** {@inheritDoc} */
-    @Override public boolean pingNode(UUID nodeId) {
-        assert nodeId != null;
+        sockTimeoutWorker.addTimeoutObject(obj);
 
-        if (log.isDebugEnabled())
-            log.debug("Pinging node: " + nodeId + "].");
+        OutputStream out = sock.getOutputStream();
 
-        if (nodeId == getLocalNodeId())
-            return true;
+        IOException err = null;
 
-        TcpDiscoveryNode node = ring.node(nodeId);
+        try {
+            out.write(res);
 
-        if (node == null || !node.visible())
-            return false;
+            out.flush();
+        }
+        catch (IOException e) {
+            err = e;
+        }
+        finally {
+            boolean cancelled = obj.cancel();
 
-        boolean res = pingNode(node);
+            if (cancelled)
+                sockTimeoutWorker.removeTimeoutObject(obj);
 
-        if (!res && !node.isClient()) {
-            LT.warn(log, null, "Failed to ping node (status check will be initiated): " + nodeId);
+            // Throw original exception.
+            if (err != null)
+                throw err;
 
-            msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, node.id()));
+            if (!cancelled)
+                throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
         }
-
-        return res;
     }
 
     /**
-     * Pings the remote node to see if it's alive.
+     * Reads message from the socket limiting read time.
      *
-     * @param node Node.
-     * @return {@code True} if ping succeeds.
+     * @param sock Socket.
+     * @param in Input stream (in case socket stream was wrapped).
+     * @param timeout Socket timeout for this operation.
+     * @return Message.
+     * @throws IOException If IO failed or read timed out.
+     * @throws IgniteCheckedException If unmarshalling failed.
      */
-    private boolean pingNode(TcpDiscoveryNode node) {
-        assert node != null;
-
-        if (node.id().equals(getLocalNodeId()))
-            return true;
-
-        UUID clientNodeId = null;
+    protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, IgniteCheckedException {
+        assert sock != null;
 
-        if (node.isClient()) {
-            clientNodeId = node.id();
+        int oldTimeout = sock.getSoTimeout();
 
-            node = ring.node(node.clientRouterNodeId());
+        try {
+            sock.setSoTimeout((int)timeout);
 
-            if (node == null || !node.visible())
-                return false;
+            return marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader());
         }
+        catch (IOException | IgniteCheckedException e) {
+            if (X.hasCause(e, SocketTimeoutException.class))
+                LT.warn(log, null, "Timed out waiting for message to be read (most probably, the reason is " +
+                    "in long GC pauses on remote node. Current timeout: " + timeout + '.');
 
-        for (InetSocketAddress addr : getNodeAddresses(node, U.sameMacs(locNode, node))) {
+            throw e;
+        }
+        finally {
+            // Quietly restore timeout.
             try {
-                // ID returned by the node should be the same as ID of the parameter for ping to succeed.
-                IgniteBiTuple<UUID, Boolean> t = pingNode(addr, clientNodeId);
-
-                return node.id().equals(t.get1()) && (clientNodeId == null || t.get2());
+                sock.setSoTimeout(oldTimeout);
             }
-            catch (IgniteCheckedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']');
-
-                onException("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']', e);
-                // continue;
+            catch (SocketException ignored) {
+                // No-op.
             }
         }
-
-        return false;
     }
 
     /**
-     * Pings the node by its address to see if it's alive.
+     * Reads message delivery receipt from the socket.
      *
-     * @param addr Address of the node.
-     * @return ID of the remote node and "client exists" flag if node alive.
-     * @throws IgniteSpiException If an error occurs.
+     * @param sock Socket.
+     * @param timeout Socket timeout for this operation.
+     * @return Receipt.
+     * @throws IOException If IO failed or read timed out.
      */
-    private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, @Nullable UUID clientNodeId)
-        throws IgniteCheckedException {
-        assert addr != null;
+    protected int readReceipt(Socket sock, long timeout) throws IOException {
+        assert sock != null;
 
-        UUID locNodeId = getLocalNodeId();
+        int oldTimeout = sock.getSoTimeout();
+
+        try {
+            sock.setSoTimeout((int)timeout);
 
-        if (F.contains(locNodeAddrs, addr)) {
-            if (clientNodeId == null)
-                return F.t(getLocalNodeId(), false);
+            int res = sock.getInputStream().read();
 
-            ClientMessageWorker clientWorker = clientMsgWorkers.get(clientNodeId);
+            if (res == -1)
+                throw new EOFException();
 
-            if (clientWorker == null)
-                return F.t(getLocalNodeId(), false);
+            return res;
+        }
+        catch (SocketTimeoutException e) {
+            LT.warn(log, null, "Timed out waiting for message delivery receipt (most probably, the reason is " +
+                "in long GC pauses on remote node; consider tuning GC and increasing 'ackTimeout' " +
+                "configuration property). Will retry to send message with increased timeout. " +
+                "Current timeout: " + timeout + '.');
 
-            boolean clientPingRes;
+            stats.onAckTimeout();
 
+            throw e;
+        }
+        finally {
+            // Quietly restore timeout.
             try {
-                clientPingRes = clientWorker.ping();
+                sock.setSoTimeout(oldTimeout);
             }
-            catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-
-                throw new IgniteInterruptedCheckedException(e);
+            catch (SocketException ignored) {
+                // No-op.
             }
-
-            return F.t(getLocalNodeId(), clientPingRes);
         }
+    }
 
-        GridFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new GridFutureAdapter<>();
-
-        IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>> oldFut = pingMap.putIfAbsent(addr, fut);
+    /**
+     * Resolves addresses registered in the IP finder, removes duplicates and local host
+     * address and returns the collection of.
+     *
+     * @return Resolved addresses without duplicates and local address (potentially
+     *      empty but never null).
+     * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs.
+     */
+    protected Collection<InetSocketAddress> resolvedAddresses() throws IgniteSpiException {
+        List<InetSocketAddress> res = new ArrayList<>();
 
-        if (oldFut != null)
-            return oldFut.get();
-        else {
-            Collection<Throwable> errs = null;
+        Collection<InetSocketAddress> addrs;
 
+        // Get consistent addresses collection.
+        while (true) {
             try {
-                Socket sock = null;
-
-                for (int i = 0; i < reconCnt; i++) {
-                    try {
-                        if (addr.isUnresolved())
-                            addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort());
+                addrs = registeredAddresses();
 
-                        long tstamp = U.currentTimeMillis();
-
-                        sock = openSocket(addr);
+                break;
+            }
+            catch (IgniteSpiException e) {
+                LT.error(log, e, "Failed to get registered addresses from IP finder on start " +
+                    "(retrying every 2000 ms).");
+            }
 
-                        writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId));
+            try {
+                U.sleep(2000);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                throw new IgniteSpiException("Thread has been interrupted.", e);
+            }
+        }
 
-                        TcpDiscoveryPingResponse res = readMessage(sock, null, netTimeout);
+        for (InetSocketAddress addr : addrs) {
+            assert addr != null;
 
-                        if (locNodeId.equals(res.creatorNodeId())) {
-                            if (log.isDebugEnabled())
-                                log.debug("Ping response from local node: " + res);
+            try {
+                InetSocketAddress resolved = addr.isUnresolved() ?
+                    new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()) : addr;
 
-                            break;
-                        }
+                if (locNodeAddrs == null || !locNodeAddrs.contains(resolved))
+                    res.add(resolved);
+            }
+            catch (UnknownHostException ignored) {
+                LT.warn(log, null, "Failed to resolve address from IP finder (host is unknown): " + addr);
 
-                        stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
+                // Add address in any case.
+                res.add(addr);
+            }
+        }
 
-                        IgniteBiTuple<UUID, Boolean> t = F.t(res.creatorNodeId(), res.clientExists());
+        if (!res.isEmpty())
+            Collections.shuffle(res);
 
-                        fut.onDone(t);
-
-                        return t;
-                    }
-                    catch (IOException | IgniteCheckedException e) {
-                        if (errs == null)
-                            errs = new ArrayList<>();
-
-                        errs.add(e);
-                    }
-                    finally {
-                        U.closeQuiet(sock);
-                    }
-                }
-            }
-            catch (Throwable t) {
-                fut.onDone(t);
-
-                if (t instanceof Error)
-                    throw t;
-
-                throw U.cast(t);
-            }
-            finally {
-                if (!fut.isDone())
-                    fut.onDone(U.exceptionWithSuppressed("Failed to ping node by address: " + addr, errs));
-
-                boolean b = pingMap.remove(addr, fut);
-
-                assert b;
-            }
-
-            return fut.get();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void disconnect() throws IgniteSpiException {
-        spiStop0(true);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator nodeAuth) {
-        this.nodeAuth = nodeAuth;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
-        try {
-            msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, marsh.marshal(evt)));
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void failNode(UUID nodeId) {
-        ClusterNode node = ring.node(nodeId);
-
-        if (node != null) {
-            TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(),
-                node.id(), node.order());
-
-            msgWorker.addMessage(msg);
-        }
-    }
-
-    /**
-     * Tries to join this node to topology.
-     *
-     * @throws IgniteSpiException If any error occurs.
-     */
-    private void joinTopology() throws IgniteSpiException {
-        synchronized (mux) {
-            assert spiState == CONNECTING || spiState == DISCONNECTED;
-
-            spiState = CONNECTING;
-        }
-
-        SecurityCredentials locCred = (SecurityCredentials)locNode.getAttributes()
-            .get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
-
-        // Marshal credentials for backward compatibility and security.
-        marshalCredentials(locNode);
-
-        while (true) {
-            if (!sendJoinRequestMessage()) {
-                if (log.isDebugEnabled())
-                    log.debug("Join request message has not been sent (local node is the first in the topology).");
-
-                if (nodeAuth != null) {
-                    // Authenticate local node.
-                    try {
-                        SecurityContext subj = nodeAuth.authenticateNode(locNode, locCred);
-
-                        if (subj == null)
-                            throw new IgniteSpiException("Authentication failed for local node: " + locNode.id());
-
-                        Map<String, Object> attrs = new HashMap<>(locNode.attributes());
-
-                        attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT,
-                            ignite.configuration().getMarshaller().marshal(subj));
-                        attrs.remove(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
-
-                        locNode.setAttributes(attrs);
-                    }
-                    catch (IgniteException | IgniteCheckedException e) {
-                        throw new IgniteSpiException("Failed to authenticate local node (will shutdown local node).", e);
-                    }
-                }
-
-                locNode.order(1);
-                locNode.internalOrder(1);
-
-                gridStartTime = U.currentTimeMillis();
-
-                locNode.visible(true);
-
-                ring.clear();
-
-                ring.topologyVersion(1);
-
-                synchronized (mux) {
-                    topHist.clear();
-
-                    spiState = CONNECTED;
-
-                    mux.notifyAll();
-                }
-
-                notifyDiscovery(EVT_NODE_JOINED, 1, locNode);
-
-                break;
-            }
-
-            if (log.isDebugEnabled())
-                log.debug("Join request message has been sent (waiting for coordinator response).");
-
-            synchronized (mux) {
-                long threshold = U.currentTimeMillis() + netTimeout;
-
-                long timeout = netTimeout;
-
-                while (spiState == CONNECTING && timeout > 0) {
-                    try {
-                        mux.wait(timeout);
-
-                        timeout = threshold - U.currentTimeMillis();
-                    }
-                    catch (InterruptedException ignored) {
-                        Thread.currentThread().interrupt();
-
-                        throw new IgniteSpiException("Thread has been interrupted.");
-                    }
-                }
-
-                if (spiState == CONNECTED)
-                    break;
-                else if (spiState == DUPLICATE_ID)
-                    throw duplicateIdError((TcpDiscoveryDuplicateIdMessage)joinRes.get());
-                else if (spiState == AUTH_FAILED)
-                    throw authenticationFailedError((TcpDiscoveryAuthFailedMessage)joinRes.get());
-                else if (spiState == CHECK_FAILED)
-                    throw checkFailedError((TcpDiscoveryCheckFailedMessage)joinRes.get());
-                else if (spiState == LOOPBACK_PROBLEM) {
-                    TcpDiscoveryLoopbackProblemMessage msg = (TcpDiscoveryLoopbackProblemMessage)joinRes.get();
-
-                    boolean locHostLoopback = locHost.isLoopbackAddress();
-
-                    String firstNode = locHostLoopback ? "local" : "remote";
-
-                    String secondNode = locHostLoopback ? "remote" : "local";
-
-                    throw new IgniteSpiException("Failed to add node to topology because " + firstNode +
-                        " node is configured to use loopback address, but " + secondNode + " node is not " +
-                        "(consider changing 'localAddress' configuration parameter) " +
-                        "[locNodeAddrs=" + U.addressesAsString(locNode) + ", rmtNodeAddrs=" +
-                        U.addressesAsString(msg.addresses(), msg.hostNames()) + ']');
-                }
-                else
-                    LT.warn(log, null, "Node has not been connected to topology and will repeat join process. " +
-                        "Check remote nodes logs for possible error messages. " +
-                        "Note that large topology may require significant time to start. " +
-                        "Increase 'TcpDiscoverySpi.networkTimeout' configuration property " +
-                        "if getting this message on the starting nodes [networkTimeout=" + netTimeout + ']');
-            }
-        }
-
-        assert locNode.order() != 0;
-        assert locNode.internalOrder() != 0;
-
-        if (log.isDebugEnabled())
-            log.debug("Discovery SPI has been connected to topology with order: " + locNode.internalOrder());
-    }
+        return res;
+    }
 
     /**
-     * Tries to send join request message to a random node presenting in topology.
-     * Address is provided by {@link TcpDiscoveryIpFinder} and message is
-     * sent to first node connection succeeded to.
+     * Gets addresses registered in the IP finder, initializes addresses having no
+     * port (or 0 port) with {@link #DFLT_PORT}.
      *
-     * @return {@code true} if send succeeded.
-     * @throws IgniteSpiException If any error occurs.
+     * @return Registered addresses.
+     * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs.
      */
-    @SuppressWarnings({"BusyWait"})
-    private boolean sendJoinRequestMessage() throws IgniteSpiException {
-        TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode,
-            collectExchangeData(getLocalNodeId()));
-
-        // Time when it has been detected, that addresses from IP finder do not respond.
-        long noResStart = 0;
-
-        while (true) {
-            Collection<InetSocketAddress> addrs = resolvedAddresses();
-
-            if (F.isEmpty(addrs))
-                return false;
+    protected Collection<InetSocketAddress> registeredAddresses() throws IgniteSpiException {
+        Collection<InetSocketAddress> res = new ArrayList<>();
 
-            boolean retry = false;
-            Collection<Exception> errs = new ArrayList<>();
+        for (InetSocketAddress addr : ipFinder.getRegisteredAddresses()) {
+            if (addr.getPort() == 0) {
+                // TcpDiscoveryNode.discoveryPort() returns an correct port for a server node and 0 for client node.
+                int port = locNode.discoveryPort() != 0 ? locNode.discoveryPort() : DFLT_PORT;
 
-            try (SocketMultiConnector multiConnector = new SocketMultiConnector(this, addrs, 2)) {
-                GridTuple3<InetSocketAddress, Socket, Exception> tuple;
-
-                while ((tuple = multiConnector.next()) != null) {
-                    InetSocketAddress addr = tuple.get1();
-                    Socket sock = tuple.get2();
-                    Exception ex = tuple.get3();
-
-                    if (ex == null) {
-                        assert sock != null;
-
-                        try {
-                            Integer res = sendMessageDirectly(joinReq, addr, sock);
-
-                            assert res != null;
-
-                            noResAddrs.remove(addr);
-
-                            // Address is responsive, reset period start.
-                            noResStart = 0;
-
-                            switch (res) {
-                                case RES_WAIT:
-                                    // Concurrent startup, try sending join request again or wait if no success.
-                                    retry = true;
-
-                                    break;
-                                case RES_OK:
-                                    if (log.isDebugEnabled())
-                                        log.debug("Join request message has been sent to address [addr=" + addr +
-                                            ", req=" + joinReq + ']');
-
-                                    // Join request sending succeeded, wait for response from topology.
-                                    return true;
-
-                                default:
-                                    // Concurrent startup, try next node.
-                                    if (res == RES_CONTINUE_JOIN) {
-                                        if (!fromAddrs.contains(addr))
-                                            retry = true;
-                                    }
-                                    else {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Unexpected response to join request: " + res);
-
-                                        retry = true;
-                                    }
-
-                                    break;
-                            }
-                        }
-                        catch (IgniteSpiException e) {
-                            e.printStackTrace();
-
-                            ex = e;
-                        }
-                    }
-
-                    if (ex != null) {
-                        errs.add(ex);
-
-                        if (log.isDebugEnabled()) {
-                            IOException ioe = X.cause(ex, IOException.class);
-
-                            log.debug("Failed to send join request message [addr=" + addr +
-                                ", msg=" + ioe != null ? ioe.getMessage() : ex.getMessage() + ']');
-
-                            onException("Failed to send join request message [addr=" + addr +
-                                ", msg=" + ioe != null ? ioe.getMessage() : ex.getMessage() + ']', ioe);
-                        }
-
-                        noResAddrs.add(addr);
-                    }
-                }
+                addr = addr.isUnresolved() ? new InetSocketAddress(addr.getHostName(), port) :
+                    new InetSocketAddress(addr.getAddress(), port);
             }
 
-            if (retry) {
-                if (log.isDebugEnabled())
-                    log.debug("Concurrent discovery SPI start has been detected (local node should wait).");
-
-                try {
-                    U.sleep(2000);
-                }
-                catch (IgniteInterruptedCheckedException e) {
-                    throw new IgniteSpiException("Thread has been interrupted.", e);
-                }
-            }
-            else if (!ipFinder.isShared() && !ipFinderHasLocAddr) {
-                IgniteCheckedException e = null;
-
-                if (!errs.isEmpty()) {
-                    e = new IgniteCheckedException("Multiple connection attempts failed.");
-
-                    for (Exception err : errs)
-                        e.addSuppressed(err);
-                }
-
-                if (e != null && X.hasCause(e, ConnectException.class))
-                    LT.warn(log, null, "Failed to connect to any address from IP finder " +
-                        "(make sure IP finder addresses are correct and firewalls are disabled on all host machines): " +
-                        addrs);
-
-                if (joinTimeout > 0) {
-                    if (noResStart == 0)
-                        noResStart = U.currentTimeMillis();
-                    else if (U.currentTimeMillis() - noResStart > joinTimeout)
-                        throw new IgniteSpiException(
-                            "Failed to connect to any address from IP finder within join timeout " +
-                                "(make sure IP finder addresses are correct, and operating system firewalls are disabled " +
-                                "on all host machines, or consider increasing 'joinTimeout' configuration property): " +
-                                addrs, e);
-                }
-
-                try {
-                    U.sleep(2000);
-                }
-                catch (IgniteInterruptedCheckedException ex) {
-                    throw new IgniteSpiException("Thread has been interrupted.", ex);
-                }
-            }
-            else
-                break;
+            res.add(addr);
         }
 
-        return false;
+        return res;
     }
 
     /**
-     * Establishes connection to an address, sends message and returns the response (if any).
-     *
-     * @param msg Message to send.
-     * @param addr Address to send message to.
-     * @return Response read from the recipient or {@code null} if no response is supposed.
-     * @throws IgniteSpiException If an error occurs.
+     * @param msg Message.
+     * @return Error.
      */
-    @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr, Socket sock)
-        throws IgniteSpiException {
+    protected IgniteSpiException duplicateIdError(TcpDiscoveryDuplicateIdMessage msg) {
         assert msg != null;
-        assert addr != null;
-
-        Collection<Throwable> errs = null;
-
-        long ackTimeout0 = ackTimeout;
-
-        int connectAttempts = 1;
-
-        boolean joinReqSent = false;
-
-        UUID locNodeId = getLocalNodeId();
-
-        for (int i = 0; i < reconCnt; i++) {
-            // Need to set to false on each new iteration,
-            // since remote node may leave in the middle of the first iteration.
-            joinReqSent = false;
-
-            boolean openSock = false;
-
-            try {
-                long tstamp = U.currentTimeMillis();
-
-                if (sock == null)
-                    sock = openSocket(addr);
-
-                openSock = true;
-
-                // Handshake.
-                writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId));
-
-                TcpDiscoveryHandshakeResponse res = readMessage(sock, null, ackTimeout0);
-
-                if (locNodeId.equals(res.creatorNodeId())) {
-                    if (log.isDebugEnabled())
-                        log.debug("Handshake response from local node: " + res);
-
-                    break;
-                }
-
-                stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
-
-                // Send message.
-                tstamp = U.currentTimeMillis();
-
-                writeToSocket(sock, msg);
-
-                stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
-
-                if (debugMode)
-                    debugLog("Message has been sent directly to address [msg=" + msg + ", addr=" + addr +
-                        ", rmtNodeId=" + res.creatorNodeId() + ']');
-
-                if (log.isDebugEnabled())
-                    log.debug("Message has been sent directly to address [msg=" + msg + ", addr=" + addr +
-                        ", rmtNodeId=" + res.creatorNodeId() + ']');
-
-                // Connection has been established, but
-                // join request may not be unmarshalled on remote host.
-                // E.g. due to class not found issue.
-                joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage;
-
-                return readReceipt(sock, ackTimeout0);
-            }
-            catch (ClassCastException e) {
-                // This issue is rarely reproducible on AmazonEC2, but never
-                // on dedicated machines.
-                if (log.isDebugEnabled())
-                    U.error(log, "Class cast exception on direct send: " + addr, e);
-
-                onException("Class cast exception on direct send: " + addr, e);
-
-                if (errs == null)
-                    errs = new ArrayList<>();
-
-                errs.add(e);
-            }
-            catch (IOException | IgniteCheckedException e) {
-                if (log.isDebugEnabled())
-                    log.error("Exception on direct send: " + e.getMessage(), e);
-
-                onException("Exception on direct send: " + e.getMessage(), e);
-
-                if (errs == null)
-                    errs = new ArrayList<>();
-
-                errs.add(e);
-
-                if (!openSock) {
-                    // Reconnect for the second time, if connection is not established.
-                    if (connectAttempts < 2) {
-                        connectAttempts++;
-
-                        continue;
-                    }
-
-                    break; // Don't retry if we can not establish connection.
-                }
-
-                if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
-                    ackTimeout0 *= 2;
-
-                    if (!checkAckTimeout(ackTimeout0))
-                        break;
-                }
-            }
-            finally {
-                U.closeQuiet(sock);
-
-                sock = null;
-            }
-        }
-
-        if (joinReqSent) {
-            if (log.isDebugEnabled())
-                log.debug("Join request has been sent, but receipt has not been read (returning RES_WAIT).");
-
-            // Topology will not include this node,
-            // however, warning on timed out join will be output.
-            return RES_OK;
-        }
 
-        throw new IgniteSpiException(
-            "Failed to send message to address [addr=" + addr + ", msg=" + msg + ']',
-            U.exceptionWithSuppressed("Failed to send message to address " +
-                "[addr=" + addr + ", msg=" + msg + ']', errs));
+        return new IgniteSpiException("Local node has the same ID as existing node in topology " +
+            "(fix configuration and restart local node) [localNode=" + locNode +
+            ", existingNode=" + msg.node() + ']');
     }
 
     /**
-     * Marshalls credentials with discovery SPI marshaller (will replace attribute value).
-     *
-     * @param node Node to marshall credentials for.
-     * @throws IgniteSpiException If marshalling failed.
+     * @param msg Message.
+     * @return Error.
      */
-    private void marshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException {
-        try {
-            // Use security-unsafe getter.
-            Map<String, Object> attrs = new HashMap<>(node.getAttributes());
-
-            attrs.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS,
-                marsh.marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS)));
+    protected IgniteSpiException authenticationFailedError(TcpDiscoveryAuthFailedMessage msg) {
+        assert msg != null;
 
-            node.setAttributes(attrs);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteSpiException("Failed to marshal node security credentials: " + node.id(), e);
-        }
+        return new IgniteSpiException(new IgniteAuthenticationException("Authentication failed [nodeId=" +
+            msg.creatorNodeId() + ", addr=" + msg.address().getHostAddress() + ']'));
     }
 
     /**
-     * Unmarshalls credentials with discovery SPI marshaller (will not replace attribute value).
-     *
-     * @param node Node to unmarshall credentials for.
-     * @return Security credentials.
-     * @throws IgniteSpiException If unmarshal fails.
+     * @param msg Message.
+     * @return Error.
      */
-    private SecurityCredentials unmarshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException {
-        try {
-            byte[] credBytes = (byte[])node.getAttributes().get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
-
-            if (credBytes == null)
-                return null;
+    protected IgniteSpiException checkFailedError(TcpDiscoveryCheckFailedMessage msg) {
+        assert msg != null;
 
-            return marsh.unmarshal(credBytes, null);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteSpiException("Failed to unmarshal node security credentials: " + node.id(), e);
-        }
+        return versionCheckFailed(msg) ? new IgniteSpiVersionCheckException(msg.error()) :
+            new IgniteSpiException(msg.error());
     }
 
     /**
-     * @param ackTimeout Acknowledgement timeout.
-     * @return {@code True} if acknowledgement timeout is less or equal to
-     * maximum acknowledgement timeout, {@code false} otherwise.
+     * @param msg Message.
+     * @return Whether delivery of the message is ensured.
      */
-    private boolean checkAckTimeout(long ackTimeout) {
-        if (ackTimeout > maxAckTimeout) {
-            LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " +
-                "(consider increasing 'maxAckTimeout' configuration property) " +
-                "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + maxAckTimeout + ']');
-
-            return false;
-        }
-
-        return true;
+    protected boolean ensured(TcpDiscoveryAbstractMessage msg) {
+        return U.getAnnotation(msg.getClass(), TcpDiscoveryEnsureDelivery.class) != null;
     }
 
     /**
-     * Notify external listener on discovery event.
-     *
-     * @param type Discovery event type. See {@link DiscoveryEvent} for more details.
-     * @param topVer Topology version.
-     * @param node Remote node this event is connected with.
+     * @param msg Failed message.
+     * @return {@code True} if specified failed message relates to version incompatibility, {@code false} otherwise.
+     * @deprecated Parsing of error message was used for preserving backward compatibility. We should remove it
+     *      and create separate message for failed version check with next major release.
      */
-    private void notifyDiscovery(int type, long topVer, TcpDiscoveryNode node) {
-        assert type > 0;
-        assert node != null;
-
-        DiscoverySpiListener lsnr = this.lsnr;
-
-        TcpDiscoverySpiState spiState = spiStateCopy();
-
-        if (lsnr != null && node.visible() && (spiState == CONNECTED || spiState == DISCONNECTING)) {
-            if (log.isDebugEnabled())
-                log.debug("Discovery notification [node=" + node + ", spiState=" + spiState +
-                    ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']');
-
-            Collection<ClusterNode> top = F.upcast(ring.visibleNodes());
-
-            Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer, top);
-
-            lsnr.onDiscovery(type, topVer, node, top, hist, null);
-        }
-        else if (log.isDebugEnabled())
-            log.debug("Skipped discovery notification [node=" + node + ", spiState=" + spiState +
-                ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']');
+    @Deprecated
+    private static boolean versionCheckFailed(TcpDiscoveryCheckFailedMessage msg) {
+        return msg.error().contains("versions are not compatible");
     }
 
     /**
-     * Update topology history with new topology snapshots.
-     *
-     * @param topVer Topology version.
-     * @param top Topology snapshot.
-     * @return Copy of updated topology history.
+     * @param nodeId Node ID.
+     * @return Marshalled exchange data.
      */
-    @Nullable private Map<Long, Collection<ClusterNode>> updateTopologyHistory(long topVer, Collection<ClusterNode> top) {
-        synchronized (mux) {
-            if (topHist.containsKey(topVer))
-                return null;
+    protected Map<Integer, byte[]> collectExchangeData(UUID nodeId) {
+        Map<Integer, Serializable> data = exchange.collect(nodeId);
 
-            topHist.put(topVer, top);
+        if (data == null)
+            return null;
 
-            while (topHist.size() > topHistSize)
-                topHist.remove(topHist.firstKey());
+        Map<Integer, byte[]> data0 = U.newHashMap(data.size());
 
-            if (log.isDebugEnabled())
-                log.debug("Added topology snapshot to history, topVer=" + topVer + ", historySize=" + topHist.size());
+        for (Map.Entry<Integer, Serializable> entry : data.entrySet()) {
+            try {
+                byte[] bytes = marsh.marshal(entry.getValue());
 
-            return new TreeMap<>(topHist);
+                data0.put(entry.getKey(), bytes);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to marshal discovery data " +
+                    "[comp=" + entry.getKey() + ", data=" + entry.getValue() + ']', e);
+            }
         }
-    }
 
-    /**
-     * @param msg Error message.
-     * @param e Exception.
-     */
-    private void onException(String msg, Exception e){
-        getExceptionRegistry().onException(msg, e);
+        return data0;
     }
 
     /**
-     * @param node Node.
-     * @return {@link LinkedHashSet} of internal and external addresses of provided node.
-     *      Internal addresses placed before external addresses.
+     * @param joiningNodeID Joining node ID.
+     * @param nodeId Remote node ID for which data is provided.
+     * @param data Collection of marshalled discovery data objects from different components.
+     * @param clsLdr Class loader for discovery data unmarshalling.
      */
-    @SuppressWarnings("TypeMayBeWeakened")
-    private LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode node) {
-        LinkedHashSet<InetSocketAddress> res = new LinkedHashSet<>(node.socketAddresses());
-
-        Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));
+    p

<TRUNCATED>