You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/01 13:44:57 UTC

[05/53] [abbrv] incubator-ignite git commit: # IGNITE-943 Merge TcpDiscoverySpi and TcpClientDiscoverySpi

# IGNITE-943 Merge TcpDiscoverySpi and TcpClientDiscoverySpi


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/838c0fd8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/838c0fd8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/838c0fd8

Branch: refs/heads/ignite-sprint-5
Commit: 838c0fd83b67b5e906144777d7340d5e61bdfa8a
Parents: 2f169f5
Author: sevdokimov <se...@gridgain.com>
Authored: Wed May 27 16:18:37 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Wed May 27 16:18:37 2015 +0300

----------------------------------------------------------------------
 .../main/java/org/apache/ignite/Ignition.java   |    9 +-
 .../org/apache/ignite/cluster/ClusterNode.java  |    6 +-
 .../configuration/IgniteConfiguration.java      |    4 +-
 .../apache/ignite/internal/IgniteKernal.java    |    5 +-
 .../org/apache/ignite/internal/IgnitionEx.java  |   14 +-
 .../ignite/spi/discovery/DiscoverySpi.java      |   17 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 1481 +++++
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 4792 +++++++++++++++
 .../spi/discovery/tcp/SocketMultiConnector.java |    2 +-
 .../discovery/tcp/TcpClientDiscoverySpi.java    | 1573 -----
 .../tcp/TcpClientDiscoverySpiMBean.java         |  156 -
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |  175 +
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 5777 ++++--------------
 .../discovery/tcp/TcpDiscoverySpiAdapter.java   | 1185 ----
 .../spi/discovery/tcp/TcpDiscoverySpiMBean.java |    8 +
 .../internal/GridReleaseTypeSelfTest.java       |   16 +-
 .../GridDiscoveryManagerAliveCacheSelfTest.java |   17 +-
 .../GridDiscoveryManagerAttributesSelfTest.java |   21 +-
 .../discovery/GridDiscoveryManagerSelfTest.java |   21 +-
 ...acheTcpClientDiscoveryMultiThreadedTest.java |    8 +-
 .../IgniteClientDataStructuresAbstractTest.java |    9 +-
 ...ientModesTcpClientDiscoveryAbstractTest.java |   10 +-
 ...unctionExcludeNeighborsAbstractSelfTest.java |    5 +-
 .../GridCacheSyncReplicatedPreloadSelfTest.java |    1 -
 ...pClientDiscoveryMarshallerCheckSelfTest.java |    9 +-
 .../TcpClientDiscoverySpiConfigSelfTest.java    |   39 -
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |   44 +-
 .../tcp/TcpDiscoveryConcurrentStartTest.java    |    4 +-
 .../tcp/TcpDiscoveryMultiThreadedTest.java      |    8 +-
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java |    2 +-
 .../tcp/TcpDiscoverySpiConfigSelfTest.java      |    8 +
 .../testframework/junits/GridAbstractTest.java  |    8 +-
 .../IgniteSpiDiscoverySelfTestSuite.java        |    1 -
 33 files changed, 7693 insertions(+), 7742 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/Ignition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignition.java b/modules/core/src/main/java/org/apache/ignite/Ignition.java
index 3270f5c..35e0b51 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignition.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignition.java
@@ -20,6 +20,7 @@ package org.apache.ignite;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.jetbrains.annotations.*;
 
@@ -138,11 +139,11 @@ public class Ignition {
      * <p>
      * This flag used when node is started if {@link IgniteConfiguration#isClientMode()}
      * is {@code null}. When {@link IgniteConfiguration#isClientMode()} is set this flag is ignored.
-     * It is recommended to use {@link TcpClientDiscoverySpi} on client nodes.
+     * It is recommended to use {@link DiscoverySpi} in client mode too.
      *
      * @param clientMode Client mode flag.
      * @see IgniteConfiguration#isClientMode()
-     * @see TcpClientDiscoverySpi
+     * @see TcpDiscoverySpi#setClientMode(boolean)
      */
     public static void setClientMode(boolean clientMode) {
         IgnitionEx.setClientMode(clientMode);
@@ -153,11 +154,11 @@ public class Ignition {
      * <p>
      * This flag used when node is started if {@link IgniteConfiguration#isClientMode()}
      * is {@code null}. When {@link IgniteConfiguration#isClientMode()} is set this flag is ignored.
-     * It is recommended to use {@link TcpClientDiscoverySpi} on client nodes.
+     * It is recommended to use {@link DiscoverySpi} in client mode too.
      *
      * @return Client mode flag.
      * @see IgniteConfiguration#isClientMode()
-     * @see TcpClientDiscoverySpi
+     * @see TcpDiscoverySpi#setClientMode(boolean)
      */
     public static boolean isClientMode() {
         return IgnitionEx.isClientMode();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
index 13dc30a..8f56372 100644
--- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
@@ -20,7 +20,7 @@ package org.apache.ignite.cluster;
 import org.apache.ignite.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
@@ -243,12 +243,12 @@ public interface ClusterNode {
      * Tests whether or not this node is connected to cluster as a client.
      * <p>
      * Do not confuse client in terms of
-     * discovery {@link TcpClientDiscoverySpi} and client in terms of cache
+     * discovery {@link DiscoverySpi#isClientMode()} and client in terms of cache
      * {@link IgniteConfiguration#isClientMode()}. Cache clients cannot carry data,
      * while topology clients connect to topology in a different way.
      *
      * @return {@code True} if this node is a client node, {@code false} otherwise.
-     * @see TcpClientDiscoverySpi
+     * @see DiscoverySpi#isClientMode()
      * @see IgniteConfiguration#isClientMode()
      * @see Ignition#isClientMode()
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index e47d4b1..7ddfd71 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -1824,10 +1824,10 @@ public class IgniteConfiguration {
 
     /**
      * Gets client mode flag. Client node cannot hold data in the caches. It's recommended to use
-     * {@link TcpClientDiscoverySpi} on client nodes.
+     * {@link DiscoverySpi} in client mode if this property is {@code true}.
      *
      * @return Client mode flag.
-     * @see TcpClientDiscoverySpi
+     * @see TcpDiscoverySpi#setClientMode(boolean)
      */
     public Boolean isClientMode() {
         return clientMode;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 84d81d7..d6e3ca4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -71,7 +71,6 @@ import org.apache.ignite.marshaller.optimized.*;
 import org.apache.ignite.mxbean.*;
 import org.apache.ignite.plugin.*;
 import org.apache.ignite.spi.*;
-import org.apache.ignite.spi.discovery.tcp.*;
 import org.jetbrains.annotations.*;
 
 import javax.management.*;
@@ -1073,8 +1072,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         if (cfg.getIncludeEventTypes() != null && cfg.getIncludeEventTypes().length != 0)
             perf.add("Disable grid events (remove 'includeEventTypes' from configuration)");
 
-        if (Boolean.TRUE.equals(cfg.isClientMode()) && cfg.getDiscoverySpi() instanceof TcpDiscoverySpi)
-            perf.add("Use TcpClientDiscoverySpi instead of TcpDiscoverySpi to run client node");
+        if (Boolean.TRUE.equals(cfg.isClientMode()) && !cfg.getDiscoverySpi().isClientMode())
+            perf.add("Use TcpDiscoverySpi in client mode for client node");
 
         if (OptimizedMarshaller.available() && !(cfg.getMarshaller() instanceof OptimizedMarshaller))
             perf.add("Enable optimized marshaller (set 'marshaller' to " +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 728fce6..13b015b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1744,8 +1744,8 @@ public class IgnitionEx {
             }
 
             if (myCfg.isClientMode() == null || !myCfg.isClientMode()) {
-                if (myCfg.getDiscoverySpi() instanceof TcpClientDiscoverySpi) {
-                    throw new IgniteCheckedException("TcpClientDiscoverySpi can be used in client mode only" +
+                if (myCfg.getDiscoverySpi().isClientMode()) {
+                    throw new IgniteCheckedException("DiscoverySpi is in client mode, but node is not in client mode" +
                         "(consider changing 'IgniteConfiguration.clientMode' to 'true').");
                 }
             }
@@ -1815,14 +1815,14 @@ public class IgnitionEx {
          */
         private void initializeDefaultSpi(IgniteConfiguration cfg) {
             if (cfg.getDiscoverySpi() == null) {
+                cfg.setDiscoverySpi(new TcpDiscoverySpi());
+
                 if (cfg.isClientMode() != null && cfg.isClientMode())
-                    cfg.setDiscoverySpi(new TcpClientDiscoverySpi());
-                else
-                    cfg.setDiscoverySpi(new TcpDiscoverySpi());
+                    ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setClientMode(true);
             }
 
-            if (cfg.getDiscoverySpi() instanceof TcpDiscoverySpiAdapter) {
-                TcpDiscoverySpiAdapter tcpDisco = (TcpDiscoverySpiAdapter)cfg.getDiscoverySpi();
+            if (cfg.getDiscoverySpi() instanceof TcpDiscoverySpi) {
+                TcpDiscoverySpi tcpDisco = (TcpDiscoverySpi)cfg.getDiscoverySpi();
 
                 if (tcpDisco.getIpFinder() == null)
                     tcpDisco.setIpFinder(new TcpDiscoveryMulticastIpFinder());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index 7836e0f..4996d16 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.tcp.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
@@ -77,11 +78,10 @@ public interface DiscoverySpi extends IgniteSpi {
     /**
      * Sets node attributes and node version which will be distributed in grid during
      * join process. Note that these attributes cannot be changed and set only once.
-     *
-     * @param attrs Map of node attributes.
+     *  @param attrs Map of node attributes.
      * @param ver Product version.
      */
-    public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver);
+    public TcpDiscoverySpi setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver);
 
     /**
      * Sets a listener for discovery events. Refer to
@@ -102,7 +102,7 @@ public interface DiscoverySpi extends IgniteSpi {
      *
      * @param exchange Discovery data exchange handler.
      */
-    public void setDataExchange(DiscoverySpiDataExchange exchange);
+    public TcpDiscoverySpi setDataExchange(DiscoverySpiDataExchange exchange);
 
     /**
      * Sets discovery metrics provider. Use metrics provided by
@@ -111,7 +111,7 @@ public interface DiscoverySpi extends IgniteSpi {
      *
      * @param metricsProvider Provider of metrics data.
      */
-    public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider);
+    public TcpDiscoverySpi setMetricsProvider(DiscoveryMetricsProvider metricsProvider);
 
     /**
      * Tells discovery SPI to disconnect from topology. This is very close to calling
@@ -152,4 +152,11 @@ public interface DiscoverySpi extends IgniteSpi {
      * @param nodeId Node ID.
      */
     public void failNode(UUID nodeId);
+
+    /**
+     * Whether or not discovery is in client mode.
+     *
+     * @return {@code true} if node is in client mode.
+     */
+    public boolean isClientMode();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
new file mode 100644
index 0000000..455b2af
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -0,0 +1,1481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*;
+
+/**
+ *
+ */
+class ClientImpl extends TcpDiscoveryImpl {
+    /** */
+    private static final Object JOIN_TIMEOUT = "JOIN_TIMEOUT";
+
+    /** */
+    private static final Object SPI_STOP = "SPI_STOP";
+
+    /** */
+    private static final Object SPI_RECONNECT_FAILED = "SPI_RECONNECT_FAILED";
+
+    /** Remote nodes. */
+    private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>();
+
+    /** Topology history. */
+    private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>();
+
+    /** Remote nodes. */
+    private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> pingFuts = new ConcurrentHashMap8<>();
+
+    /** Socket writer. */
+    private SocketWriter sockWriter;
+
+    /** */
+    private SocketReader sockReader;
+
+    /** */
+    private boolean segmented;
+
+    /** Last message ID. */
+    private volatile IgniteUuid lastMsgId;
+
+    /** Current topology version. */
+    private volatile long topVer;
+
+    /** Join error. Contains error what occurs on join process. */
+    private IgniteSpiException joinErr;
+
+    /** Joined latch. */
+    private final CountDownLatch joinLatch = new CountDownLatch(1);
+
+    /** Left latch. */
+    private final CountDownLatch leaveLatch = new CountDownLatch(1);
+
+    /** */
+    private final Timer timer = new Timer("TcpDiscoverySpi.timer");
+
+    /** */
+    protected MessageWorker msgWorker;
+
+    /**
+     * @param adapter Adapter.
+     */
+    ClientImpl(TcpDiscoverySpi adapter) {
+        super(adapter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void dumpDebugInfo(IgniteLogger log) {
+        StringBuilder b = new StringBuilder(U.nl());
+
+        b.append(">>>").append(U.nl());
+        b.append(">>>").append("Dumping discovery SPI debug info.").append(U.nl());
+        b.append(">>>").append(U.nl());
+
+        b.append("Local node ID: ").append(getLocalNodeId()).append(U.nl()).append(U.nl());
+        b.append("Local node: ").append(locNode).append(U.nl()).append(U.nl());
+
+        b.append("Internal threads: ").append(U.nl());
+
+        b.append("    Message worker: ").append(threadStatus(msgWorker)).append(U.nl());
+        b.append("    Socket reader: ").append(threadStatus(sockReader)).append(U.nl());
+        b.append("    Socket writer: ").append(threadStatus(sockWriter)).append(U.nl());
+        b.append("    Socket timeout worker: ").append(threadStatus(adapter.sockTimeoutWorker)).append(U.nl());
+
+        b.append(U.nl());
+
+        b.append("Nodes: ").append(U.nl());
+
+        for (ClusterNode node : allVisibleNodes())
+            b.append("    ").append(node.id()).append(U.nl());
+
+        b.append(U.nl());
+
+        b.append("Stats: ").append(adapter.stats).append(U.nl());
+
+        U.quietAndInfo(log, b.toString());
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getSpiState() {
+
+        if (sockWriter.isOnline())
+            return "connected";
+
+        return "disconnected";
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getMessageWorkerQueueSize() {
+        return msgWorker.queueSize();
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID getCoordinator() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
+        adapter.initLocalNode(0, true);
+
+        locNode = adapter.locNode;
+
+        sockWriter = new SocketWriter();
+        sockWriter.start();
+
+        sockReader = new SocketReader();
+        sockReader.start();
+
+        msgWorker = new MessageWorker();
+        msgWorker.start();
+
+        try {
+            joinLatch.await();
+
+            if (joinErr != null)
+                throw joinErr;
+        }
+        catch (InterruptedException e) {
+            throw new IgniteSpiException("Thread has been interrupted.", e);
+        }
+
+        timer.schedule(new HeartbeatSender(), adapter.hbFreq, adapter.hbFreq);
+
+        adapter.printStartInfo();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void spiStop() throws IgniteSpiException {
+        timer.cancel();
+
+        if (msgWorker != null && msgWorker.isAlive()) { // Should always be alive
+            msgWorker.addMessage(SPI_STOP);
+
+            try {
+                if (!leaveLatch.await(adapter.netTimeout, MILLISECONDS))
+                    U.warn(log, "Failed to left node: timeout [nodeId=" + locNode + ']');
+            }
+            catch (InterruptedException ignored) {
+
+            }
+        }
+
+        for (GridFutureAdapter<Boolean> fut : pingFuts.values())
+            fut.onDone(false);
+
+        rmtNodes.clear();
+
+        U.interrupt(msgWorker);
+        U.interrupt(sockWriter);
+        U.interrupt(sockReader);
+
+        U.join(msgWorker, log);
+        U.join(sockWriter, log);
+        U.join(sockReader, log);
+
+        adapter.printStopInfo();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<ClusterNode> getRemoteNodes() {
+        return U.arrayList(rmtNodes.values(), TcpDiscoveryNodesRing.VISIBLE_NODES);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public ClusterNode getNode(UUID nodeId) {
+        if (getLocalNodeId().equals(nodeId))
+            return locNode;
+
+        TcpDiscoveryNode node = rmtNodes.get(nodeId);
+
+        return node != null && node.visible() ? node : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean pingNode(@NotNull final UUID nodeId) {
+        if (nodeId.equals(getLocalNodeId()))
+            return true;
+
+        TcpDiscoveryNode node = rmtNodes.get(nodeId);
+
+        if (node == null || !node.visible())
+            return false;
+
+        GridFutureAdapter<Boolean> fut = pingFuts.get(nodeId);
+
+        if (fut == null) {
+            fut = new GridFutureAdapter<>();
+
+            GridFutureAdapter<Boolean> oldFut = pingFuts.putIfAbsent(nodeId, fut);
+
+            if (oldFut != null)
+                fut = oldFut;
+            else {
+                if (adapter.getSpiContext().isStopping()) {
+                    if (pingFuts.remove(nodeId, fut))
+                        fut.onDone(false);
+
+                    return false;
+                }
+
+                final GridFutureAdapter<Boolean> finalFut = fut;
+
+                timer.schedule(new TimerTask() {
+                    @Override public void run() {
+                        if (pingFuts.remove(nodeId, finalFut))
+                            finalFut.onDone(false);
+                    }
+                }, adapter.netTimeout);
+
+                sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
+            }
+        }
+
+        try {
+            return fut.get();
+        }
+        catch (IgniteInterruptedCheckedException ignored) {
+            return false;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteSpiException(e); // Should newer occur
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void disconnect() throws IgniteSpiException {
+        U.interrupt(msgWorker);
+        U.interrupt(sockWriter);
+        U.interrupt(sockReader);
+
+        U.join(msgWorker, log);
+        U.join(sockWriter, log);
+        U.join(sockReader, log);
+
+        leaveLatch.countDown();
+        joinLatch.countDown();
+
+        adapter.getSpiContext().deregisterPorts();
+
+        Collection<ClusterNode> rmts = getRemoteNodes();
+
+        // This is restart/disconnection and remote nodes are not empty.
+        // We need to fire FAIL event for each.
+        DiscoverySpiListener lsnr = adapter.lsnr;
+
+        if (lsnr != null) {
+            for (ClusterNode n : rmts) {
+                rmtNodes.remove(n.id());
+
+                Collection<ClusterNode> top = updateTopologyHistory(topVer + 1);
+
+                lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, new TreeMap<>(topHist), null);
+            }
+        }
+
+        rmtNodes.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
+        if (segmented)
+            throw new IgniteException("Failed to send custom message: client is disconnected");
+
+        try {
+            sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
+                adapter.marsh.marshal(evt)));
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void failNode(UUID nodeId) {
+        ClusterNode node = rmtNodes.get(nodeId);
+
+        if (node != null) {
+            TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(),
+                node.id(), node.order());
+
+            msgWorker.addMessage(msg);
+        }
+    }
+
+    /**
+     * @return Opened socket or {@code null} if timeout.
+     * @see TcpDiscoverySpi#joinTimeout
+     */
+    @SuppressWarnings("BusyWait")
+    @Nullable private Socket joinTopology(boolean recon) throws IgniteSpiException, InterruptedException {
+        Collection<InetSocketAddress> addrs = null;
+
+        long startTime = U.currentTimeMillis();
+
+        while (true) {
+            if (Thread.currentThread().isInterrupted())
+                throw new InterruptedException();
+
+            while (addrs == null || addrs.isEmpty()) {
+                addrs = adapter.resolvedAddresses();
+
+                if (!F.isEmpty(addrs)) {
+                    if (log.isDebugEnabled())
+                        log.debug("Resolved addresses from IP finder: " + addrs);
+                }
+                else {
+                    U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + adapter.ipFinder);
+
+                    if (adapter.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > adapter.joinTimeout)
+                        return null;
+
+                    Thread.sleep(2000);
+                }
+            }
+
+            Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);
+
+            Iterator<InetSocketAddress> it = addrs.iterator();
+
+            while (it.hasNext()) {
+                if (Thread.currentThread().isInterrupted())
+                    throw new InterruptedException();
+
+                InetSocketAddress addr = it.next();
+
+                Socket sock = null;
+
+                try {
+                    long ts = U.currentTimeMillis();
+
+                    IgniteBiTuple<Socket, UUID> t = initConnection(addr);
+
+                    sock = t.get1();
+
+                    UUID rmtNodeId = t.get2();
+
+                    adapter.stats.onClientSocketInitialized(U.currentTimeMillis() - ts);
+
+                    locNode.clientRouterNodeId(rmtNodeId);
+
+                    TcpDiscoveryAbstractMessage msg = recon ?
+                        new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId,
+                            lastMsgId) :
+                        new TcpDiscoveryJoinRequestMessage(locNode, adapter.collectExchangeData(getLocalNodeId()));
+
+                    msg.client(true);
+
+                    adapter.writeToSocket(sock, msg);
+
+                    int res = adapter.readReceipt(sock, adapter.ackTimeout);
+
+                    switch (res) {
+                        case RES_OK:
+                            return sock;
+
+                        case RES_CONTINUE_JOIN:
+                        case RES_WAIT:
+                            U.closeQuiet(sock);
+
+                            break;
+
+                        default:
+                            if (log.isDebugEnabled())
+                                log.debug("Received unexpected response to join request: " + res);
+
+                            U.closeQuiet(sock);
+                    }
+                }
+                catch (IOException | IgniteCheckedException e) {
+                    if (log.isDebugEnabled())
+                        U.error(log, "Failed to establish connection with address: " + addr, e);
+
+                    U.closeQuiet(sock);
+
+                    it.remove();
+                }
+            }
+
+            if (addrs.isEmpty()) {
+                U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " +
+                    "in 2000ms): " + addrs0);
+
+                if (adapter.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > adapter.joinTimeout)
+                    return null;
+
+                Thread.sleep(2000);
+            }
+        }
+    }
+
+    /**
+     * @param topVer New topology version.
+     * @return Latest topology snapshot.
+     */
+    private NavigableSet<ClusterNode> updateTopologyHistory(long topVer) {
+        this.topVer = topVer;
+
+        NavigableSet<ClusterNode> allNodes = allVisibleNodes();
+
+        if (!topHist.containsKey(topVer)) {
+            assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
+                "lastVer=" + topHist.lastKey() + ", newVer=" + topVer;
+
+            topHist.put(topVer, allNodes);
+
+            if (topHist.size() > adapter.topHistSize)
+                topHist.pollFirstEntry();
+
+            assert topHist.lastKey() == topVer;
+            assert topHist.size() <= adapter.topHistSize;
+        }
+
+        return allNodes;
+    }
+
+    /**
+     * @return All nodes.
+     */
+    private NavigableSet<ClusterNode> allVisibleNodes() {
+        NavigableSet<ClusterNode> allNodes = new TreeSet<>();
+
+        for (TcpDiscoveryNode node : rmtNodes.values()) {
+            if (node.visible())
+                allNodes.add(node);
+        }
+
+        allNodes.add(locNode);
+
+        return allNodes;
+    }
+
+    /**
+     * @param addr Address.
+     * @return Remote node ID.
+     * @throws IOException In case of I/O error.
+     * @throws IgniteCheckedException In case of other error.
+     */
+    private IgniteBiTuple<Socket, UUID> initConnection(InetSocketAddress addr) throws IOException, IgniteCheckedException {
+        assert addr != null;
+
+        Socket sock = adapter.openSocket(addr);
+
+        TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(getLocalNodeId());
+
+        req.client(true);
+
+        adapter.writeToSocket(sock, req);
+
+        TcpDiscoveryHandshakeResponse res = adapter.readMessage(sock, null, adapter.ackTimeout);
+
+        UUID nodeId = res.creatorNodeId();
+
+        assert nodeId != null;
+        assert !getLocalNodeId().equals(nodeId);
+
+        return F.t(sock, nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override void simulateNodeFailure() {
+        U.warn(log, "Simulating client node failure: " + getLocalNodeId());
+
+        U.interrupt(sockWriter);
+        U.interrupt(msgWorker);
+        U.interrupt(adapter.sockTimeoutWorker);
+
+        U.join(sockWriter, log);
+        U.join(msgWorker, log);
+        U.join(adapter.sockTimeoutWorker, log);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void brakeConnection() {
+        U.closeQuiet(msgWorker.currSock);
+    }
+
+    @Override protected IgniteSpiThread workerThread() {
+        return msgWorker;
+    }
+
+    /**
+     * FOR TEST PURPOSE ONLY!
+     */
+    public void waitForClientMessagePrecessed() {
+        Object last = msgWorker.queue.peekLast();
+
+        while (last != null && msgWorker.isAlive() && msgWorker.queue.contains(last)) {
+            try {
+                Thread.sleep(10);
+            }
+            catch (InterruptedException ignored) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Heartbeat sender.
+     */
+    private class HeartbeatSender extends TimerTask {
+        /** {@inheritDoc} */
+        @Override public void run() {
+            if (!adapter.getSpiContext().isStopping() && sockWriter.isOnline()) {
+                TcpDiscoveryClientHeartbeatMessage msg = new TcpDiscoveryClientHeartbeatMessage(getLocalNodeId(),
+                    adapter.metricsProvider.metrics());
+
+                msg.client(true);
+
+                sockWriter.sendMessage(msg);
+            }
+        }
+    }
+
+    /**
+     * Socket reader.
+     */
+    private class SocketReader extends IgniteSpiThread {
+        /** */
+        private final Object mux = new Object();
+
+        /** */
+        private Socket sock;
+
+        /** */
+        private UUID rmtNodeId;
+
+        /**
+         */
+        protected SocketReader() {
+            super(adapter.ignite().name(), "tcp-client-disco-sock-reader", log);
+        }
+
+        /**
+         * @param sock Socket.
+         * @param rmtNodeId Rmt node id.
+         */
+        public void setSocket(Socket sock, UUID rmtNodeId) {
+            synchronized (mux) {
+                this.sock = sock;
+
+                this.rmtNodeId = rmtNodeId;
+
+                mux.notifyAll();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException {
+            while (!isInterrupted()) {
+                Socket sock;
+                UUID rmtNodeId;
+
+                synchronized (mux) {
+                    if (this.sock == null) {
+                        mux.wait();
+
+                        continue;
+                    }
+
+                    sock = this.sock;
+                    rmtNodeId = this.rmtNodeId;
+                }
+
+                try {
+                    InputStream in = new BufferedInputStream(sock.getInputStream());
+
+                    sock.setKeepAlive(true);
+                    sock.setTcpNoDelay(true);
+
+                    while (!isInterrupted()) {
+                        TcpDiscoveryAbstractMessage msg;
+
+                        try {
+                            msg = adapter.marsh.unmarshal(in, U.gridClassLoader());
+                        }
+                        catch (IgniteCheckedException e) {
+                            if (log.isDebugEnabled())
+                                U.error(log, "Failed to read message [sock=" + sock + ", " +
+                                    "locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']', e);
+
+                            IOException ioEx = X.cause(e, IOException.class);
+
+                            if (ioEx != null)
+                                throw ioEx;
+
+                            ClassNotFoundException clsNotFoundEx = X.cause(e, ClassNotFoundException.class);
+
+                            if (clsNotFoundEx != null)
+                                LT.warn(log, null, "Failed to read message due to ClassNotFoundException " +
+                                    "(make sure same versions of all classes are available on all nodes) " +
+                                    "[rmtNodeId=" + rmtNodeId + ", err=" + clsNotFoundEx.getMessage() + ']');
+                            else
+                                LT.error(log, e, "Failed to read message [sock=" + sock + ", locNodeId=" +
+                                    getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']');
+
+                            continue;
+                        }
+
+                        msg.senderNodeId(rmtNodeId);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Message has been received: " + msg);
+
+                        adapter.stats.onMessageReceived(msg);
+
+                        if (adapter.ensured(msg))
+                            lastMsgId = msg.id();
+
+                        msgWorker.addMessage(msg);
+                    }
+                }
+                catch (IOException e) {
+                    msgWorker.addMessage(new SocketClosedMessage(sock));
+
+                    if (log.isDebugEnabled())
+                        U.error(log, "Connection failed [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ']', e);
+                }
+                finally {
+                    U.closeQuiet(sock);
+
+                    synchronized (mux) {
+                        if (this.sock == sock) {
+                            this.sock = null;
+                            this.rmtNodeId = null;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    private class SocketWriter extends IgniteSpiThread {
+        /** */
+        private final Object mux = new Object();
+
+        /** */
+        private Socket sock;
+
+        /** */
+        private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
+
+        /**
+         *
+         */
+        protected SocketWriter() {
+            super(adapter.ignite().name(), "tcp-client-disco-sock-writer", log);
+        }
+
+        /**
+         * @param msg Message.
+         */
+        private void sendMessage(TcpDiscoveryAbstractMessage msg) {
+            synchronized (mux) {
+                queue.add(msg);
+
+                mux.notifyAll();
+            }
+        }
+
+        /**
+         * @param sock Socket.
+         */
+        private void setSocket(Socket sock) {
+            synchronized (mux) {
+                this.sock = sock;
+
+                mux.notifyAll();
+            }
+        }
+
+        /**
+         *
+         */
+        public boolean isOnline() {
+            synchronized (mux) {
+                return sock != null;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException {
+            TcpDiscoveryAbstractMessage msg = null;
+
+            while (!Thread.currentThread().isInterrupted()) {
+                Socket sock;
+
+                synchronized (mux) {
+                    sock = this.sock;
+
+                    if (sock == null) {
+                        mux.wait();
+
+                        continue;
+                    }
+
+                    if (msg == null)
+                        msg = queue.poll();
+
+                    if (msg == null) {
+                        mux.wait();
+
+                        continue;
+                    }
+                }
+
+                for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : adapter.sendMsgLsnrs)
+                    msgLsnr.apply(msg);
+
+                try {
+                    adapter.writeToSocket(sock, msg);
+
+                    msg = null;
+                }
+                catch (IOException e) {
+                    if (log.isDebugEnabled())
+                        U.error(log, "Failed to send node left message (will stop anyway) [sock=" + sock + ']', e);
+
+                    U.closeQuiet(sock);
+
+                    synchronized (mux) {
+                        if (sock == this.sock)
+                            this.sock = null; // Connection has dead.
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to send message: " + msg, e);
+
+                    msg = null;
+                }
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    private class Reconnector extends IgniteSpiThread {
+        /** */
+        private volatile Socket sock;
+
+        /**
+         *
+         */
+        protected Reconnector() {
+            super(adapter.ignite().name(), "tcp-client-disco-msg-worker", log);
+        }
+
+        /**
+         *
+         */
+        public void cancel() {
+            interrupt();
+
+            U.closeQuiet(sock);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException {
+            assert !segmented;
+
+            boolean success = false;
+
+            try {
+                sock = joinTopology(true);
+
+                if (sock == null) {
+                    U.error(log, "Failed to reconnect to cluster: timeout.");
+
+                    return;
+                }
+
+                if (isInterrupted())
+                    throw new InterruptedException();
+
+                InputStream in = new BufferedInputStream(sock.getInputStream());
+
+                sock.setKeepAlive(true);
+                sock.setTcpNoDelay(true);
+
+                // Wait for
+                while (!isInterrupted()) {
+                    TcpDiscoveryAbstractMessage msg = adapter.marsh.unmarshal(in, U.gridClassLoader());
+
+                    if (msg instanceof TcpDiscoveryClientReconnectMessage) {
+                        TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;
+
+                        if (res.creatorNodeId().equals(getLocalNodeId())) {
+                            if (res.success()) {
+                                msgWorker.addMessage(res);
+
+                                success = true;
+                            }
+
+                            break;
+                        }
+                    }
+
+                }
+            }
+            catch (IOException | IgniteCheckedException e) {
+                U.error(log, "Failed to reconnect", e);
+            }
+            finally {
+                if (!success) {
+                    U.closeQuiet(sock);
+
+                    msgWorker.addMessage(SPI_RECONNECT_FAILED);
+                }
+            }
+        }
+    }
+
+    /**
+     * Message worker.
+     */
+    protected class MessageWorker extends IgniteSpiThread {
+        /** Message queue. */
+        private final BlockingDeque<Object> queue = new LinkedBlockingDeque<>();
+
+        /** */
+        private Socket currSock;
+
+        /** Indicates that pending messages are currently processed. */
+        private boolean pending;
+
+        /** */
+        private Reconnector reconnector;
+
+        /**
+         *
+         */
+        private MessageWorker() {
+            super(adapter.ignite().name(), "tcp-client-disco-msg-worker", log);
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("InfiniteLoopStatement")
+        @Override protected void body() throws InterruptedException {
+            adapter.stats.onJoinStarted();
+
+            try {
+                final Socket sock = joinTopology(false);
+
+                if (sock == null) {
+                    joinErr = new IgniteSpiException("Join process timed out");
+
+                    joinLatch.countDown();
+
+                    return;
+                }
+
+                currSock = sock;
+
+                sockWriter.setSocket(sock);
+
+                timer.schedule(new TimerTask() {
+                    @Override public void run() {
+                        if (joinLatch.getCount() > 0)
+                            queue.add(JOIN_TIMEOUT);
+                    }
+                }, adapter.netTimeout);
+
+                sockReader.setSocket(sock, locNode.clientRouterNodeId());
+
+                while (true) {
+                    Object msg = queue.take();
+
+                    if (msg == JOIN_TIMEOUT) {
+                        if (joinLatch.getCount() > 0) {
+                            joinErr = new IgniteSpiException("Join process timed out [sock=" + sock +
+                                ", timeout=" + adapter.netTimeout + ']');
+
+                            joinLatch.countDown();
+
+                            break;
+                        }
+                    }
+                    else if (msg == SPI_STOP) {
+                        assert adapter.getSpiContext().isStopping();
+
+                        if (currSock != null) {
+                            TcpDiscoveryAbstractMessage leftMsg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId());
+
+                            leftMsg.client(true);
+
+                            sockWriter.sendMessage(leftMsg);
+                        }
+                        else
+                            leaveLatch.countDown();
+                    }
+                    else if (msg instanceof SocketClosedMessage) {
+                        if (((SocketClosedMessage)msg).sock == currSock) {
+                            currSock = null;
+
+                            if (joinLatch.getCount() > 0) {
+                                joinErr = new IgniteSpiException("Failed to connect to cluster: socket closed.");
+
+                                joinLatch.countDown();
+
+                                break;
+                            }
+                            else {
+                                if (adapter.getSpiContext().isStopping() || segmented)
+                                    leaveLatch.countDown();
+                                else {
+                                    assert reconnector == null;
+
+                                    final Reconnector reconnector = new Reconnector();
+                                    this.reconnector = reconnector;
+                                    reconnector.start();
+
+                                    timer.schedule(new TimerTask() {
+                                        @Override public void run() {
+                                            if (reconnector.isAlive())
+                                                reconnector.cancel();
+                                        }
+                                    }, adapter.netTimeout);
+                                }
+                            }
+                        }
+                    }
+                    else if (msg == SPI_RECONNECT_FAILED) {
+                        if (!segmented) {
+                            segmented = true;
+
+                            reconnector.cancel();
+                            reconnector.join();
+
+                            notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+                        }
+                    }
+                    else {
+                        TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg;
+
+                        if (joinLatch.getCount() > 0) {
+                            IgniteSpiException err = null;
+
+                            if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage)
+                                err = adapter.duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg);
+                            else if (discoMsg instanceof TcpDiscoveryAuthFailedMessage)
+                                err = adapter.authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg);
+                            else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage)
+                                err = adapter.checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
+
+                            if (err != null) {
+                                joinErr = err;
+
+                                joinLatch.countDown();
+
+                                break;
+                            }
+                        }
+
+                        processDiscoveryMessage((TcpDiscoveryAbstractMessage)msg);
+                    }
+                }
+            }
+            finally {
+                U.closeQuiet(currSock);
+
+                if (joinLatch.getCount() > 0) {
+                    // This should not occurs.
+                    joinErr = new IgniteSpiException("Some error occurs in joinig process");
+
+                    joinLatch.countDown();
+                }
+
+                if (reconnector != null) {
+                    reconnector.cancel();
+
+                    reconnector.join();
+                }
+            }
+        }
+
+        /**
+         * @param msg Message.
+         */
+        protected void processDiscoveryMessage(TcpDiscoveryAbstractMessage msg) {
+            assert msg != null;
+            assert msg.verified() || msg.senderNodeId() == null;
+
+            adapter.stats.onMessageProcessingStarted(msg);
+
+            if (msg instanceof TcpDiscoveryNodeAddedMessage)
+                processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
+            else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
+                processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg);
+            else if (msg instanceof TcpDiscoveryNodeLeftMessage)
+                processNodeLeftMessage((TcpDiscoveryNodeLeftMessage)msg);
+            else if (msg instanceof TcpDiscoveryNodeFailedMessage)
+                processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg);
+            else if (msg instanceof TcpDiscoveryHeartbeatMessage)
+                processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg);
+            else if (msg instanceof TcpDiscoveryClientReconnectMessage)
+                processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
+            else if (msg instanceof TcpDiscoveryCustomEventMessage)
+                processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
+            else if (msg instanceof TcpDiscoveryClientPingResponse)
+                processClientPingResponse((TcpDiscoveryClientPingResponse)msg);
+            else if (msg instanceof TcpDiscoveryPingRequest)
+                processPingRequest();
+
+            adapter.stats.onMessageProcessingFinished(msg);
+        }
+
+        /**
+         * @param msg Message.
+         */
+        private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
+            if (adapter.getSpiContext().isStopping())
+                return;
+
+            TcpDiscoveryNode node = msg.node();
+
+            UUID newNodeId = node.id();
+
+            if (getLocalNodeId().equals(newNodeId)) {
+                if (joinLatch.getCount() > 0) {
+                    Collection<TcpDiscoveryNode> top = msg.topology();
+
+                    if (top != null) {
+                        adapter.gridStartTime = msg.gridStartTime();
+
+                        for (TcpDiscoveryNode n : top) {
+                            if (n.order() > 0)
+                                n.visible(true);
+
+                            rmtNodes.put(n.id(), n);
+                        }
+
+                        topHist.clear();
+
+                        if (msg.topologyHistory() != null)
+                            topHist.putAll(msg.topologyHistory());
+                    }
+                    else if (log.isDebugEnabled())
+                        log.debug("Discarding node added message with empty topology: " + msg);
+                }
+                else if (log.isDebugEnabled())
+                    log.debug("Discarding node added message (this message has already been processed) " +
+                        "[msg=" + msg + ", locNode=" + locNode + ']');
+            }
+            else {
+                boolean topChanged = rmtNodes.putIfAbsent(newNodeId, node) == null;
+
+                if (topChanged) {
+                    if (log.isDebugEnabled())
+                        log.debug("Added new node to topology: " + node);
+
+                    Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
+
+                    if (data != null)
+                        adapter.onExchange(newNodeId, newNodeId, data, null);
+                }
+            }
+        }
+
+        /**
+         * @param msg Message.
+         */
+        private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
+            if (adapter.getSpiContext().isStopping())
+                return;
+
+            if (getLocalNodeId().equals(msg.nodeId())) {
+                if (joinLatch.getCount() > 0) {
+                    Map<UUID, Map<Integer, byte[]>> dataMap = msg.clientDiscoData();
+
+                    if (dataMap != null) {
+                        for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
+                            adapter.onExchange(getLocalNodeId(), entry.getKey(), entry.getValue(), null);
+                    }
+
+                    locNode.setAttributes(msg.clientNodeAttributes());
+                    locNode.visible(true);
+
+                    long topVer = msg.topologyVersion();
+
+                    locNode.order(topVer);
+
+                    notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, updateTopologyHistory(topVer));
+
+                    joinErr = null;
+
+                    joinLatch.countDown();
+
+                    adapter.stats.onJoinFinished();
+                }
+                else if (log.isDebugEnabled())
+                    log.debug("Discarding node add finished message (this message has already been processed) " +
+                        "[msg=" + msg + ", locNode=" + locNode + ']');
+            }
+            else {
+                TcpDiscoveryNode node = rmtNodes.get(msg.nodeId());
+
+                if (node == null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Discarding node add finished message since node is not found [msg=" + msg + ']');
+
+                    return;
+                }
+
+                long topVer = msg.topologyVersion();
+
+                node.order(topVer);
+                node.visible(true);
+
+                if (adapter.locNodeVer.equals(node.version()))
+                    node.version(adapter.locNodeVer);
+
+                NavigableSet<ClusterNode> top = updateTopologyHistory(topVer);
+
+                if (!pending && joinLatch.getCount() > 0) {
+                    if (log.isDebugEnabled())
+                        log.debug("Discarding node add finished message (join process is not finished): " + msg);
+
+                    return;
+                }
+
+                notifyDiscovery(EVT_NODE_JOINED, topVer, node, top);
+
+                adapter.stats.onNodeJoined();
+            }
+        }
+
+        /**
+         * @param msg Message.
+         */
+        private void processNodeLeftMessage(TcpDiscoveryNodeLeftMessage msg) {
+            if (getLocalNodeId().equals(msg.creatorNodeId())) {
+                if (log.isDebugEnabled())
+                    log.debug("Received node left message for local node: " + msg);
+
+                leaveLatch.countDown();
+            }
+            else {
+                if (adapter.getSpiContext().isStopping())
+                    return;
+
+                TcpDiscoveryNode node = rmtNodes.remove(msg.creatorNodeId());
+
+                if (node == null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Discarding node left message since node is not found [msg=" + msg + ']');
+
+                    return;
+                }
+
+                NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion());
+
+                if (!pending && joinLatch.getCount() > 0) {
+                    if (log.isDebugEnabled())
+                        log.debug("Discarding node left message (join process is not finished): " + msg);
+
+                    return;
+                }
+
+                notifyDiscovery(EVT_NODE_LEFT, msg.topologyVersion(), node, top);
+
+                adapter.stats.onNodeLeft();
+            }
+        }
+
+        /**
+         * @param msg Message.
+         */
+        private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) {
+            if (adapter.getSpiContext().isStopping()) {
+                if (!getLocalNodeId().equals(msg.creatorNodeId()) && getLocalNodeId().equals(msg.failedNodeId())) {
+                    if (leaveLatch.getCount() > 0) {
+                        log.debug("Remote node fail this node while node is stopping [locNode=" + getLocalNodeId()
+                            + ", rmtNode=" + msg.creatorNodeId() + ']');
+
+                        leaveLatch.countDown();
+                    }
+                }
+
+                return;
+            }
+
+            if (!getLocalNodeId().equals(msg.creatorNodeId())) {
+                TcpDiscoveryNode node = rmtNodes.remove(msg.failedNodeId());
+
+                if (node == null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Discarding node failed message since node is not found [msg=" + msg + ']');
+
+                    return;
+                }
+
+                NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion());
+
+                if (!pending && joinLatch.getCount() > 0) {
+                    if (log.isDebugEnabled())
+                        log.debug("Discarding node failed message (join process is not finished): " + msg);
+
+                    return;
+                }
+
+                notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), node, top);
+
+                adapter.stats.onNodeFailed();
+            }
+        }
+
+        /**
+         * @param msg Message.
+         */
+        private void processHeartbeatMessage(TcpDiscoveryHeartbeatMessage msg) {
+            if (adapter.getSpiContext().isStopping())
+                return;
+
+            if (getLocalNodeId().equals(msg.creatorNodeId())) {
+                assert msg.senderNodeId() != null;
+
+                if (log.isDebugEnabled())
+                    log.debug("Received heartbeat response: " + msg);
+            }
+            else {
+                long tstamp = U.currentTimeMillis();
+
+                if (msg.hasMetrics()) {
+                    for (Map.Entry<UUID, TcpDiscoveryHeartbeatMessage.MetricsSet> e : msg.metrics().entrySet()) {
+                        UUID nodeId = e.getKey();
+
+                        TcpDiscoveryHeartbeatMessage.MetricsSet metricsSet = e.getValue();
+
+                        Map<Integer, CacheMetrics> cacheMetrics = msg.hasCacheMetrics() ?
+                            msg.cacheMetrics().get(nodeId) : Collections.<Integer, CacheMetrics>emptyMap();
+
+                        updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tstamp);
+
+                        for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics())
+                            updateMetrics(t.get1(), t.get2(), cacheMetrics, tstamp);
+                    }
+                }
+            }
+        }
+
+        /**
+         * @param msg Message.
+         */
+        private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
+            if (adapter.getSpiContext().isStopping())
+                return;
+
+            if (getLocalNodeId().equals(msg.creatorNodeId())) {
+                assert msg.success();
+
+                currSock = reconnector.sock;
+
+                sockWriter.setSocket(currSock);
+                sockReader.setSocket(currSock, locNode.clientRouterNodeId());
+
+                reconnector = null;
+
+                pending = true;
+
+                try {
+                    for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages())
+                        processDiscoveryMessage(pendingMsg);
+                }
+                finally {
+                    pending = false;
+                }
+            }
+            else if (log.isDebugEnabled())
+                log.debug("Discarding reconnect message for another client: " + msg);
+        }
+
+        /**
+         * @param msg Message.
+         */
+        private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
+            if (msg.verified() && joinLatch.getCount() == 0) {
+                DiscoverySpiListener lsnr = adapter.lsnr;
+
+                if (lsnr != null) {
+                    UUID nodeId = msg.creatorNodeId();
+
+                    TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId);
+
+                    if (node != null && node.visible()) {
+                        try {
+                            DiscoverySpiCustomMessage msgObj = msg.message(adapter.marsh);
+
+                            notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj);
+                        }
+                        catch (Throwable e) {
+                            U.error(log, "Failed to unmarshal discovery custom message.", e);
+                        }
+                    }
+                    else if (log.isDebugEnabled())
+                        log.debug("Received metrics from unknown node: " + nodeId);
+                }
+            }
+        }
+
+        /**
+         * @param msg Message.
+         */
+        private void processClientPingResponse(TcpDiscoveryClientPingResponse msg) {
+            GridFutureAdapter<Boolean> fut = pingFuts.remove(msg.nodeToPing());
+
+            if (fut != null)
+                fut.onDone(msg.result());
+        }
+
+        /**
+         * Router want to ping this client.
+         */
+        private void processPingRequest() {
+            TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(getLocalNodeId());
+
+            res.client(true);
+
+            sockWriter.sendMessage(res);
+        }
+
+        /**
+         * @param nodeId Node ID.
+         * @param metrics Metrics.
+         * @param cacheMetrics Cache metrics.
+         * @param tstamp Timestamp.
+         */
+        private void updateMetrics(UUID nodeId,
+            ClusterMetrics metrics,
+            Map<Integer, CacheMetrics> cacheMetrics,
+            long tstamp)
+        {
+            assert nodeId != null;
+            assert metrics != null;
+            assert cacheMetrics != null;
+
+            TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId);
+
+            if (node != null && node.visible()) {
+                node.setMetrics(metrics);
+                node.setCacheMetrics(cacheMetrics);
+
+                node.lastUpdateTime(tstamp);
+
+                notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allVisibleNodes());
+            }
+            else if (log.isDebugEnabled())
+                log.debug("Received metrics from unknown node: " + nodeId);
+        }
+
+        /**
+         * @param type Event type.
+         * @param topVer Topology version.
+         * @param node Node.
+         * @param top Topology snapshot.
+         */
+        private void notifyDiscovery(int type, long topVer, ClusterNode node, NavigableSet<ClusterNode> top) {
+            notifyDiscovery(type, topVer, node, top, null);
+        }
+
+        /**
+         * @param type Event type.
+         * @param topVer Topology version.
+         * @param node Node.
+         * @param top Topology snapshot.
+         */
+        private void notifyDiscovery(int type, long topVer, ClusterNode node, NavigableSet<ClusterNode> top,
+            @Nullable DiscoverySpiCustomMessage data) {
+            DiscoverySpiListener lsnr = adapter.lsnr;
+
+            if (lsnr != null) {
+                if (log.isDebugEnabled())
+                    log.debug("Discovery notification [node=" + node + ", type=" + U.gridEventName(type) +
+                        ", topVer=" + topVer + ']');
+
+                lsnr.onDiscovery(type, topVer, node, top, new TreeMap<>(topHist), data);
+            }
+            else if (log.isDebugEnabled())
+                log.debug("Skipped discovery notification [node=" + node + ", type=" + U.gridEventName(type) +
+                    ", topVer=" + topVer + ']');
+        }
+
+        /**
+         * @param msg Message.
+         */
+        public void addMessage(Object msg) {
+            queue.add(msg);
+        }
+
+        /**
+         *
+         */
+        public int queueSize() {
+            return queue.size();
+        }
+    }
+
+    /**
+     *
+     */
+    private static class SocketClosedMessage {
+        /** */
+        private final Socket sock;
+
+        /**
+         * @param sock Socket.
+         */
+        private SocketClosedMessage(Socket sock) {
+            this.sock = sock;
+        }
+    }
+}