You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 15:07:39 UTC

[38/52] [abbrv] incubator-ignite git commit: # Renaming

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
new file mode 100644
index 0000000..955a305
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
@@ -0,0 +1,36 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import java.util.*;
+
+/**
+ * Handler for the initial data exchange between GridGain nodes. The exchange
+ * is initiated by a new node when it tries to join the topology and finishes
+ * before the node actually joins.
+ * <p>
+ * {@link #collect(UUID)} produces the data to be sent and {@link #onExchange(List)}
+ * is notified about data received from a remote node.
+ */
+public interface DiscoverySpiDataExchange {
+    /**
+     * Collects data from all components. This method is called both
+     * on the new node that joins the topology, to transfer its data to the existing
+     * nodes, and on all existing nodes, to transfer their data to the new node.
+     *
+     * @param nodeId ID of the new node that joins the topology.
+     * @return Collection of discovery data objects from different components.
+     */
+    public List<Object> collect(UUID nodeId);
+
+    /**
+     * Notifies discovery manager about data received from a remote node.
+     *
+     * @param data Collection of discovery data objects from different components
+     *      (presumably the list produced by {@link #collect(UUID)} on the sending node - to be confirmed).
+     */
+    public void onExchange(List<Object> data);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiHistorySupport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiHistorySupport.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiHistorySupport.java
new file mode 100644
index 0000000..cd6ecf9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiHistorySupport.java
@@ -0,0 +1,28 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import java.lang.annotation.*;
+
+/**
+ * This annotation is for all implementations of {@link DiscoverySpi} that support
+ * topology snapshots history.
+ * <p>
+ * The annotation is retained at runtime, can be placed on types only and is
+ * inherited by subclasses of an annotated class.
+ */
+@Documented
+@Inherited
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface DiscoverySpiHistorySupport {
+    /**
+     * Whether or not target SPI supports topology snapshots history.
+     *
+     * @return {@code True} if the annotated SPI supports topology snapshots history,
+     *      {@code false} otherwise.
+     */
+    @SuppressWarnings({"JavaDoc"})
+    public boolean value();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
new file mode 100644
index 0000000..acc0ee1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
@@ -0,0 +1,35 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import org.apache.ignite.cluster.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Listener for grid node discovery events. See
+ * {@link DiscoverySpi} for information on how grid nodes get discovered.
+ */
+public interface DiscoverySpiListener {
+    /**
+     * Notification for grid node discovery events.
+     *
+     * @param type Node discovery event type. See {@link org.apache.ignite.events.IgniteDiscoveryEvent}
+     * @param topVer Topology version or {@code 0} if configured discovery SPI implementation
+     *      does not support versioning.
+     * @param node Node affected (e.g. newly joined node, left node, failed node or local node).
+     * @param topSnapshot Topology snapshot after the event has occurred (e.g. if event is
+     *      {@code EVT_NODE_JOINED}, then joined node will be in snapshot).
+     * @param topHist Topology snapshots history, keyed by {@code Long} (presumably the topology
+     *      version - to be confirmed); may be {@code null}.
+     */
+    public void onDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot,
+        @Nullable Map<Long, Collection<ClusterNode>> topHist);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiNodeAuthenticator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiNodeAuthenticator.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiNodeAuthenticator.java
new file mode 100644
index 0000000..dd9fc3a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiNodeAuthenticator.java
@@ -0,0 +1,39 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import org.apache.ignite.cluster.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.kernal.managers.security.*;
+import org.gridgain.grid.security.*;
+
+/**
+ * Node authenticator. Decides whether a node presenting the given security
+ * credentials is authenticated and reports which nodes must perform that check.
+ */
+public interface DiscoverySpiNodeAuthenticator {
+    /**
+     * Authenticates the given node using the provided security credentials.
+     *
+     * @param node Node to authenticate.
+     * @param cred Security credentials.
+     * @return Security context if authentication succeeded or {@code null} if authentication failed.
+     * @throws GridException If authentication process failed
+     *      (invalid credentials should not lead to this exception).
+     */
+    public GridSecurityContext authenticateNode(ClusterNode node, GridSecurityCredentials cred) throws GridException;
+
+    /**
+     * Gets global node authentication flag.
+     *
+     * @return {@code True} if all nodes in topology should authenticate joining node, {@code false} if only
+     *      coordinator should do the authentication.
+     */
+    public boolean isGlobalNodeAuthentication();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiOrderSupport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiOrderSupport.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiOrderSupport.java
new file mode 100644
index 0000000..aee8b3f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiOrderSupport.java
@@ -0,0 +1,39 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import java.lang.annotation.*;
+
+/**
+ * This annotation is for all implementations of {@link DiscoverySpi} that support
+ * proper node ordering. This includes:
+ * <ul>
+ * <li>
+ * Every node gets an order number assigned to it which is provided via {@link org.apache.ignite.cluster.ClusterNode#order()}
+ * method. There is no requirement about order value other than that nodes that join grid
+ * at a later point in time have order values greater than previous nodes.
+ * </li>
+ * <li>
+ * All {@link org.apache.ignite.events.IgniteEventType#EVT_NODE_JOINED} events come in proper order. This means that all
+ * listeners to discovery events will receive discovery notifications in proper order.
+ * </li>
+ * </ul>
+ * The annotation is retained at runtime, can be placed on types only and is
+ * inherited by subclasses of an annotated class.
+ */
+@Documented
+@Inherited
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface DiscoverySpiOrderSupport {
+    /**
+     * Whether or not target SPI supports node startup order.
+     *
+     * @return {@code True} if the annotated SPI supports node startup order,
+     *      {@code false} otherwise.
+     */
+    @SuppressWarnings({"JavaDoc"})
+    public boolean value();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/package.html b/modules/core/src/main/java/org/apache/ignite/spi/discovery/package.html
new file mode 100644
index 0000000..77f45f1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/package.html
@@ -0,0 +1,15 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<!--
+    @html.file.header
+    _________        _____ __________________        _____
+    __  ____/___________(_)______  /__  ____/______ ____(_)_______
+    _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+    / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+    \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+-->
+<html>
+<body>
+    <!-- Package description. -->
+    Contains APIs for topology manager SPI.
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
new file mode 100644
index 0000000..b474852
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -0,0 +1,1219 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.*;
+import org.gridgain.grid.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.IgniteEventType.*;
+import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage.*;
+
+/**
+ * Client discovery SPI implementation that uses TCP/IP for node discovery.
+ * <p>
+ * This discovery SPI requires at least one server node configured with
+ * {@link TcpDiscoverySpi}. It will try to connect to random IP taken from
+ * {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} which should point to one of these server
+ * nodes and will maintain connection only with this node (will not enter the ring).
+ * If this connection is broken, it will try to reconnect using addresses from
+ * the same IP finder.
+ */
+@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
+@IgniteSpiMultipleInstancesSupport(true)
+@DiscoverySpiOrderSupport(true)
+@DiscoverySpiHistorySupport(true)
+public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpClientDiscoverySpiMBean {
+    /** Default disconnect check interval. */
+    public static final long DFLT_DISCONNECT_CHECK_INT = 2000;
+
+    /** Remote nodes. */
+    private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>();
+
+    /** Socket. */
+    private volatile Socket sock;
+
+    /** Socket reader. */
+    private volatile SocketReader sockRdr;
+
+    /** Heartbeat sender. */
+    private volatile HeartbeatSender hbSender;
+
+    /** Disconnect handler. */
+    private volatile DisconnectHandler disconnectHnd;
+
+    /** Last message ID. */
+    private volatile IgniteUuid lastMsgId;
+
+    /** Current topology version. */
+    private volatile long topVer;
+
+    /** Join error. */
+    private IgniteSpiException joinErr;
+
+    /** Whether reconnect failed. */
+    private boolean reconFailed;
+
+    /** Joined latch. */
+    private CountDownLatch joinLatch;
+
+    /** Left latch. */
+    private volatile CountDownLatch leaveLatch;
+
+    /** Disconnect check interval. */
+    private long disconnectCheckInt = DFLT_DISCONNECT_CHECK_INT;
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Current disconnect check interval; equals {@link #DFLT_DISCONNECT_CHECK_INT} unless it was
+     *      changed through {@link #setDisconnectCheckInterval(long)}.
+     */
+    @Override public long getDisconnectCheckInterval() {
+        return disconnectCheckInt;
+    }
+
+    /**
+     * Sets disconnect check interval. The value is stored as is, without validation.
+     * <p>
+     * If not set, {@link #DFLT_DISCONNECT_CHECK_INT} is used.
+     *
+     * @param disconnectCheckInt Disconnect check interval (presumably in milliseconds - to be confirmed).
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setDisconnectCheckInterval(long disconnectCheckInt) {
+        this.disconnectCheckInt = disconnectCheckInt;
+    }
+
+    /** {@inheritDoc} Returns the {@code sockTimeout} value, which {@code spiStart} requires to be positive. */
+    @Override public long getSocketTimeout() {
+        return sockTimeout;
+    }
+
+    /** {@inheritDoc} Returns the {@code ackTimeout} value, which {@code spiStart} requires to be positive. */
+    @Override public long getAckTimeout() {
+        return ackTimeout;
+    }
+
+    /** {@inheritDoc} Returns the {@code netTimeout} value, which {@code spiStart} requires to be positive. */
+    @Override public long getNetworkTimeout() {
+        return netTimeout;
+    }
+
+    /** {@inheritDoc} Returns the {@code threadPri} value, which {@code spiStart} requires to be positive. */
+    @Override public int getThreadPriority() {
+        return threadPri;
+    }
+
+    /** {@inheritDoc} Returns the {@code hbFreq} value, which {@code spiStart} requires to be positive. */
+    @Override public long getHeartbeatFrequency() {
+        return hbFreq;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns {@code ipFinder.toString()}. There is no {@code null} check here, so the call fails with
+     * {@link NullPointerException} if no IP finder is set; {@code spiStart} asserts that it is not {@code null}.
+     */
+    @Override public String getIpFinderFormatted() {
+        return ipFinder.toString();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Queue size of the message worker owned by the current socket reader,
+     *      or {@code 0} if there is no socket reader.
+     */
+    @Override public int getMessageWorkerQueueSize() {
+        // Read the volatile field once so that the null check and the dereference see the same reader.
+        SocketReader rdr = sockRdr;
+
+        if (rdr == null)
+            return 0;
+
+        return rdr.msgWrk.queueSize();
+    }
+
+    /** {@inheritDoc} Delegates to {@code stats.joinedNodesCount()}. */
+    @Override public long getNodesJoined() {
+        return stats.joinedNodesCount();
+    }
+
+    /** {@inheritDoc} Delegates to {@code stats.leftNodesCount()}. */
+    @Override public long getNodesLeft() {
+        return stats.leftNodesCount();
+    }
+
+    /** {@inheritDoc} Delegates to {@code stats.failedNodesCount()}. */
+    @Override public long getNodesFailed() {
+        return stats.failedNodesCount();
+    }
+
+    /** {@inheritDoc} Delegates to {@code stats.avgMessageProcessingTime()}. */
+    @Override public long getAvgMessageProcessingTime() {
+        return stats.avgMessageProcessingTime();
+    }
+
+    /** {@inheritDoc} Delegates to {@code stats.maxMessageProcessingTime()}. */
+    @Override public long getMaxMessageProcessingTime() {
+        return stats.maxMessageProcessingTime();
+    }
+
+    /** {@inheritDoc} Delegates to {@code stats.totalReceivedMessages()}. */
+    @Override public int getTotalReceivedMessages() {
+        return stats.totalReceivedMessages();
+    }
+
+    /** {@inheritDoc} Delegates to {@code stats.receivedMessages()}; the returned map is passed through as is. */
+    @Override public Map<String, Integer> getReceivedMessages() {
+        return stats.receivedMessages();
+    }
+
+    /** {@inheritDoc} Delegates to {@code stats.totalProcessedMessages()}. */
+    @Override public int getTotalProcessedMessages() {
+        return stats.totalProcessedMessages();
+    }
+
+    /** {@inheritDoc} Delegates to {@code stats.processedMessages()}; the returned map is passed through as is. */
+    @Override public Map<String, Integer> getProcessedMessages() {
+        return stats.processedMessages();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Start-up sequence:
+     * <ol>
+     * <li>validates configuration parameters;</li>
+     * <li>resolves the local host from {@code locAddr};</li>
+     * <li>registers the SPI MBean;</li>
+     * <li>passes the local address to a multicast IP finder that has none configured;</li>
+     * <li>creates the local node and starts the socket timeout worker;</li>
+     * <li>joins the topology and starts the disconnect handler.</li>
+     * </ol>
+     *
+     * @param gridName Grid name used for MBean registration, may be {@code null}.
+     * @throws IgniteSpiException If the local address cannot be resolved, or if thrown
+     *      by any of the start-up steps (e.g. joining the topology).
+     */
+    @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
+        startStopwatch();
+
+        assertParameter(ipFinder != null, "ipFinder != null");
+        assertParameter(netTimeout > 0, "networkTimeout > 0");
+        assertParameter(sockTimeout > 0, "sockTimeout > 0");
+        assertParameter(ackTimeout > 0, "ackTimeout > 0");
+        assertParameter(hbFreq > 0, "heartbeatFreq > 0");
+        assertParameter(threadPri > 0, "threadPri > 0");
+
+        // Local host is resolved exactly once; everything below relies on this value.
+        try {
+            locHost = U.resolveLocalHost(locAddr);
+        }
+        catch (IOException e) {
+            throw new IgniteSpiException("Unknown local address: " + locAddr, e);
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug(configInfo("localHost", locHost.getHostAddress()));
+            log.debug(configInfo("threadPri", threadPri));
+            log.debug(configInfo("networkTimeout", netTimeout));
+            log.debug(configInfo("sockTimeout", sockTimeout));
+            log.debug(configInfo("ackTimeout", ackTimeout));
+            log.debug(configInfo("ipFinder", ipFinder));
+            log.debug(configInfo("heartbeatFreq", hbFreq));
+        }
+
+        // Warn on odd network timeout.
+        if (netTimeout < 3000)
+            U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout);
+
+        // Warn on odd heartbeat frequency.
+        if (hbFreq < 2000)
+            U.warn(log, "Heartbeat frequency is too high (at least 2000 ms recommended): " + hbFreq);
+
+        registerMBean(gridName, this, TcpClientDiscoverySpiMBean.class);
+
+        // A multicast IP finder without its own local address inherits the one configured for this SPI.
+        if (ipFinder instanceof TcpDiscoveryMulticastIpFinder) {
+            TcpDiscoveryMulticastIpFinder mcastIpFinder = ((TcpDiscoveryMulticastIpFinder)ipFinder);
+
+            if (mcastIpFinder.getLocalAddress() == null)
+                mcastIpFinder.setLocalAddress(locAddr);
+        }
+
+        IgniteBiTuple<Collection<String>, Collection<String>> addrs;
+
+        try {
+            addrs = U.resolveLocalAddresses(locHost);
+        }
+        catch (IOException | GridException e) {
+            throw new IgniteSpiException("Failed to resolve local host to set of external addresses: " + locHost, e);
+        }
+
+        locNode = new TcpDiscoveryNode(
+            locNodeId,
+            addrs.get1(),
+            addrs.get2(),
+            0,
+            metricsProvider,
+            locNodeVer);
+
+        locNode.setAttributes(locNodeAttrs);
+        locNode.local(true);
+
+        sockTimeoutWorker = new SocketTimeoutWorker();
+        sockTimeoutWorker.start();
+
+        // The result is intentionally not checked here, exactly as before.
+        joinTopology(false);
+
+        // Started only after the initial join attempt, so it cannot race with it.
+        disconnectHnd = new DisconnectHandler();
+        disconnectHnd.start();
+
+        if (log.isDebugEnabled())
+            log.debug(startInfo());
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Shutdown sequence:
+     * <ol>
+     * <li>clears the remote nodes map;</li>
+     * <li>interrupts and joins the disconnect handler, then the heartbeat sender;</li>
+     * <li>takes the current socket and resets the {@code sock} field to {@code null};</li>
+     * <li>if there was a socket, writes a client {@code TcpDiscoveryNodeLeftMessage} to it and waits up to
+     *      {@code netTimeout} milliseconds on {@code leaveLatch}; a write failure or an expired wait is
+     *      reported only when debug logging is enabled and does not abort the stop; the socket is closed
+     *      in any case;</li>
+     * <li>interrupts and joins the socket reader and the socket timeout worker;</li>
+     * <li>unregisters the MBean.</li>
+     * </ol>
+     * The socket reader is stopped only after the wait on {@code leaveLatch} (presumably because the reader
+     * side is what releases the latch - to be confirmed, the releasing code is not visible here).
+     */
+    @Override public void spiStop() throws IgniteSpiException {
+        rmtNodes.clear();
+
+        U.interrupt(disconnectHnd);
+        U.join(disconnectHnd, log);
+
+        U.interrupt(hbSender);
+        U.join(hbSender, log);
+
+        Socket sock0 = sock;
+
+        sock = null;
+
+        if (sock0 != null) {
+            leaveLatch = new CountDownLatch(1);
+
+            try {
+                TcpDiscoveryNodeLeftMessage msg = new TcpDiscoveryNodeLeftMessage(locNodeId);
+
+                msg.client(true);
+
+                writeToSocket(sock0, msg);
+
+                if (!U.await(leaveLatch, netTimeout, MILLISECONDS)) {
+                    if (log.isDebugEnabled())
+                        log.debug("Did not receive node left message for local node (will stop anyway) [sock=" +
+                            sock0 + ']');
+                }
+            }
+            catch (IOException | GridException e) {
+                if (log.isDebugEnabled())
+                    U.error(log, "Failed to send node left message (will stop anyway) [sock=" + sock0 + ']', e);
+            }
+            finally {
+                U.closeQuiet(sock0);
+            }
+        }
+
+        U.interrupt(sockRdr);
+        U.join(sockRdr, log);
+
+        U.interrupt(sockTimeoutWorker);
+        U.join(sockTimeoutWorker, log);
+
+        unregisterMBean();
+
+        if (log.isDebugEnabled())
+            log.debug(stopInfo());
+    }
+
+    /** {@inheritDoc} Returns a fixed-size, single-element list that contains {@code ipFinder}. */
+    @Override public Collection<Object> injectables() {
+        return Arrays.<Object>asList(ipFinder);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The result is built from {@code rmtNodes.values()} with a predicate that accepts only nodes
+     * for which {@code visible()} returns {@code true}.
+     */
+    @Override public Collection<ClusterNode> getRemoteNodes() {
+        return F.view(U.<TcpDiscoveryNode, ClusterNode>arrayList(rmtNodes.values(), new P1<TcpDiscoveryNode>() {
+            @Override public boolean apply(TcpDiscoveryNode node) {
+                return node.visible();
+            }
+        }));
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @param nodeId ID of the node to look up.
+     * @return Local node if {@code nodeId} is the local node ID; otherwise the remote node registered
+     *      under this ID if it is visible; {@code null} in all other cases.
+     */
+    @Nullable @Override public ClusterNode getNode(UUID nodeId) {
+        if (!locNodeId.equals(nodeId)) {
+            TcpDiscoveryNode rmt = rmtNodes.get(nodeId);
+
+            // Unknown and not yet visible nodes are both reported as absent.
+            if (rmt == null || !rmt.visible())
+                return null;
+
+            return rmt;
+        }
+
+        return locNode;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * No network request is made: the answer is derived from the locally known remote nodes only.
+     *
+     * @param nodeId ID of the node to ping, must not be {@code null}.
+     * @return {@code True} for the local node and for a known remote node that is visible,
+     *      {@code false} otherwise.
+     */
+    @Override public boolean pingNode(UUID nodeId) {
+        assert nodeId != null;
+
+        if (!nodeId.equals(locNodeId)) {
+            TcpDiscoveryNode rmt = rmtNodes.get(nodeId);
+
+            if (rmt == null)
+                return false;
+
+            return rmt.visible();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} Not supported by this SPI: always throws {@link UnsupportedOperationException}. */
+    @Override public void disconnect() throws IgniteSpiException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} This implementation ignores the passed authenticator. */
+    @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) {
+        // No-op.
+    }
+
+    /**
+     * Tries to join the topology (or to reconnect to it) through one of the addresses provided by the IP finder.
+     * <p>
+     * Addresses are tried one by one. For each address a connection is initialized, a join request (or a
+     * reconnect message if {@code recon} is {@code true}) is sent and the receipt is read. On {@code RES_OK}
+     * the socket is published to the {@code sock} field, a socket reader is started and the method waits on
+     * {@code joinLatch} for up to {@code netTimeout} milliseconds. Addresses that failed with an error or
+     * timed out are removed from the local collection; once it is empty, the method sleeps for 2000 ms and
+     * resolves the addresses again. The loop runs until the join succeeds, the reconnect is reported as
+     * failed, or the calling thread is interrupted.
+     * <p>
+     * NOTE(review): in the join timeout branch the socket is closed but the {@code sock} field is not reset,
+     * unlike in the failed reconnect branch - confirm that this is intended.
+     *
+     * @param recon Reconnect flag.
+     * @return Whether joined successfully. {@code False} is returned if the reconnect failed
+     *      or if the calling thread was interrupted.
+     * @throws org.apache.ignite.spi.IgniteSpiException In case of error (in particular, the join error
+     *      stored in {@code joinErr} is rethrown).
+     */
+    private boolean joinTopology(boolean recon) throws IgniteSpiException {
+        if (!recon)
+            stats.onJoinStarted();
+
+        Collection<InetSocketAddress> addrs = null;
+
+        while (!Thread.currentThread().isInterrupted()) {
+            try {
+                // Poll the IP finder until it provides at least one address.
+                while (addrs == null || addrs.isEmpty()) {
+                    addrs = resolvedAddresses();
+
+                    if (!F.isEmpty(addrs)) {
+                        if (log.isDebugEnabled())
+                            log.debug("Resolved addresses from IP finder: " + addrs);
+                    }
+                    else {
+                        U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + ipFinder);
+
+                        U.sleep(2000);
+                    }
+                }
+
+                Iterator<InetSocketAddress> it = addrs.iterator();
+
+                while (it.hasNext() && !Thread.currentThread().isInterrupted()) {
+                    InetSocketAddress addr = it.next();
+
+                    Socket sock = null;
+
+                    try {
+                        long ts = U.currentTimeMillis();
+
+                        IgniteBiTuple<Socket, UUID> t = initConnection(addr);
+
+                        sock = t.get1();
+
+                        UUID rmtNodeId = t.get2();
+
+                        stats.onClientSocketInitialized(U.currentTimeMillis() - ts);
+
+                        locNode.clientRouterNodeId(rmtNodeId);
+
+                        TcpDiscoveryAbstractMessage msg = recon ?
+                            new TcpDiscoveryClientReconnectMessage(locNodeId, rmtNodeId, lastMsgId) :
+                            new TcpDiscoveryJoinRequestMessage(locNode, null);
+
+                        msg.client(true);
+
+                        writeToSocket(sock, msg);
+
+                        int res = readReceipt(sock, ackTimeout);
+
+                        switch (res) {
+                            case RES_OK:
+                                this.sock = sock;
+
+                                sockRdr = new SocketReader(rmtNodeId, new MessageWorker(recon));
+                                sockRdr.start();
+
+                                if (U.await(joinLatch, netTimeout, MILLISECONDS)) {
+                                    IgniteSpiException joinErr0 = joinErr;
+
+                                    if (joinErr0 != null)
+                                        throw joinErr0;
+
+                                    if (reconFailed) {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Failed to reconnect, will try to rejoin [locNode=" +
+                                                locNode + ']');
+
+                                        U.closeQuiet(sock);
+
+                                        U.interrupt(sockRdr);
+                                        U.join(sockRdr, log);
+
+                                        this.sock = null;
+
+                                        return false;
+                                    }
+
+                                    if (log.isDebugEnabled())
+                                        log.debug("Successfully connected to topology [sock=" + sock + ']');
+
+                                    hbSender = new HeartbeatSender();
+                                    hbSender.start();
+
+                                    stats.onJoinFinished();
+
+                                    return true;
+                                }
+                                else {
+                                    U.warn(log, "Join process timed out (will try other address) [sock=" + sock +
+                                        ", timeout=" + netTimeout + ']');
+
+                                    U.closeQuiet(sock);
+
+                                    U.interrupt(sockRdr);
+                                    U.join(sockRdr, log);
+
+                                    it.remove();
+
+                                    // Leaves the switch only; the next address is tried by the enclosing loop.
+                                    break;
+                                }
+
+                            case RES_CONTINUE_JOIN:
+                            case RES_WAIT:
+                                // The address is kept in the collection, so it can be tried again later.
+                                U.closeQuiet(sock);
+
+                                break;
+
+                            default:
+                                if (log.isDebugEnabled())
+                                    log.debug("Received unexpected response to join request: " + res);
+
+                                U.closeQuiet(sock);
+                        }
+                    }
+                    catch (GridInterruptedException ignored) {
+                        if (log.isDebugEnabled())
+                            log.debug("Joining thread was interrupted.");
+
+                        return false;
+                    }
+                    catch (IOException | GridException e) {
+                        if (log.isDebugEnabled())
+                            U.error(log, "Failed to establish connection with address: " + addr, e);
+
+                        U.closeQuiet(sock);
+
+                        it.remove();
+                    }
+                }
+
+                if (addrs.isEmpty()) {
+                    // Every resolved address has been tried and removed by now, so the collection itself
+                    // is always empty here; report the IP finder instead, as the warning above does.
+                    U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " +
+                        "in 2000ms): " + ipFinder);
+
+                    U.sleep(2000);
+                }
+            }
+            catch (GridInterruptedException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Joining thread was interrupted.");
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Opens a socket to the given address and performs the client handshake on it.
+     * <p>
+     * As a side effect, {@code joinLatch} is replaced with a fresh latch before the socket is opened.
+     * If the handshake fails for any reason, the opened socket is closed before the failure is
+     * propagated: the caller never receives the socket in that case and therefore cannot close it.
+     *
+     * @param addr Address.
+     * @return Tuple of the opened socket and the ID of the remote node that answered the handshake.
+     * @throws IOException In case of I/O error.
+     * @throws GridException In case of other error.
+     */
+    private IgniteBiTuple<Socket, UUID> initConnection(InetSocketAddress addr) throws IOException, GridException {
+        assert addr != null;
+
+        joinLatch = new CountDownLatch(1);
+
+        Socket sock = openSocket(addr);
+
+        // Ownership of the socket passes to the caller only when the handshake completes.
+        boolean handshakeDone = false;
+
+        try {
+            TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId);
+
+            req.client(true);
+
+            writeToSocket(sock, req);
+
+            TcpDiscoveryHandshakeResponse res = readMessage(sock, null, ackTimeout);
+
+            UUID nodeId = res.creatorNodeId();
+
+            assert nodeId != null;
+            assert !locNodeId.equals(nodeId);
+
+            IgniteBiTuple<Socket, UUID> t = F.t(sock, nodeId);
+
+            handshakeDone = true;
+
+            return t;
+        }
+        finally {
+            // Covers checked exceptions, runtime exceptions and assertion errors alike.
+            if (!handshakeDone)
+                U.closeQuiet(sock);
+        }
+    }
+
+    /**
+     * FOR TEST PURPOSE ONLY!
+     * <p>
+     * Closes the current socket and then interrupts and joins, in this order, the disconnect handler,
+     * the heartbeat sender, the socket reader and the socket timeout worker. Unlike {@code spiStop()},
+     * no node left message is sent, the remote nodes map is not cleared, the {@code sock} field is not
+     * reset and the MBean is not unregistered.
+     */
+    void simulateNodeFailure() {
+        U.warn(log, "Simulating client node failure: " + locNodeId);
+
+        U.closeQuiet(sock);
+
+        U.interrupt(disconnectHnd);
+        U.join(disconnectHnd, log);
+
+        U.interrupt(hbSender);
+        U.join(hbSender, log);
+
+        U.interrupt(sockRdr);
+        U.join(sockRdr, log);
+
+        U.interrupt(sockTimeoutWorker);
+        U.join(sockTimeoutWorker, log);
+    }
+
+    /**
+     * Disconnect handler. A thread that periodically checks whether the {@code sock} field
+     * is {@code null} and, if so, restores the connection to the topology.
+     */
+    private class DisconnectHandler extends IgniteSpiThread {
+        /**
+         * Creates the handler thread named {@code tcp-client-disco-disconnect-hnd}.
+         */
+        protected DisconnectHandler() {
+            super(gridName, "tcp-client-disco-disconnect-hnd", log);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Every {@code disconnectCheckInt} the {@code sock} field is inspected. When it is {@code null},
+         * the heartbeat sender and the socket reader are interrupted and joined, and a reconnect is attempted.
+         * If the reconnect fails, the remote nodes map is cleared, the local node order is reset to {@code 0},
+         * a regular join is performed and an {@code EVT_CLIENT_NODE_RECONNECTED} event is recorded.
+         * <p>
+         * An {@code IgniteSpiException} is logged and the loop continues; an interruption ends the thread.
+         * <p>
+         * NOTE(review): the result of the regular join is not checked, so the event is recorded even if
+         * that join returns {@code false} - confirm that this is intended.
+         */
+        @Override protected void body() throws InterruptedException {
+            while (!isInterrupted()) {
+                try {
+                    U.sleep(disconnectCheckInt);
+
+                    if (sock == null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Node is disconnected from topology, will try to reconnect.");
+
+                        U.interrupt(hbSender);
+                        U.join(hbSender, log);
+
+                        U.interrupt(sockRdr);
+                        U.join(sockRdr, log);
+
+                        // If reconnection fails, try to rejoin from scratch (known nodes and local order are reset).
+                        if (!joinTopology(true)) {
+                            rmtNodes.clear();
+
+                            locNode.order(0);
+
+                            joinTopology(false);
+
+                            getSpiContext().recordEvent(new IgniteDiscoveryEvent(locNode,
+                                "Client node reconnected: " + locNode,
+                                EVT_CLIENT_NODE_RECONNECTED, locNode));
+                        }
+                    }
+                }
+                catch (GridInterruptedException ignored) {
+                    if (log.isDebugEnabled())
+                        log.debug("Disconnect handler was interrupted.");
+
+                    return;
+                }
+                catch (IgniteSpiException e) {
+                    U.error(log, "Failed to reconnect to topology after failure.", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Thread that every {@code hbFreq} creates a client heartbeat message and passes it
+     * to the socket reader.
+     */
+    private class HeartbeatSender extends IgniteSpiThread {
+        /**
+         * Creates heartbeat sender thread.
+         */
+        protected HeartbeatSender() {
+            super(gridName, "tcp-client-disco-heartbeat-sender", log);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException {
+            // Missing socket means the node got disconnected before this thread started.
+            if (sock == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to start heartbeat sender, node is already disconnected.");
+
+                return;
+            }
+
+            try {
+                while (!isInterrupted()) {
+                    U.sleep(hbFreq);
+
+                    TcpDiscoveryHeartbeatMessage hbMsg = new TcpDiscoveryHeartbeatMessage(locNodeId);
+
+                    hbMsg.client(true);
+
+                    // The message goes through socket reader to its message worker.
+                    sockRdr.addMessage(hbMsg);
+                }
+            }
+            catch (GridInterruptedException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Heartbeat sender was interrupted.");
+            }
+        }
+    }
+
+    /**
+     * Socket reader. Reads discovery messages from the current SPI socket and hands them over
+     * to the message worker it was created with. The worker is started together with this thread
+     * and stopped when this thread exits.
+     */
+    private class SocketReader extends IgniteSpiThread {
+        /** Remote node ID. */
+        private final UUID nodeId;
+
+        /** Message worker. */
+        private final MessageWorker msgWrk;
+
+        /**
+         * @param nodeId Node ID (set as sender node ID on every message read from the socket).
+         * @param msgWrk Message worker that receives messages read by this thread.
+         */
+        protected SocketReader(UUID nodeId, MessageWorker msgWrk) {
+            super(gridName, "tcp-client-disco-sock-reader", log);
+
+            assert nodeId != null;
+            assert msgWrk != null;
+
+            this.nodeId = nodeId;
+            this.msgWrk = msgWrk;
+        }
+
+        /** {@inheritDoc} Message worker is started right after this thread. */
+        @Override public synchronized void start() {
+            super.start();
+
+            msgWrk.start();
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Reads messages until this thread is interrupted or an {@link IOException} occurs.
+         * While {@code joinLatch} count is positive, duplicate ID, authentication failed and
+         * check failed messages are converted to a join error, the latch is counted down and
+         * reading stops. On exit the socket is closed, the message worker is interrupted and
+         * joined, and the SPI {@code sock} reference is cleared.
+         */
+        @Override protected void body() throws InterruptedException {
+            Socket sock0 = sock;
+
+            if (sock0 == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to start socket reader, node is already disconnected.");
+
+                return;
+            }
+
+            try {
+                InputStream in = new BufferedInputStream(sock0.getInputStream());
+
+                sock0.setKeepAlive(true);
+                sock0.setTcpNoDelay(true);
+
+                while (!isInterrupted()) {
+                    try {
+                        TcpDiscoveryAbstractMessage msg = marsh.unmarshal(in, U.gridClassLoader());
+
+                        msg.senderNodeId(nodeId);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Message has been received: " + msg);
+
+                        stats.onMessageReceived(msg);
+
+                        IgniteSpiException err = null;
+
+                        if (joinLatch.getCount() > 0) { // Join is still in progress.
+                            if (msg instanceof TcpDiscoveryDuplicateIdMessage)
+                                err = duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg);
+                            else if (msg instanceof TcpDiscoveryAuthFailedMessage)
+                                err = authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg);
+                            else if (msg instanceof TcpDiscoveryCheckFailedMessage)
+                                err = checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
+
+                            if (err != null) {
+                                joinErr = err;
+
+                                joinLatch.countDown();
+
+                                return; // Join was rejected; cleanup is done in the finally block.
+                            }
+                        }
+
+                        msgWrk.addMessage(msg);
+                    }
+                    catch (GridException e) {
+                        if (log.isDebugEnabled())
+                            U.error(log, "Failed to read message [sock=" + sock0 + ", locNodeId=" + locNodeId +
+                                ", rmtNodeId=" + nodeId + ']', e);
+
+                        IOException ioEx = X.cause(e, IOException.class);
+
+                        if (ioEx != null)
+                            throw ioEx; // Handled by the outer catch, which ends the read loop.
+
+                        ClassNotFoundException clsNotFoundEx = X.cause(e, ClassNotFoundException.class);
+
+                        if (clsNotFoundEx != null)
+                            LT.warn(log, null, "Failed to read message due to ClassNotFoundException " +
+                                "(make sure same versions of all classes are available on all nodes) " +
+                                "[rmtNodeId=" + nodeId + ", err=" + clsNotFoundEx.getMessage() + ']');
+                        else
+                            LT.error(log, e, "Failed to read message [sock=" + sock0 + ", locNodeId=" + locNodeId +
+                                ", rmtNodeId=" + nodeId + ']');
+                    }
+                }
+            }
+            catch (IOException e) {
+                if (log.isDebugEnabled())
+                    U.error(log, "Connection failed [sock=" + sock0 + ", locNodeId=" + locNodeId +
+                        ", rmtNodeId=" + nodeId + ']', e);
+            }
+            finally {
+                U.closeQuiet(sock0);
+
+                U.interrupt(msgWrk);
+
+                try {
+                    U.join(msgWrk);
+                }
+                catch (GridInterruptedException ignored) {
+                    // No-op.
+                }
+
+                sock = null; // DisconnectHandler checks for null socket to start reconnection.
+            }
+        }
+
+        /**
+         * Passes the message to the message worker of this reader.
+         *
+         * @param msg Message.
+         */
+        void addMessage(TcpDiscoveryAbstractMessage msg) {
+            assert msg != null;
+
+            msgWrk.addMessage(msg);
+        }
+    }
+
+    /**
+     * Message worker.
+     */
+    private class MessageWorker extends MessageWorkerAdapter {
+        /** Topology history. */
+        private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>();
+
+        /** Indicates that reconnection is in progress. */
+        private boolean recon;
+
+        /** Indicates that pending messages are currently processed. */
+        private boolean pending;
+
+        /**
+         * Creates message worker.
+         *
+         * @param recon Whether reconnection is in progress. While this flag is set and pending
+         *      messages are not being processed, {@code processMessage} discards every message
+         *      except reconnect messages.
+         */
+        protected MessageWorker(boolean recon) {
+            super("tcp-client-disco-msg-worker");
+
+            this.recon = recon;
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Reconnect messages are always processed. Any other message is discarded while
+         * reconnection is in progress (unless it is one of the pending messages), otherwise
+         * it is dispatched by type and its ID is remembered if the message is ensured.
+         */
+        @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
+            assert msg != null;
+            assert msg.verified() || msg.senderNodeId() == null;
+
+            stats.onMessageProcessingStarted(msg);
+
+            if (msg instanceof TcpDiscoveryClientReconnectMessage)
+                processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
+            else if (recon && !pending) {
+                if (log.isDebugEnabled())
+                    log.debug("Discarding message received during reconnection: " + msg);
+            }
+            else {
+                // Dispatch by message type, unknown types are silently skipped.
+                if (msg instanceof TcpDiscoveryNodeAddedMessage)
+                    processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
+                else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
+                    processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg);
+                else if (msg instanceof TcpDiscoveryNodeLeftMessage)
+                    processNodeLeftMessage((TcpDiscoveryNodeLeftMessage)msg);
+                else if (msg instanceof TcpDiscoveryNodeFailedMessage)
+                    processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg);
+                else if (msg instanceof TcpDiscoveryHeartbeatMessage)
+                    processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg);
+
+                if (ensured(msg))
+                    lastMsgId = msg.id();
+            }
+
+            stats.onMessageProcessingFinished(msg);
+        }
+
+        /**
+         * Handles node added message. For a remote node the node is registered (if not known yet)
+         * and its discovery data is passed to the exchange. For the local node, while join is
+         * in progress, topology, topology history and discovery data of old nodes are taken
+         * from the message.
+         *
+         * @param msg Message.
+         */
+        private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
+            if (leaveLatch != null)
+                return;
+
+            TcpDiscoveryNode addedNode = msg.node();
+
+            UUID addedNodeId = addedNode.id();
+
+            if (!locNodeId.equals(addedNodeId)) {
+                // Another node is joining: nothing to do if it is already known.
+                if (rmtNodes.putIfAbsent(addedNodeId, addedNode) != null)
+                    return;
+
+                if (log.isDebugEnabled())
+                    log.debug("Added new node to topology: " + addedNode);
+
+                List<Object> newNodeData = msg.newNodeDiscoveryData();
+
+                if (newNodeData != null)
+                    exchange.onExchange(newNodeData);
+
+                return;
+            }
+
+            // From here on the message is about local node.
+            if (joinLatch.getCount() <= 0) {
+                if (log.isDebugEnabled())
+                    log.debug("Discarding node added message (this message has already been processed) " +
+                        "[msg=" + msg + ", locNode=" + locNode + ']');
+
+                return;
+            }
+
+            Collection<TcpDiscoveryNode> top = msg.topology();
+
+            if (top == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Discarding node added message with empty topology: " + msg);
+
+                return;
+            }
+
+            for (TcpDiscoveryNode topNode : top) {
+                if (topNode.order() > 0)
+                    topNode.visible(true);
+
+                rmtNodes.put(topNode.id(), topNode);
+            }
+
+            // Replace local history with the one received from topology.
+            topHist.clear();
+
+            if (msg.topologyHistory() != null)
+                topHist.putAll(msg.topologyHistory());
+
+            Collection<List<Object>> oldNodesData = msg.oldNodesDiscoveryData();
+
+            if (oldNodesData != null) {
+                for (List<Object> nodeData : oldNodesData)
+                    exchange.onExchange(nodeData);
+            }
+
+            locNode.setAttributes(addedNode.attributes());
+            locNode.visible(true);
+        }
+
+        /**
+         * Handles node add finished message. For the local node it completes the join: order is
+         * assigned, listener is notified and join latch is counted down. For a remote node the
+         * node becomes visible and, unless local join is still in progress, listener is notified.
+         *
+         * @param msg Message.
+         */
+        private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
+            if (leaveLatch != null)
+                return;
+
+            if (locNodeId.equals(msg.nodeId())) {
+                if (joinLatch.getCount() <= 0) {
+                    if (log.isDebugEnabled())
+                        log.debug("Discarding node add finished message (this message has already been processed) " +
+                            "[msg=" + msg + ", locNode=" + locNode + ']');
+
+                    return;
+                }
+
+                long joinVer = msg.topologyVersion();
+
+                locNode.order(joinVer);
+
+                Collection<ClusterNode> joinTop = updateTopologyHistory(joinVer);
+
+                notifyDiscovery(EVT_NODE_JOINED, joinVer, locNode, joinTop);
+
+                joinErr = null;
+
+                joinLatch.countDown();
+
+                return;
+            }
+
+            TcpDiscoveryNode node = rmtNodes.get(msg.nodeId());
+
+            if (node == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Discarding node add finished message since node is not found [msg=" + msg + ']');
+
+                return;
+            }
+
+            long topVer = msg.topologyVersion();
+
+            node.order(topVer);
+            node.visible(true);
+
+            // Share local version instance with the node if versions are equal.
+            if (locNodeVer.equals(node.version()))
+                node.version(locNodeVer);
+
+            // History is updated even if notification is skipped below.
+            Collection<ClusterNode> top = updateTopologyHistory(topVer);
+
+            if (!pending && joinLatch.getCount() > 0) {
+                if (log.isDebugEnabled())
+                    log.debug("Discarding node add finished message (join process is not finished): " + msg);
+
+                return;
+            }
+
+            notifyDiscovery(EVT_NODE_JOINED, topVer, node, top);
+
+            stats.onNodeJoined();
+        }
+
+        /**
+         * Handles node left message. A message created by local node releases the leave latch.
+         * For a remote node the node is removed from topology and, unless local join is still
+         * in progress, listener is notified.
+         *
+         * @param msg Message.
+         */
+        private void processNodeLeftMessage(TcpDiscoveryNodeLeftMessage msg) {
+            if (locNodeId.equals(msg.creatorNodeId())) {
+                if (log.isDebugEnabled())
+                    log.debug("Received node left message for local node: " + msg);
+
+                CountDownLatch latch = leaveLatch;
+
+                assert latch != null;
+
+                latch.countDown();
+
+                return;
+            }
+
+            // Local node is leaving, remote topology changes are of no interest anymore.
+            if (leaveLatch != null)
+                return;
+
+            TcpDiscoveryNode leftNode = rmtNodes.remove(msg.creatorNodeId());
+
+            if (leftNode == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Discarding node left message since node is not found [msg=" + msg + ']');
+
+                return;
+            }
+
+            // History is updated even if notification is skipped below.
+            Collection<ClusterNode> snapshot = updateTopologyHistory(msg.topologyVersion());
+
+            if (!pending && joinLatch.getCount() > 0) {
+                if (log.isDebugEnabled())
+                    log.debug("Discarding node left message (join process is not finished): " + msg);
+
+                return;
+            }
+
+            notifyDiscovery(EVT_NODE_LEFT, msg.topologyVersion(), leftNode, snapshot);
+
+            stats.onNodeLeft();
+        }
+
+        /**
+         * Handles node failed message. Messages created by local node are ignored. Otherwise
+         * the failed node is removed from topology and, unless local join is still in progress,
+         * listener is notified.
+         *
+         * @param msg Message.
+         */
+        private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) {
+            if (leaveLatch != null)
+                return;
+
+            if (locNodeId.equals(msg.creatorNodeId()))
+                return;
+
+            TcpDiscoveryNode failedNode = rmtNodes.remove(msg.failedNodeId());
+
+            if (failedNode == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Discarding node failed message since node is not found [msg=" + msg + ']');
+
+                return;
+            }
+
+            // History is updated even if notification is skipped below.
+            Collection<ClusterNode> snapshot = updateTopologyHistory(msg.topologyVersion());
+
+            if (!pending && joinLatch.getCount() > 0) {
+                if (log.isDebugEnabled())
+                    log.debug("Discarding node failed message (join process is not finished): " + msg);
+
+                return;
+            }
+
+            notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), failedNode, snapshot);
+
+            stats.onNodeFailed();
+        }
+
+        /**
+         * Handles heartbeat message. A heartbeat created by local node that has no sender yet
+         * is filled with local metrics and written to the socket; on write failure the socket
+         * is closed and this worker is interrupted. Heartbeats created by other nodes are used
+         * to update node metrics.
+         *
+         * @param msg Message.
+         */
+        private void processHeartbeatMessage(TcpDiscoveryHeartbeatMessage msg) {
+            if (leaveLatch != null)
+                return;
+
+            if (!locNodeId.equals(msg.creatorNodeId())) {
+                // Heartbeat of another node: apply the metrics it carries, if any.
+                if (!msg.hasMetrics())
+                    return;
+
+                long now = U.currentTimeMillis();
+
+                for (Map.Entry<UUID, MetricsSet> entry : msg.metrics().entrySet()) {
+                    MetricsSet set = entry.getValue();
+
+                    updateMetrics(entry.getKey(), set.metrics(), now);
+
+                    for (T2<UUID, ClusterNodeMetrics> clientMetrics : set.clientMetrics())
+                        updateMetrics(clientMetrics.get1(), clientMetrics.get2(), now);
+                }
+
+                return;
+            }
+
+            // Sender is set only on messages that were read from the socket.
+            if (msg.senderNodeId() != null) {
+                if (log.isDebugEnabled())
+                    log.debug("Received heartbeat response: " + msg);
+
+                return;
+            }
+
+            Socket sock0 = sock;
+
+            if (sock0 == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send heartbeat message (node is disconnected): " + msg);
+
+                return;
+            }
+
+            msg.setMetrics(locNodeId, metricsProvider.getMetrics());
+
+            try {
+                writeToSocket(sock0, msg);
+
+                if (log.isDebugEnabled())
+                    log.debug("Heartbeat message sent [sock=" + sock0 + ", msg=" + msg + ']');
+            }
+            catch (IOException | GridException e) {
+                if (log.isDebugEnabled())
+                    U.error(log, "Failed to send heartbeat message [sock=" + sock0 +
+                        ", msg=" + msg + ']', e);
+
+                // Connection is broken: drop the socket and stop this worker.
+                U.closeQuiet(sock0);
+
+                sock = null;
+
+                interrupt();
+            }
+        }
+
+        /**
+         * Processes reconnect response. Responses created by other clients are discarded.
+         * On success the pending messages carried by the response are processed with
+         * {@code pending} flag raised, so that {@code processMessage} does not discard them.
+         * On failure {@code reconFailed} is set and {@code EVT_CLIENT_NODE_DISCONNECTED}
+         * event is recorded. In both cases join error is cleared and join latch is counted down.
+         * <p>
+         * NOTE(review): {@code recon} is not reset here after a successful reconnect. Confirm
+         * it is handled elsewhere, otherwise {@code processMessage} keeps discarding all
+         * non-reconnect messages handled by this worker.
+         *
+         * @param msg Message.
+         */
+        private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
+            if (leaveLatch != null)
+                return;
+
+            if (locNodeId.equals(msg.creatorNodeId())) {
+                if (msg.success()) {
+                    pending = true; // Lets pending messages through the reconnection check.
+
+                    try {
+                        for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages())
+                            processMessage(pendingMsg);
+                    }
+                    finally {
+                        pending = false;
+                    }
+
+                    joinErr = null;
+                    reconFailed = false;
+
+                    joinLatch.countDown();
+                }
+                else {
+                    joinErr = null;
+                    reconFailed = true;
+
+                    getSpiContext().recordEvent(new IgniteDiscoveryEvent(locNode,
+                        "Client node disconnected: " + locNode,
+                        EVT_CLIENT_NODE_DISCONNECTED, locNode));
+
+                    joinLatch.countDown();
+                }
+            }
+            else if (log.isDebugEnabled())
+                log.debug("Discarding reconnect message for another client: " + msg);
+        }
+
+        /**
+         * Applies metrics to the local or remote node with the given ID and notifies listener.
+         * Metrics for unknown or not yet visible nodes are ignored.
+         *
+         * @param nodeId Node ID.
+         * @param metrics Metrics.
+         * @param tstamp Timestamp.
+         */
+        private void updateMetrics(UUID nodeId, ClusterNodeMetrics metrics, long tstamp) {
+            assert nodeId != null;
+            assert metrics != null;
+
+            TcpDiscoveryNode target;
+
+            if (nodeId.equals(locNodeId))
+                target = locNode;
+            else
+                target = rmtNodes.get(nodeId);
+
+            if (target == null || !target.visible()) {
+                if (log.isDebugEnabled())
+                    log.debug("Received metrics from unknown node: " + nodeId);
+
+                return;
+            }
+
+            target.setMetrics(metrics);
+
+            target.lastUpdateTime(tstamp);
+
+            notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, target, allNodes());
+        }
+
+        /**
+         * Sets current topology version of the SPI and records topology snapshot for this
+         * version in history if it is not there yet. The oldest entry is evicted when history
+         * grows beyond {@code topHistSize}.
+         *
+         * @param topVer New topology version.
+         * @return Latest topology snapshot.
+         */
+        private Collection<ClusterNode> updateTopologyHistory(long topVer) {
+            TcpClientDiscoverySpi.this.topVer = topVer;
+
+            Collection<ClusterNode> snapshot = allNodes();
+
+            // Version is already recorded, history stays untouched.
+            if (topHist.containsKey(topVer))
+                return snapshot;
+
+            assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
+                "lastVer=" + topHist.lastKey() + ", newVer=" + topVer;
+
+            topHist.put(topVer, snapshot);
+
+            if (topHist.size() > topHistSize)
+                topHist.pollFirstEntry();
+
+            assert topHist.lastKey() == topVer;
+            assert topHist.size() <= topHistSize;
+
+            return snapshot;
+        }
+
+        /**
+         * Builds sorted set of all visible remote nodes plus local node.
+         *
+         * @return All nodes.
+         */
+        private Collection<ClusterNode> allNodes() {
+            Collection<ClusterNode> res = new TreeSet<>();
+
+            for (TcpDiscoveryNode rmtNode : rmtNodes.values()) {
+                // Nodes that are not visible yet are not part of the snapshot.
+                if (!rmtNode.visible())
+                    continue;
+
+                res.add(rmtNode);
+            }
+
+            res.add(locNode);
+
+            return res;
+        }
+
+        /**
+         * Passes discovery event to the SPI listener along with a copy of topology history.
+         * Notification is skipped if listener is not set.
+         *
+         * @param type Event type.
+         * @param topVer Topology version.
+         * @param node Node.
+         * @param top Topology snapshot.
+         */
+        private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top) {
+            DiscoverySpiListener lsnr0 = TcpClientDiscoverySpi.this.lsnr;
+
+            if (lsnr0 == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Skipped discovery notification [node=" + node + ", type=" + U.gridEventName(type) +
+                        ", topVer=" + topVer + ']');
+
+                return;
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Discovery notification [node=" + node + ", type=" + U.gridEventName(type) +
+                    ", topVer=" + topVer + ']');
+
+            // Listener gets its own copy of history.
+            lsnr0.onDiscovery(type, topVer, node, top, new TreeMap<>(topHist));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java
new file mode 100644
index 0000000..95c7fb9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java
@@ -0,0 +1,156 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.mbean.*;
+import org.apache.ignite.spi.*;
+
+import java.util.*;
+
+/**
+ * Management bean for {@link TcpClientDiscoverySpi}.
+ */
+public interface TcpClientDiscoverySpiMBean extends IgniteSpiManagementMBean {
+    /**
+     * Gets disconnect check interval.
+     *
+     * @return Disconnect check interval.
+     */
+    @IgniteMBeanDescription("Disconnect check interval.")
+    public long getDisconnectCheckInterval();
+
+    /**
+     * Gets socket timeout.
+     *
+     * @return Socket timeout.
+     */
+    @IgniteMBeanDescription("Socket timeout.")
+    public long getSocketTimeout();
+
+    /**
+     * Gets message acknowledgement timeout.
+     *
+     * @return Message acknowledgement timeout.
+     */
+    @IgniteMBeanDescription("Message acknowledgement timeout.")
+    public long getAckTimeout();
+
+    /**
+     * Gets network timeout.
+     *
+     * @return Network timeout.
+     */
+    @IgniteMBeanDescription("Network timeout.")
+    public long getNetworkTimeout();
+
+    /**
+     * Gets thread priority. All threads within SPI will be started with it.
+     *
+     * @return Thread priority.
+     */
+    @IgniteMBeanDescription("Threads priority.")
+    public int getThreadPriority();
+
+    /**
+     * Gets delay between heartbeat messages.
+     *
+     * @return Time period in milliseconds.
+     */
+    @IgniteMBeanDescription("Heartbeat frequency.")
+    public long getHeartbeatFrequency();
+
+    /**
+     * Gets {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} (string representation).
+     *
+     * @return IPFinder (string representation).
+     */
+    @IgniteMBeanDescription("IP Finder.")
+    public String getIpFinderFormatted();
+
+    /**
+     * Gets message worker queue current size.
+     *
+     * @return Message worker queue current size.
+     */
+    @IgniteMBeanDescription("Message worker queue current size.")
+    public int getMessageWorkerQueueSize();
+
+    /**
+     * Gets joined nodes count.
+     *
+     * @return Nodes joined count.
+     */
+    @IgniteMBeanDescription("Nodes joined count.")
+    public long getNodesJoined();
+
+    /**
+     * Gets left nodes count.
+     *
+     * @return Left nodes count.
+     */
+    @IgniteMBeanDescription("Nodes left count.")
+    public long getNodesLeft();
+
+    /**
+     * Gets failed nodes count.
+     *
+     * @return Failed nodes count.
+     */
+    @IgniteMBeanDescription("Nodes failed count.")
+    public long getNodesFailed();
+
+    /**
+     * Gets average message processing time.
+     *
+     * @return Average message processing time.
+     */
+    @IgniteMBeanDescription("Avg message processing time.")
+    public long getAvgMessageProcessingTime();
+
+    /**
+     * Gets max message processing time.
+     *
+     * @return Max message processing time.
+     */
+    @IgniteMBeanDescription("Max message processing time.")
+    public long getMaxMessageProcessingTime();
+
+    /**
+     * Gets total received messages count.
+     *
+     * @return Total received messages count.
+     */
+    @IgniteMBeanDescription("Total received messages count.")
+    public int getTotalReceivedMessages();
+
+    /**
+     * Gets received messages counts (grouped by type).
+     *
+     * @return Map containing message types and respective counts.
+     */
+    @IgniteMBeanDescription("Received messages by type.")
+    public Map<String, Integer> getReceivedMessages();
+
+    /**
+     * Gets total processed messages count.
+     *
+     * @return Total processed messages count.
+     */
+    @IgniteMBeanDescription("Total processed messages count.")
+    public int getTotalProcessedMessages();
+
+    /**
+     * Gets processed messages counts (grouped by type).
+     *
+     * @return Map containing message types and respective counts.
+     */
+    @IgniteMBeanDescription("Processed messages by type.")
+    public Map<String, Integer> getProcessedMessages();
+}