You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 15:07:37 UTC

[36/52] [abbrv] incubator-ignite git commit: # Renaming

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
new file mode 100644
index 0000000..5377e18
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
@@ -0,0 +1,996 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.marshaller.jdk.*;
+import org.apache.ignite.product.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.gridgain.grid.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.io.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.*;
+
+/**
+ * Base class for TCP discovery SPIs.
+ * <p>
+ * Holds configuration shared by TCP discovery SPI implementations and provides helpers for
+ * opening sockets and for reading from / writing to them with bounded time.
+ */
+abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements DiscoverySpi {
+    /** Default port to listen (value is <tt>47500</tt>). */
+    public static final int DFLT_PORT = 47500;
+
+    /** Default socket operations timeout in milliseconds (value is <tt>2,000ms</tt>). */
+    public static final long DFLT_SOCK_TIMEOUT = 2000;
+
+    /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>5,000ms</tt>). */
+    public static final long DFLT_ACK_TIMEOUT = 5000;
+
+    /** Default network timeout in milliseconds (value is <tt>5,000ms</tt>). */
+    public static final long DFLT_NETWORK_TIMEOUT = 5000;
+
+    /** Default value for thread priority (value is <tt>10</tt>). */
+    public static final int DFLT_THREAD_PRI = 10;
+
+    /** Default heartbeat messages issuing frequency (value is <tt>2,000ms</tt>). */
+    public static final long DFLT_HEARTBEAT_FREQ = 2000;
+
+    /** Default size of topology snapshots history. */
+    public static final int DFLT_TOP_HISTORY_SIZE = 1000;
+
+    /** Response OK. */
+    protected static final int RES_OK = 1;
+
+    /** Response CONTINUE JOIN. */
+    protected static final int RES_CONTINUE_JOIN = 100;
+
+    /** Response WAIT. */
+    protected static final int RES_WAIT = 200;
+
+    /** Local address. */
+    protected String locAddr;
+
+    /** IP finder. */
+    protected TcpDiscoveryIpFinder ipFinder;
+
+    /** Socket operations timeout. */
+    protected long sockTimeout = DFLT_SOCK_TIMEOUT;
+
+    /** Message acknowledgement timeout. */
+    protected long ackTimeout = DFLT_ACK_TIMEOUT;
+
+    /** Network timeout. */
+    protected long netTimeout = DFLT_NETWORK_TIMEOUT;
+
+    /** Thread priority for all threads started by SPI. */
+    protected int threadPri = DFLT_THREAD_PRI;
+
+    /** Heartbeat messages issuing frequency. */
+    protected long hbFreq = DFLT_HEARTBEAT_FREQ;
+
+    /** Size of topology snapshots history. */
+    protected int topHistSize = DFLT_TOP_HISTORY_SIZE;
+
+    /** Grid discovery listener. */
+    protected volatile DiscoverySpiListener lsnr;
+
+    /** Data exchange. */
+    protected DiscoverySpiDataExchange exchange;
+
+    /** Metrics provider. */
+    protected DiscoveryMetricsProvider metricsProvider;
+
+    /** Local node attributes. */
+    protected Map<String, Object> locNodeAttrs;
+
+    /** Local node version. */
+    protected IgniteProductVersion locNodeVer;
+
+    /** Local node. */
+    protected TcpDiscoveryNode locNode;
+
+    /** Local host. */
+    protected InetAddress locHost;
+
+    /** Internal and external addresses of local node. */
+    protected Collection<InetSocketAddress> locNodeAddrs;
+
+    /** Socket timeout worker. */
+    protected SocketTimeoutWorker sockTimeoutWorker;
+
+    /** Discovery state. */
+    protected TcpDiscoverySpiState spiState = DISCONNECTED;
+
+    /** Start time of the very first grid node. */
+    protected volatile long gridStartTime;
+
+    /** Marshaller. */
+    protected final IgniteMarshaller marsh = new IgniteJdkMarshaller();
+
+    /** Statistics. */
+    protected final TcpDiscoveryStatistics stats = new TcpDiscoveryStatistics();
+
+    /** Local node ID. */
+    @IgniteLocalNodeIdResource
+    protected UUID locNodeId;
+
+    /** Name of the grid. */
+    @IgniteNameResource
+    protected String gridName;
+
+    /** Logger. */
+    @IgniteLoggerResource
+    protected IgniteLogger log;
+
+    /**
+     * Sets local host IP address that discovery SPI uses.
+     * <p>
+     * If not provided, by default a first found non-loopback address
+     * will be used. If there is no non-loopback address available,
+     * then {@link InetAddress#getLocalHost()} will be used.
+     *
+     * @param locAddr IP address.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    @IgniteLocalHostResource
+    public void setLocalAddress(String locAddr) {
+        // Injection should not override value already set by Spring or user.
+        if (this.locAddr == null)
+            this.locAddr = locAddr;
+    }
+
+    /**
+     * Gets local address that was set to SPI with {@link #setLocalAddress(String)} method.
+     *
+     * @return local address.
+     */
+    public String getLocalAddress() {
+        return locAddr;
+    }
+
+    /**
+     * Gets IP finder for IP addresses sharing and storing.
+     *
+     * @return IP finder for IP addresses sharing and storing.
+     */
+    public TcpDiscoveryIpFinder getIpFinder() {
+        return ipFinder;
+    }
+
+    /**
+     * Sets IP finder for IP addresses sharing and storing.
+     * <p>
+     * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will be used by default.
+     *
+     * @param ipFinder IP finder.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setIpFinder(TcpDiscoveryIpFinder ipFinder) {
+        this.ipFinder = ipFinder;
+    }
+
+    /**
+     * Sets socket operations timeout. This timeout is used to limit connection time and
+     * write-to-socket time.
+     * <p>
+     * Note that when running GridGain on Amazon EC2, socket timeout must be set to a value
+     * significantly greater than the default (e.g. to {@code 30000}).
+     * <p>
+     * If not specified, default is {@link #DFLT_SOCK_TIMEOUT}.
+     *
+     * @param sockTimeout Socket connection timeout.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setSocketTimeout(long sockTimeout) {
+        this.sockTimeout = sockTimeout;
+    }
+
+    /**
+     * Sets timeout for receiving acknowledgement for sent message.
+     * <p>
+     * If acknowledgement is not received within this timeout, sending is considered as failed
+     * and SPI tries to repeat message sending.
+     * <p>
+     * If not specified, default is {@link #DFLT_ACK_TIMEOUT}.
+     *
+     * @param ackTimeout Acknowledgement timeout.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setAckTimeout(long ackTimeout) {
+        this.ackTimeout = ackTimeout;
+    }
+
+    /**
+     * Sets maximum network timeout to use for network operations.
+     * <p>
+     * If not specified, default is {@link #DFLT_NETWORK_TIMEOUT}.
+     *
+     * @param netTimeout Network timeout.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setNetworkTimeout(long netTimeout) {
+        this.netTimeout = netTimeout;
+    }
+
+    /**
+     * Sets thread priority. All threads within SPI will be started with it.
+     * <p>
+     * If not provided, default value is {@link #DFLT_THREAD_PRI}
+     *
+     * @param threadPri Thread priority.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setThreadPriority(int threadPri) {
+        this.threadPri = threadPri;
+    }
+
+    /**
+     * Sets delay between issuing of heartbeat messages. SPI sends heartbeat messages
+     * in configurable time interval to other nodes to notify them about its state.
+     * <p>
+     * If not provided, default value is {@link #DFLT_HEARTBEAT_FREQ}.
+     *
+     * @param hbFreq Heartbeat frequency in milliseconds.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setHeartbeatFrequency(long hbFreq) {
+        this.hbFreq = hbFreq;
+    }
+
+    /**
+     * Gets size of topology snapshots history.
+     *
+     * @return Size of topology snapshots history.
+     */
+    public long getTopHistorySize() {
+        return topHistSize;
+    }
+
+    /**
+     * Sets size of topology snapshots history. Specified size should be greater than or equal to default size
+     * {@link #DFLT_TOP_HISTORY_SIZE}; smaller values are rejected with a warning and the current size is kept.
+     *
+     * @param topHistSize Size of topology snapshots history.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setTopHistorySize(int topHistSize) {
+        if (topHistSize < DFLT_TOP_HISTORY_SIZE) {
+            U.warn(log, "Topology history size should be greater than or equal to default size. " +
+                "Specified size will not be set [curSize=" + this.topHistSize + ", specifiedSize=" + topHistSize +
+                ", defaultSize=" + DFLT_TOP_HISTORY_SIZE + ']');
+
+            return;
+        }
+
+        this.topHistSize = topHistSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) {
+        assert locNodeAttrs == null;
+        assert locNodeVer == null;
+
+        if (log.isDebugEnabled()) {
+            log.debug("Node attributes to set: " + attrs);
+            log.debug("Node version to set: " + ver);
+        }
+
+        locNodeAttrs = attrs;
+        locNodeVer = ver;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
+        super.onContextInitialized0(spiCtx);
+
+        ipFinder.onSpiContextInitialized(spiCtx);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onContextDestroyed0() {
+        super.onContextDestroyed0();
+
+        ipFinder.onSpiContextDestroyed();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterNode getLocalNode() {
+        return locNode;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
+        this.lsnr = lsnr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setDataExchange(DiscoverySpiDataExchange exchange) {
+        this.exchange = exchange;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider) {
+        this.metricsProvider = metricsProvider;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getGridStartTime() {
+        assert gridStartTime != 0;
+
+        return gridStartTime;
+    }
+
+    /**
+     * Opens a socket to the given address: binds it to {@link #locHost} on an ephemeral port,
+     * connects with {@link #sockTimeout} and writes the {@code U.GG_HEADER} bytes.
+     * <p>
+     * If any of these steps fails, the socket is closed before the exception is propagated,
+     * so no half-opened socket is leaked.
+     *
+     * @param sockAddr Remote address (resolved here if it is unresolved).
+     * @return Opened socket.
+     * @throws IOException If failed.
+     */
+    protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
+        assert sockAddr != null;
+
+        InetSocketAddress resolved = sockAddr.isUnresolved() ?
+            new InetSocketAddress(InetAddress.getByName(sockAddr.getHostName()), sockAddr.getPort()) : sockAddr;
+
+        InetAddress addr = resolved.getAddress();
+
+        assert addr != null;
+
+        Socket sock = new Socket();
+
+        boolean success = false;
+
+        try {
+            sock.bind(new InetSocketAddress(locHost, 0));
+
+            sock.setTcpNoDelay(true);
+
+            sock.connect(resolved, (int)sockTimeout);
+
+            writeToSocket(sock, U.GG_HEADER);
+
+            success = true;
+
+            return sock;
+        }
+        finally {
+            // Caller never gets a reference to the socket on failure, so it must be closed here.
+            if (!success)
+                U.closeQuiet(sock);
+        }
+    }
+
+    /**
+     * Writes message to the socket.
+     *
+     * @param sock Socket.
+     * @param data Raw data to write.
+     * @throws IOException If IO failed or write timed out.
+     */
+    @SuppressWarnings("ThrowFromFinallyBlock")
+    protected void writeToSocket(Socket sock, byte[] data) throws IOException {
+        assert sock != null;
+        assert data != null;
+
+        SocketTimeoutObject obj = startWriteTimeout(sock);
+
+        IOException err = null;
+
+        try {
+            OutputStream out = sock.getOutputStream();
+
+            out.write(data);
+
+            out.flush();
+        }
+        catch (IOException e) {
+            err = e;
+        }
+        finally {
+            finishWrite(obj, err);
+        }
+    }
+
+    /**
+     * Writes message to the socket.
+     *
+     * @param sock Socket.
+     * @param msg Message.
+     * @throws IOException If IO failed or write timed out.
+     * @throws GridException If marshalling failed.
+     */
+    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) throws IOException, GridException {
+        writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024)); // 8K.
+    }
+
+    /**
+     * Writes message to the socket.
+     *
+     * @param sock Socket.
+     * @param msg Message.
+     * @param bout Byte array output stream.
+     * @throws IOException If IO failed or write timed out.
+     * @throws GridException If marshalling failed.
+     */
+    @SuppressWarnings("ThrowFromFinallyBlock")
+    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout)
+        throws IOException, GridException {
+        assert sock != null;
+        assert msg != null;
+        assert bout != null;
+
+        // Marshall message first to perform only write after.
+        marsh.marshal(msg, bout);
+
+        SocketTimeoutObject obj = startWriteTimeout(sock);
+
+        IOException err = null;
+
+        try {
+            OutputStream out = sock.getOutputStream();
+
+            bout.writeTo(out);
+
+            out.flush();
+        }
+        catch (IOException e) {
+            err = e;
+        }
+        finally {
+            finishWrite(obj, err);
+        }
+    }
+
+    /**
+     * Writes response to the socket.
+     *
+     * @param sock Socket.
+     * @param res Integer response.
+     * @throws IOException If IO failed or write timed out.
+     */
+    @SuppressWarnings("ThrowFromFinallyBlock")
+    protected void writeToSocket(Socket sock, int res) throws IOException {
+        assert sock != null;
+
+        SocketTimeoutObject obj = startWriteTimeout(sock);
+
+        IOException err = null;
+
+        try {
+            // Obtained inside 'try' so that a failure here still cancels the timeout object.
+            OutputStream out = sock.getOutputStream();
+
+            out.write(res);
+
+            out.flush();
+        }
+        catch (IOException e) {
+            err = e;
+        }
+        finally {
+            finishWrite(obj, err);
+        }
+    }
+
+    /**
+     * Registers a timeout object for a socket write that is about to start. The write is bounded by
+     * {@link #sockTimeout}; if it does not finish in time, {@link #sockTimeoutWorker} closes the socket.
+     *
+     * @param sock Socket being written to.
+     * @return Registered timeout object; must be passed to {@link #finishWrite(SocketTimeoutObject, IOException)}.
+     */
+    private SocketTimeoutObject startWriteTimeout(Socket sock) {
+        SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
+
+        sockTimeoutWorker.addTimeoutObject(obj);
+
+        return obj;
+    }
+
+    /**
+     * Completes a write started with {@link #startWriteTimeout(Socket)}: cancels and deregisters the
+     * timeout object and reports the outcome of the write.
+     *
+     * @param obj Timeout object of the write.
+     * @param err Error thrown by the write, or {@code null} if the write itself did not fail.
+     * @throws IOException {@code err} if it is not {@code null}; otherwise {@link SocketTimeoutException}
+     *      if the timeout worker had already fired (closing the socket) before the write completed.
+     */
+    private void finishWrite(SocketTimeoutObject obj, @Nullable IOException err) throws IOException {
+        boolean cancelled = obj.cancel();
+
+        if (cancelled)
+            sockTimeoutWorker.removeTimeoutObject(obj);
+
+        // Throw original exception.
+        if (err != null)
+            throw err;
+
+        if (!cancelled)
+            throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
+    }
+
+    /**
+     * Reads message from the socket limiting read time.
+     * <p>
+     * The socket's previous {@code SO_TIMEOUT} is restored (quietly) after the read.
+     *
+     * @param <T> Expected message type.
+     * @param sock Socket.
+     * @param in Input stream (in case socket stream was wrapped).
+     * @param timeout Socket timeout for this operation.
+     * @return Message.
+     * @throws IOException If IO failed or read timed out.
+     * @throws GridException If unmarshalling failed.
+     */
+    protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, GridException {
+        assert sock != null;
+
+        int oldTimeout = sock.getSoTimeout();
+
+        try {
+            sock.setSoTimeout((int)timeout);
+
+            return marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader());
+        }
+        catch (IOException | GridException e) {
+            if (X.hasCause(e, SocketTimeoutException.class))
+                LT.warn(log, null, "Timed out waiting for message to be read (most probably, the reason is " +
+                    "in long GC pauses on remote node). Current timeout: " + timeout + '.');
+
+            throw e;
+        }
+        finally {
+            // Quietly restore timeout.
+            try {
+                sock.setSoTimeout(oldTimeout);
+            }
+            catch (SocketException ignored) {
+                // No-op.
+            }
+        }
+    }
+
+    /**
+     * Reads message delivery receipt from the socket.
+     * <p>
+     * The socket's previous {@code SO_TIMEOUT} is restored (quietly) after the read.
+     *
+     * @param sock Socket.
+     * @param timeout Socket timeout for this operation.
+     * @return Receipt.
+     * @throws IOException If IO failed or read timed out ({@link EOFException} if the stream has ended).
+     */
+    protected int readReceipt(Socket sock, long timeout) throws IOException {
+        assert sock != null;
+
+        int oldTimeout = sock.getSoTimeout();
+
+        try {
+            sock.setSoTimeout((int)timeout);
+
+            int res = sock.getInputStream().read();
+
+            if (res == -1)
+                throw new EOFException();
+
+            return res;
+        }
+        catch (SocketTimeoutException e) {
+            LT.warn(log, null, "Timed out waiting for message delivery receipt (most probably, the reason is " +
+                "in long GC pauses on remote node; consider tuning GC and increasing 'ackTimeout' " +
+                "configuration property). Will retry to send message with increased timeout. " +
+                "Current timeout: " + timeout + '.');
+
+            stats.onAckTimeout();
+
+            throw e;
+        }
+        finally {
+            // Quietly restore timeout.
+            try {
+                sock.setSoTimeout(oldTimeout);
+            }
+            catch (SocketException ignored) {
+                // No-op.
+            }
+        }
+    }
+
+    /**
+     * Resolves addresses registered in the IP finder, removes duplicates and addresses of the
+     * local node, and returns the remaining addresses in random order.
+     * <p>
+     * Addresses whose host cannot be resolved are kept as they are (unresolved). If the IP finder
+     * fails, the lookup is retried every 2000 ms until it succeeds.
+     *
+     * @return Resolved addresses without duplicates and local address (potentially
+     *      empty but never null).
+     * @throws org.apache.ignite.spi.IgniteSpiException If the thread is interrupted while waiting to retry.
+     */
+    protected Collection<InetSocketAddress> resolvedAddresses() throws IgniteSpiException {
+        Collection<InetSocketAddress> addrs;
+
+        // Get consistent addresses collection.
+        while (true) {
+            try {
+                addrs = registeredAddresses();
+
+                break;
+            }
+            catch (IgniteSpiException e) {
+                LT.error(log, e, "Failed to get registered addresses from IP finder on start " +
+                    "(retrying every 2000 ms).");
+            }
+
+            try {
+                U.sleep(2000);
+            }
+            catch (GridInterruptedException e) {
+                throw new IgniteSpiException("Thread has been interrupted.", e);
+            }
+        }
+
+        // Set drops duplicate addresses (as promised by the contract) while keeping encounter order.
+        Collection<InetSocketAddress> uniq = new LinkedHashSet<>();
+
+        for (InetSocketAddress addr : addrs) {
+            assert addr != null;
+
+            try {
+                InetSocketAddress resolved = addr.isUnresolved() ?
+                    new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()) : addr;
+
+                if (locNodeAddrs == null || !locNodeAddrs.contains(resolved))
+                    uniq.add(resolved);
+            }
+            catch (UnknownHostException ignored) {
+                LT.warn(log, null, "Failed to resolve address from IP finder (host is unknown): " + addr);
+
+                // Add address in any case.
+                uniq.add(addr);
+            }
+        }
+
+        List<InetSocketAddress> res = new ArrayList<>(uniq);
+
+        Collections.shuffle(res);
+
+        return res;
+    }
+
+    /**
+     * Gets addresses registered in the IP finder, initializes addresses having no
+     * port (or 0 port) with {@link #DFLT_PORT}.
+     *
+     * @return Registered addresses.
+     * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs.
+     */
+    protected Collection<InetSocketAddress> registeredAddresses() throws IgniteSpiException {
+        Collection<InetSocketAddress> res = new ArrayList<>();
+
+        for (InetSocketAddress addr : ipFinder.getRegisteredAddresses()) {
+            if (addr.getPort() == 0)
+                addr = addr.isUnresolved() ? new InetSocketAddress(addr.getHostName(), DFLT_PORT) :
+                    new InetSocketAddress(addr.getAddress(), DFLT_PORT);
+
+            res.add(addr);
+        }
+
+        return res;
+    }
+
+    /**
+     * @param msg Message.
+     * @return Error.
+     */
+    protected IgniteSpiException duplicateIdError(TcpDiscoveryDuplicateIdMessage msg) {
+        assert msg != null;
+
+        return new IgniteSpiException("Local node has the same ID as existing node in topology " +
+            "(fix configuration and restart local node) [localNode=" + locNode +
+            ", existingNode=" + msg.node() + ']');
+    }
+
+    /**
+     * @param msg Message.
+     * @return Error.
+     */
+    protected IgniteSpiException authenticationFailedError(TcpDiscoveryAuthFailedMessage msg) {
+        assert msg != null;
+
+        return new IgniteSpiException(new GridAuthenticationException("Authentication failed [nodeId=" +
+            msg.creatorNodeId() + ", addr=" + msg.address().getHostAddress() + ']'));
+    }
+
+    /**
+     * @param msg Message.
+     * @return Error.
+     */
+    protected IgniteSpiException checkFailedError(TcpDiscoveryCheckFailedMessage msg) {
+        assert msg != null;
+
+        return versionCheckFailed(msg) ? new IgniteSpiVersionCheckException(msg.error()) :
+            new IgniteSpiException(msg.error());
+    }
+
+    /**
+     * @param msg Message.
+     * @return Whether delivery of the message is ensured.
+     */
+    protected boolean ensured(TcpDiscoveryAbstractMessage msg) {
+        return U.getAnnotation(msg.getClass(), TcpDiscoveryEnsureDelivery.class) != null;
+    }
+
+    /**
+     * @param msg Failed message.
+     * @return {@code True} if specified failed message relates to version incompatibility, {@code false} otherwise.
+     * @deprecated Parsing of error message was used for preserving backward compatibility. We should remove it
+     *      and create separate message for failed version check with next major release.
+     */
+    @Deprecated
+    private static boolean versionCheckFailed(TcpDiscoveryCheckFailedMessage msg) {
+        return msg.error().contains("versions are not compatible");
+    }
+
+    /**
+     * Handles sockets timeouts.
+     * <p>
+     * Keeps registered {@link SocketTimeoutObject}s ordered by end time (ties broken by ID) and
+     * closes the socket of every expired object that has not been cancelled.
+     */
+    protected class SocketTimeoutWorker extends IgniteSpiThread {
+        /** Time-based sorted set for timeout objects. */
+        private final GridConcurrentSkipListSet<SocketTimeoutObject> timeoutObjs =
+            new GridConcurrentSkipListSet<>(new Comparator<SocketTimeoutObject>() {
+                @Override public int compare(SocketTimeoutObject o1, SocketTimeoutObject o2) {
+                    int res = Long.compare(o1.endTime(), o2.endTime());
+
+                    // IDs are unique, so distinct objects with equal end times are never considered equal.
+                    return res != 0 ? res : Long.compare(o1.id(), o2.id());
+                }
+            });
+
+        /** Mutex. */
+        private final Object mux0 = new Object();
+
+        /**
+         * Creates worker thread {@code tcp-disco-sock-timeout-worker} with SPI thread priority.
+         */
+        SocketTimeoutWorker() {
+            super(gridName, "tcp-disco-sock-timeout-worker", log);
+
+            setPriority(threadPri);
+        }
+
+        /**
+         * @param timeoutObj Timeout object to add.
+         */
+        @SuppressWarnings({"NakedNotify"})
+        public void addTimeoutObject(SocketTimeoutObject timeoutObj) {
+            assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE;
+
+            timeoutObjs.add(timeoutObj);
+
+            // Wake the worker only if the new object expires before everything it is waiting for.
+            if (timeoutObjs.firstx() == timeoutObj) {
+                synchronized (mux0) {
+                    mux0.notifyAll();
+                }
+            }
+        }
+
+        /**
+         * @param timeoutObj Timeout object to remove.
+         */
+        public void removeTimeoutObject(SocketTimeoutObject timeoutObj) {
+            assert timeoutObj != null;
+
+            timeoutObjs.remove(timeoutObj);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException {
+            if (log.isDebugEnabled())
+                log.debug("Socket timeout worker has been started.");
+
+            while (!isInterrupted()) {
+                long now = U.currentTimeMillis();
+
+                for (Iterator<SocketTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) {
+                    SocketTimeoutObject timeoutObj = iter.next();
+
+                    if (timeoutObj.endTime() <= now) {
+                        iter.remove();
+
+                        if (timeoutObj.onTimeout()) {
+                            LT.warn(log, null, "Socket write has timed out (consider increasing " +
+                                "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']');
+
+                            stats.onSocketTimeout();
+                        }
+                    }
+                    else
+                        break;
+                }
+
+                synchronized (mux0) {
+                    while (true) {
+                        // Access of the first element must be inside of
+                        // synchronization block, so we don't miss out
+                        // on thread notification events sent from
+                        // 'addTimeoutObject(..)' method.
+                        SocketTimeoutObject first = timeoutObjs.firstx();
+
+                        if (first != null) {
+                            long waitTime = first.endTime() - U.currentTimeMillis();
+
+                            if (waitTime > 0)
+                                mux0.wait(waitTime);
+                            else
+                                break;
+                        }
+                        else
+                            mux0.wait(5000);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Socket timeout object.
+     * <p>
+     * Exactly one of {@link #cancel()} and {@link #onTimeout()} wins; the socket is closed only
+     * if {@link #onTimeout()} wins.
+     */
+    protected static class SocketTimeoutObject {
+        /** */
+        private static final AtomicLong idGen = new AtomicLong();
+
+        /** */
+        private final long id = idGen.incrementAndGet();
+
+        /** */
+        private final Socket sock;
+
+        /** */
+        private final long endTime;
+
+        /** */
+        private final AtomicBoolean done = new AtomicBoolean();
+
+        /**
+         * @param sock Socket.
+         * @param endTime End time.
+         */
+        SocketTimeoutObject(Socket sock, long endTime) {
+            assert sock != null;
+            assert endTime > 0;
+
+            this.sock = sock;
+            this.endTime = endTime;
+        }
+
+        /**
+         * @return {@code True} if object has not yet been processed.
+         */
+        boolean cancel() {
+            return done.compareAndSet(false, true);
+        }
+
+        /**
+         * @return {@code True} if object has not yet been canceled.
+         */
+        boolean onTimeout() {
+            if (done.compareAndSet(false, true)) {
+                // Close socket - timeout occurred.
+                U.closeQuiet(sock);
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /**
+         * @return End time.
+         */
+        long endTime() {
+            return endTime;
+        }
+
+        /**
+         * @return ID.
+         */
+        long id() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SocketTimeoutObject.class, this);
+        }
+    }
+
+    /**
+     * Base class for message workers.
+     */
+    protected abstract class MessageWorkerAdapter extends IgniteSpiThread {
+        /** Pre-allocated output stream (100K). */
+        private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024);
+
+        /** Message queue. */
+        private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>();
+
+        /** Backed interrupted flag. */
+        private volatile boolean interrupted;
+
+        /**
+         * @param name Thread name.
+         */
+        protected MessageWorkerAdapter(String name) {
+            super(gridName, name, log);
+
+            setPriority(threadPri);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException {
+            if (log.isDebugEnabled())
+                log.debug("Message worker started [locNodeId=" + locNodeId + ']');
+
+            while (!isInterrupted()) {
+                TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS);
+
+                if (msg == null)
+                    continue;
+
+                processMessage(msg);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void interrupt() {
+            interrupted = true;
+
+            super.interrupt();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isInterrupted() {
+            return interrupted || super.isInterrupted();
+        }
+
+        /**
+         * @return Current queue size.
+         */
+        int queueSize() {
+            return queue.size();
+        }
+
+        /**
+         * Adds message to queue. Heartbeat messages are put at the head of the queue,
+         * all other messages at the tail.
+         *
+         * @param msg Message to add.
+         */
+        void addMessage(TcpDiscoveryAbstractMessage msg) {
+            assert msg != null;
+
+            if (msg instanceof TcpDiscoveryHeartbeatMessage)
+                queue.addFirst(msg);
+            else
+                queue.add(msg);
+
+            if (log.isDebugEnabled())
+                log.debug("Message has been added to queue: " + msg);
+        }
+
+        /**
+         * Processes a message taken from the queue; called from {@link #body()} on the worker thread.
+         *
+         * @param msg Message to process.
+         */
+        protected abstract void processMessage(TcpDiscoveryAbstractMessage msg);
+
+        /**
+         * Writes message to the socket reusing this worker's pre-allocated output stream.
+         *
+         * @param sock Socket.
+         * @param msg Message.
+         * @throws IOException If IO failed.
+         * @throws GridException If marshalling failed.
+         */
+        protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg)
+            throws IOException, GridException {
+            bout.reset();
+
+            TcpDiscoverySpiAdapter.this.writeToSocket(sock, msg, bout);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
new file mode 100644
index 0000000..5043d78
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
@@ -0,0 +1,267 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.mbean.*;
+import org.apache.ignite.spi.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Management bean for {@link TcpDiscoverySpi}.
+ * <p>
+ * Exposes SPI configuration values and runtime discovery statistics over JMX.
+ */
+public interface TcpDiscoverySpiMBean extends IgniteSpiManagementMBean {
+    /**
+     * Gets delay between heartbeat messages sent by coordinator.
+     *
+     * @return Time period in milliseconds.
+     */
+    @IgniteMBeanDescription("Heartbeat frequency.")
+    public long getHeartbeatFrequency();
+
+    /**
+     * Gets current SPI state.
+     *
+     * @return Current SPI state.
+     */
+    @IgniteMBeanDescription("SPI state.")
+    public String getSpiState();
+
+    /**
+     * Gets {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} (string representation).
+     *
+     * @return IP finder (string representation).
+     */
+    @IgniteMBeanDescription("IP Finder.")
+    public String getIpFinderFormatted();
+
+    /**
+     * Gets number of connection attempts.
+     *
+     * @return Number of connection attempts.
+     */
+    @IgniteMBeanDescription("Reconnect count.")
+    public int getReconnectCount();
+
+    /**
+     * Gets network timeout.
+     *
+     * @return Network timeout.
+     */
+    @IgniteMBeanDescription("Network timeout.")
+    public long getNetworkTimeout();
+
+    /**
+     * Gets local TCP port SPI listens to.
+     *
+     * @return Local TCP port.
+     */
+    @IgniteMBeanDescription("Local TCP port.")
+    public int getLocalPort();
+
+    /**
+     * Gets local TCP port range.
+     *
+     * @return Local port range.
+     */
+    @IgniteMBeanDescription("Local TCP port range.")
+    public int getLocalPortRange();
+
+    /**
+     * Gets the maximum number of heartbeats a node can miss before a status check is initiated.
+     *
+     * @return Max missed heartbeats.
+     */
+    @IgniteMBeanDescription("Max missed heartbeats.")
+    public int getMaxMissedHeartbeats();
+
+    /**
+     * Gets the maximum number of heartbeats a client node can miss before it is considered failed.
+     *
+     * @return Max missed client heartbeats.
+     */
+    @IgniteMBeanDescription("Max missed client heartbeats.")
+    public int getMaxMissedClientHeartbeats();
+
+    /**
+     * Gets thread priority. All threads within SPI will be started with it.
+     *
+     * @return Thread priority.
+     */
+    @IgniteMBeanDescription("Threads priority.")
+    public int getThreadPriority();
+
+    /**
+     * Gets IP finder clean frequency.
+     *
+     * @return IP finder clean frequency.
+     */
+    @IgniteMBeanDescription("IP finder clean frequency.")
+    public long getIpFinderCleanFrequency();
+
+    /**
+     * Gets statistics print frequency.
+     *
+     * @return Statistics print frequency in milliseconds.
+     */
+    @IgniteMBeanDescription("Statistics print frequency.")
+    public long getStatisticsPrintFrequency();
+
+    /**
+     * Gets message worker queue current size.
+     *
+     * @return Message worker queue current size.
+     */
+    @IgniteMBeanDescription("Message worker queue current size.")
+    public int getMessageWorkerQueueSize();
+
+    /**
+     * Gets joined nodes count.
+     *
+     * @return Nodes joined count.
+     */
+    @IgniteMBeanDescription("Nodes joined count.")
+    public long getNodesJoined();
+
+    /**
+     * Gets left nodes count.
+     *
+     * @return Left nodes count.
+     */
+    @IgniteMBeanDescription("Nodes left count.")
+    public long getNodesLeft();
+
+    /**
+     * Gets failed nodes count.
+     *
+     * @return Failed nodes count.
+     */
+    @IgniteMBeanDescription("Nodes failed count.")
+    public long getNodesFailed();
+
+    /**
+     * Gets pending messages registered count.
+     *
+     * @return Pending messages registered count.
+     */
+    @IgniteMBeanDescription("Pending messages registered.")
+    public long getPendingMessagesRegistered();
+
+    /**
+     * Gets pending messages discarded count.
+     *
+     * @return Pending messages discarded count.
+     */
+    @IgniteMBeanDescription("Pending messages discarded.")
+    public long getPendingMessagesDiscarded();
+
+    /**
+     * Gets average message processing time.
+     *
+     * @return Average message processing time.
+     */
+    @IgniteMBeanDescription("Avg message processing time.")
+    public long getAvgMessageProcessingTime();
+
+    /**
+     * Gets max message processing time.
+     *
+     * @return Max message processing time.
+     */
+    @IgniteMBeanDescription("Max message processing time.")
+    public long getMaxMessageProcessingTime();
+
+    /**
+     * Gets total received messages count.
+     *
+     * @return Total received messages count.
+     */
+    @IgniteMBeanDescription("Total received messages count.")
+    public int getTotalReceivedMessages();
+
+    /**
+     * Gets received messages counts (grouped by type).
+     *
+     * @return Map containing message types and respective counts.
+     */
+    @IgniteMBeanDescription("Received messages by type.")
+    public Map<String, Integer> getReceivedMessages();
+
+    /**
+     * Gets total processed messages count.
+     *
+     * @return Total processed messages count.
+     */
+    @IgniteMBeanDescription("Total processed messages count.")
+    public int getTotalProcessedMessages();
+
+    /**
+     * Gets processed messages counts (grouped by type).
+     *
+     * @return Map containing message types and respective counts.
+     */
+    @IgniteMBeanDescription("Processed messages by type.")
+    public Map<String, Integer> getProcessedMessages();
+
+    /**
+     * Gets the time since which the local node has been coordinator.
+     *
+     * @return Time the local node has been coordinator since.
+     */
+    @IgniteMBeanDescription("Local node is coordinator since.")
+    public long getCoordinatorSinceTimestamp();
+
+    /**
+     * Gets current coordinator.
+     *
+     * @return Current coordinator node ID, or {@code null} if there is none.
+     */
+    @IgniteMBeanDescription("Coordinator node ID.")
+    @Nullable public UUID getCoordinator();
+
+    /**
+     * Gets message acknowledgement timeout.
+     *
+     * @return Message acknowledgement timeout.
+     */
+    @IgniteMBeanDescription("Message acknowledgement timeout.")
+    public long getAckTimeout();
+
+    /**
+     * Gets maximum message acknowledgement timeout.
+     *
+     * @return Maximum message acknowledgement timeout.
+     */
+    @IgniteMBeanDescription("Maximum message acknowledgement timeout.")
+    public long getMaxAckTimeout();
+
+    /**
+     * Gets socket timeout.
+     *
+     * @return Socket timeout.
+     */
+    @IgniteMBeanDescription("Socket timeout.")
+    public long getSocketTimeout();
+
+    /**
+     * Gets join timeout.
+     *
+     * @return Join timeout.
+     */
+    @IgniteMBeanDescription("Join timeout.")
+    public long getJoinTimeout();
+
+    /**
+     * Dumps debug info using configured logger.
+     */
+    @IgniteMBeanDescription("Dump debug info.")
+    public void dumpDebugInfo();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
new file mode 100644
index 0000000..5658545
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -0,0 +1,443 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery.tcp.internal;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.product.*;
+import org.gridgain.grid.kernal.*;
+import org.apache.ignite.spi.discovery.*;
+import org.gridgain.grid.util.lang.*;
+import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+import static org.gridgain.grid.kernal.GridNodeAttributes.*;
+
+/**
+ * Node for {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}.
+ * <p>
+ * <strong>This class is not intended for public use</strong> and has been made
+ * <tt>public</tt> due to certain limitations of Java technology.
+ */
+public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements ClusterNode,
+    Comparable<TcpDiscoveryNode>, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Node ID. */
+    private UUID id;
+
+    /** Consistent ID. */
+    private Object consistentId;
+
+    /** Node attributes. */
+    @GridToStringExclude
+    private Map<String, Object> attrs;
+
+    /** Internal discovery addresses as strings. */
+    @GridToStringInclude
+    private Collection<String> addrs;
+
+    /** Internal discovery host names as strings. */
+    private Collection<String> hostNames;
+
+    /** */
+    @GridToStringInclude
+    private Collection<InetSocketAddress> sockAddrs;
+
+    /** */
+    @GridToStringInclude
+    private int discPort;
+
+    /** Node metrics. */
+    @GridToStringExclude
+    private volatile ClusterNodeMetrics metrics;
+
+    /** Node order in the topology. */
+    private volatile long order;
+
+    /** Node order in the topology (internal). */
+    private volatile long intOrder;
+
+    /** The most recent time when heartbeat message was received from the node. */
+    @GridToStringExclude
+    private volatile long lastUpdateTime = U.currentTimeMillis();
+
+    /** Metrics provider (transient). */
+    @GridToStringExclude
+    private DiscoveryMetricsProvider metricsProvider;
+
+    /** Visible flag (transient). */
+    @GridToStringExclude
+    private boolean visible;
+
+    /** Grid local node flag (transient). */
+    private boolean loc;
+
+    /** Version. */
+    private IgniteProductVersion ver;
+
+    /** Alive check (used by clients). */
+    @GridToStringExclude
+    private transient int aliveCheck;
+
+    /** Client router node ID. */
+    @GridToStringExclude
+    private UUID clientRouterNodeId;
+
+    /**
+     * Public default no-arg constructor for {@link Externalizable} interface.
+     */
+    public TcpDiscoveryNode() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param id Node Id.
+     * @param addrs Addresses.
+     * @param hostNames Host names.
+     * @param discPort Port.
+     * @param metricsProvider Metrics provider.
+     * @param ver Version.
+     */
+    public TcpDiscoveryNode(UUID id, Collection<String> addrs, Collection<String> hostNames, int discPort,
+                            DiscoveryMetricsProvider metricsProvider, IgniteProductVersion ver) {
+        assert id != null;
+        assert !F.isEmpty(addrs);
+        assert metricsProvider != null;
+        assert ver != null;
+
+        this.id = id;
+        this.addrs = addrs;
+        this.hostNames = hostNames;
+        this.discPort = discPort;
+        this.metricsProvider = metricsProvider;
+        this.ver = ver;
+
+        consistentId = U.consistentId(addrs, discPort);
+
+        metrics = metricsProvider.getMetrics();
+        sockAddrs = U.toSocketAddresses(this, discPort);
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object consistentId() {
+        return consistentId;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <T> T attribute(String name) {
+        // Even though discovery SPI removes this attribute after authentication, keep this check for safety.
+        if (GridNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(name))
+            return null;
+
+        return (T)attrs.get(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<String, Object> attributes() {
+        // Even though discovery SPI removes this attribute after authentication, keep this check for safety.
+        return F.view(attrs, new IgnitePredicate<String>() {
+            @Override public boolean apply(String s) {
+                return !GridNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(s);
+            }
+        });
+    }
+
+    /**
+     * Sets node attributes.
+     *
+     * @param attrs Node attributes.
+     */
+    public void setAttributes(Map<String, Object> attrs) {
+        this.attrs = U.sealMap(attrs);
+    }
+
+    /**
+     * Gets node attributes without filtering.
+     *
+     * @return Node attributes without filtering.
+     */
+    public Map<String, Object> getAttributes() {
+        return attrs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterNodeMetrics metrics() {
+        if (metricsProvider != null)
+            metrics = metricsProvider.getMetrics();
+
+        return metrics;
+    }
+
+    /**
+     * Sets node metrics.
+     *
+     * @param metrics Node metrics.
+     */
+    public void setMetrics(ClusterNodeMetrics metrics) {
+        assert metrics != null;
+
+        this.metrics = metrics;
+    }
+
+    /**
+     * @return Internal order.
+     */
+    public long internalOrder() {
+        return intOrder;
+    }
+
+    /**
+     * @param intOrder Internal order of the node.
+     */
+    public void internalOrder(long intOrder) {
+        assert intOrder > 0;
+
+        this.intOrder = intOrder;
+    }
+
+    /**
+     * @return Order.
+     */
+    @Override public long order() {
+        return order;
+    }
+
+    /**
+     * @param order Order of the node.
+     */
+    public void order(long order) {
+        assert order >= 0 : "Order is invalid: " + this;
+
+        this.order = order;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteProductVersion version() {
+        return ver;
+    }
+
+    /**
+     * @param ver Version.
+     */
+    public void version(IgniteProductVersion ver) {
+        assert ver != null;
+
+        this.ver = ver;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<String> addresses() {
+        return addrs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isLocal() {
+        return loc;
+    }
+
+    /**
+     * @param loc Grid local node flag.
+     */
+    public void local(boolean loc) {
+        this.loc = loc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isDaemon() {
+        return "true".equalsIgnoreCase((String)attribute(ATTR_DAEMON));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<String> hostNames() {
+        return hostNames;
+    }
+
+    /**
+     * @return Discovery port.
+     */
+    public int discoveryPort() {
+        return discPort;
+    }
+
+    /**
+     * @return Addresses that could be used by discovery.
+     */
+    public Collection<InetSocketAddress> socketAddresses() {
+        return sockAddrs;
+    }
+
+    /**
+     * Gets node last update time.
+     *
+     * @return Time of the last heartbeat.
+     */
+    public long lastUpdateTime() {
+        return lastUpdateTime;
+    }
+
+    /**
+     * Sets node last update.
+     *
+     * @param lastUpdateTime Time of last metrics update.
+     */
+    public void lastUpdateTime(long lastUpdateTime) {
+        assert lastUpdateTime > 0;
+
+        this.lastUpdateTime = lastUpdateTime;
+    }
+
+    /**
+     * Gets visible flag.
+     *
+     * @return {@code true} if node is in visible state.
+     */
+    public boolean visible() {
+        return visible;
+    }
+
+    /**
+     * Sets visible flag.
+     *
+     * @param visible {@code true} if node is in visible state.
+     */
+    public void visible(boolean visible) {
+        this.visible = visible;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isClient() {
+        return clientRouterNodeId != null;
+    }
+
+    /**
+     * Decrements alive check value and returns new one.
+     *
+     * @return Alive check value.
+     */
+    public int decrementAliveCheck() {
+        assert isClient();
+
+        return --aliveCheck;
+    }
+
+    /**
+     * @param aliveCheck Alive check value.
+     */
+    public void aliveCheck(int aliveCheck) {
+        assert isClient();
+
+        this.aliveCheck = aliveCheck;
+    }
+
+    /**
+     * @return Client router node ID.
+     */
+    public UUID clientRouterNodeId() {
+        return clientRouterNodeId;
+    }
+
+    /**
+     * @param clientRouterNodeId Client router node ID.
+     */
+    public void clientRouterNodeId(UUID clientRouterNodeId) {
+        this.clientRouterNodeId = clientRouterNodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compareTo(@Nullable TcpDiscoveryNode node) {
+        if (node == null)
+            return 1;
+
+        if (internalOrder() == node.internalOrder())
+            assert id().equals(node.id()) : "Duplicate order [this=" + this + ", other=" + node + ']';
+
+        return internalOrder() < node.internalOrder() ? -1 : internalOrder() > node.internalOrder() ? 1 :
+            id().compareTo(node.id());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeUuid(out, id);
+        U.writeMap(out, attrs);
+        U.writeCollection(out, addrs);
+        U.writeCollection(out, hostNames);
+        out.writeInt(discPort);
+
+        byte[] mtr = null;
+
+        if (metrics != null) {
+            mtr = new byte[DiscoveryMetricsHelper.METRICS_SIZE];
+
+            DiscoveryMetricsHelper.serialize(mtr, 0, metrics);
+        }
+
+        U.writeByteArray(out, mtr);
+
+        out.writeLong(order);
+        out.writeLong(intOrder);
+        out.writeObject(ver);
+        U.writeUuid(out, clientRouterNodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        id = U.readUuid(in);
+
+        attrs = U.sealMap(U.<String, Object>readMap(in));
+        addrs = U.readCollection(in);
+        hostNames = U.readCollection(in);
+        discPort = in.readInt();
+
+        sockAddrs = U.toSocketAddresses(this, discPort);
+
+        consistentId = U.consistentId(addrs, discPort);
+
+        byte[] mtr = U.readByteArray(in);
+
+        if (mtr != null)
+            metrics = DiscoveryMetricsHelper.deserialize(mtr, 0);
+
+        order = in.readLong();
+        intOrder = in.readLong();
+        ver = (IgniteProductVersion)in.readObject();
+        clientRouterNodeId = U.readUuid(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return id.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        return F.eqNodes(this, o);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TcpDiscoveryNode.class, this, "isClient", isClient());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
new file mode 100644
index 0000000..9c57f4e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -0,0 +1,636 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery.tcp.internal;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Convenient way to represent topology for {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}
+ */
+public class TcpDiscoveryNodesRing {
+    /** Filter accepting only nodes whose {@link TcpDiscoveryNode#visible()} flag is set. */
+    private static final IgnitePredicate<TcpDiscoveryNode> VISIBLE_NODES = new P1<TcpDiscoveryNode>() {
+        @Override public boolean apply(TcpDiscoveryNode node) {
+            return node.visible();
+        }
+    };
+
+    /** Filter accepting only client nodes ({@link ClusterNode#isClient()}). */
+    private static final PN CLIENT_NODES = new PN() {
+        @Override public boolean apply(ClusterNode node) {
+            return node.isClient();
+        }
+    };
+
+    /** Local node; set by {@link #localNode(TcpDiscoveryNode)} and re-inserted into the ring by {@link #clear()}. */
+    private TcpDiscoveryNode locNode;
+
+    /**
+     * All nodes in topology, sorted by {@link TcpDiscoveryNode#compareTo(TcpDiscoveryNode)}.
+     * Mutating methods replace this set with a modified copy instead of changing it in place.
+     */
+    @GridToStringInclude
+    private NavigableSet<TcpDiscoveryNode> nodes = new TreeSet<>();
+
+    /** All nodes in topology keyed by node ID (kept in sync with {@link #nodes}). */
+    @GridToStringExclude
+    private Map<UUID, TcpDiscoveryNode> nodesMap = new HashMap<>();
+
+    /** Current topology version. */
+    private long topVer;
+
+    /** Internal order of the last added node; set to the topology version on restore and reset to 0 on clear. */
+    private long nodeOrder;
+
+    /** Lock guarding the ring state above. */
+    @GridToStringExclude
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+    /**
+     * Sets the local node and resets the ring so that it contains only that node.
+     *
+     * @param locNode Local node, must not be {@code null}.
+     */
+    public void localNode(TcpDiscoveryNode locNode) {
+        assert locNode != null;
+
+        Lock wLock = rwLock.writeLock();
+
+        wLock.lock();
+
+        try {
+            this.locNode = locNode;
+
+            // Drops every other node and re-inserts the new local node.
+            clear();
+        }
+        finally {
+            wLock.unlock();
+        }
+    }
+
+    /**
+     * Returns all nodes in the topology.
+     *
+     * @return Collection of all nodes.
+     */
+    public Collection<TcpDiscoveryNode> allNodes() {
+        Collection<TcpDiscoveryNode> all = nodes();
+
+        return all;
+    }
+
+    /**
+     * Returns the nodes in the topology whose visible flag is set.
+     *
+     * @return Collection of visible nodes.
+     */
+    public Collection<TcpDiscoveryNode> visibleNodes() {
+        Collection<TcpDiscoveryNode> visible = nodes(VISIBLE_NODES);
+
+        return visible;
+    }
+
+    /**
+     * Gets remote nodes.
+     *
+     * @return Collection of remote nodes in grid.
+     */
+    public Collection<TcpDiscoveryNode> remoteNodes() {
+        return nodes(F.remoteNodes(locNode.id()));
+    }
+
+    /**
+     * Gets visible remote nodes in the topology.
+     *
+     * @return Collection of visible remote nodes.
+     */
+    public Collection<TcpDiscoveryNode> visibleRemoteNodes() {
+        return nodes(VISIBLE_NODES, F.remoteNodes(locNode.id()));
+    }
+
+    /**
+     * Returns the client nodes in the topology.
+     *
+     * @return Client nodes.
+     */
+    public Collection<TcpDiscoveryNode> clientNodes() {
+        Collection<TcpDiscoveryNode> clients = nodes(CLIENT_NODES);
+
+        return clients;
+    }
+
+    /**
+     * Checks whether the topology contains more than one node.
+     *
+     * @return {@code true} if the topology has remote nodes in.
+     */
+    public boolean hasRemoteNodes() {
+        Lock rLock = rwLock.readLock();
+
+        rLock.lock();
+
+        try {
+            // The ring normally holds the local node, so any entry beyond the first is remote.
+            int cnt = nodes.size();
+
+            return cnt > 1;
+        }
+        finally {
+            rLock.unlock();
+        }
+    }
+
+    /**
+     * Adds a node to the topology and stamps its last update time with the current time.
+     *
+     * @param node Node to add; its internal order must be positive and greater than that of
+     *      every node already in the ring.
+     * @return {@code true} if the node was added, {@code false} if a node with the same ID was already present.
+     */
+    public boolean add(TcpDiscoveryNode node) {
+        assert node != null;
+        assert node.internalOrder() > 0;
+
+        Lock wLock = rwLock.writeLock();
+
+        wLock.lock();
+
+        try {
+            boolean dup = nodesMap.containsKey(node.id());
+
+            if (dup)
+                return false;
+
+            assert node.internalOrder() > maxInternalOrder() : "Adding node to the middle of the ring " +
+                "[ring=" + this + ", node=" + node + ']';
+
+            nodesMap.put(node.id(), node);
+
+            // Replace the set with a copy rather than mutating it in place
+            // (presumably so that sets handed out earlier stay unchanged).
+            NavigableSet<TcpDiscoveryNode> copy = new TreeSet<>(nodes);
+
+            nodes = copy;
+
+            node.lastUpdateTime(U.currentTimeMillis());
+
+            nodes.add(node);
+
+            nodeOrder = node.internalOrder();
+        }
+        finally {
+            wLock.unlock();
+        }
+
+        return true;
+    }
+
+    /**
+     * Gets the highest internal order among the nodes in the ring.
+     *
+     * @return Max internal order, or {@code -1} if the ring is empty.
+     */
+    public long maxInternalOrder() {
+        rwLock.readLock().lock();
+
+        try {
+            // TreeSet.last() throws NoSuchElementException on an empty set instead of returning null,
+            // so emptiness (e.g. before a local node has been set) must be checked explicitly.
+            if (nodes.isEmpty())
+                return -1;
+
+            return nodes.last().internalOrder();
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Rebuilds the topology from the node list received from the coordinator.
+     * <p>
+     * Called when a new node receives the topology from the coordinator; every received node is
+     * remote for the local one. The local node's internal order is set to {@code topVer}, the ring
+     * is cleared back to the local node, and each received node whose ID is not yet present is added
+     * with its last update time set to the current time.
+     *
+     * @param nodes List of remote nodes.
+     * @param topVer Topology version.
+     */
+    public void restoreTopology(Iterable<TcpDiscoveryNode> nodes, long topVer) {
+        assert !F.isEmpty(nodes);
+        assert topVer > 0;
+
+        Lock wLock = rwLock.writeLock();
+
+        wLock.lock();
+
+        try {
+            locNode.internalOrder(topVer);
+
+            clear();
+
+            boolean copied = false;
+
+            for (TcpDiscoveryNode n : nodes) {
+                if (!nodesMap.containsKey(n.id())) {
+                    nodesMap.put(n.id(), n);
+
+                    // Copy the set once, on the first actual insertion.
+                    if (!copied) {
+                        this.nodes = new TreeSet<>(this.nodes);
+
+                        copied = true;
+                    }
+
+                    n.lastUpdateTime(U.currentTimeMillis());
+
+                    this.nodes.add(n);
+                }
+            }
+
+            nodeOrder = topVer;
+        }
+        finally {
+            wLock.unlock();
+        }
+    }
+
+    /**
+     * Looks up a node in the topology by its ID.
+     *
+     * @param nodeId Node id to find.
+     * @return Node with ID provided or {@code null} if not found.
+     */
+    @Nullable public TcpDiscoveryNode node(UUID nodeId) {
+        assert nodeId != null;
+
+        Lock rLock = rwLock.readLock();
+
+        rLock.lock();
+
+        try {
+            TcpDiscoveryNode found = nodesMap.get(nodeId);
+
+            return found;
+        }
+        finally {
+            rLock.unlock();
+        }
+    }
+
+    /**
+     * Removes node from the topology.
+     *
+     * @param nodeId ID of the node to remove; must not be the local node.
+     * @return Removed node, or {@code null} if no node with the given ID was in the topology.
+     */
+    @Nullable public TcpDiscoveryNode removeNode(UUID nodeId) {
+        assert nodeId != null;
+        assert !locNode.id().equals(nodeId);
+
+        rwLock.writeLock().lock();
+
+        try {
+            TcpDiscoveryNode rmv = nodesMap.remove(nodeId);
+
+            if (rmv != null) {
+                // Replace the set with a modified copy instead of mutating it in place.
+                nodes = new TreeSet<>(nodes);
+
+                nodes.remove(rmv);
+            }
+
+            return rmv;
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Removes the nodes with the given IDs from the topology; IDs that are not present are ignored.
+     *
+     * @param nodeIds IDs of the nodes to remove.
+     * @return Collection of removed nodes (empty if none were present).
+     */
+    public Collection<TcpDiscoveryNode> removeNodes(Collection<UUID> nodeIds) {
+        assert !F.isEmpty(nodeIds);
+
+        Lock wLock = rwLock.writeLock();
+
+        wLock.lock();
+
+        try {
+            // Created lazily on the first actual removal; doubles as the "set already copied" marker.
+            Collection<TcpDiscoveryNode> removed = null;
+
+            for (UUID nodeId : nodeIds) {
+                TcpDiscoveryNode node = nodesMap.remove(nodeId);
+
+                if (node == null)
+                    continue;
+
+                if (removed == null) {
+                    nodes = new TreeSet<>(nodes);
+
+                    removed = new ArrayList<>(nodeIds.size());
+                }
+
+                nodes.remove(node);
+
+                removed.add(node);
+            }
+
+            return removed != null ? removed : Collections.<TcpDiscoveryNode>emptyList();
+        }
+        finally {
+            wLock.unlock();
+        }
+    }
+
+    /**
+     * Removes all remote nodes, leaving only the local node (if one is set), and resets
+     * the node order and topology version to zero.
+     * <p>
+     * This should be called when SPI should be disconnected from topology and
+     * reconnected back after.
+     */
+    public void clear() {
+        Lock wLock = rwLock.writeLock();
+
+        wLock.lock();
+
+        try {
+            NavigableSet<TcpDiscoveryNode> freshNodes = new TreeSet<>();
+            Map<UUID, TcpDiscoveryNode> freshMap = new HashMap<>();
+
+            if (locNode != null) {
+                freshNodes.add(locNode);
+                freshMap.put(locNode.id(), locNode);
+            }
+
+            nodes = freshNodes;
+            nodesMap = freshMap;
+
+            nodeOrder = 0;
+            topVer = 0;
+        }
+        finally {
+            wLock.unlock();
+        }
+    }
+
+    /**
+     * Finds the coordinator in the topology.
+     *
+     * @return Coordinator node that gives versions to topology (node with the smallest order),
+     *      or {@code null} if the topology is empty.
+     */
+    @Nullable public TcpDiscoveryNode coordinator() {
+        Lock rLock = rwLock.readLock();
+
+        rLock.lock();
+
+        try {
+            return F.isEmpty(nodes) ? null : coordinator(null);
+        }
+        finally {
+            rLock.unlock();
+        }
+    }
+
+    /**
+     * Finds the coordinator among the nodes returned by {@code serverNodes(excluded)}, i.e. the smallest
+     * such node by {@link TcpDiscoveryNode#compareTo(TcpDiscoveryNode)}.
+     * <p>
+     * This may be used when handling current coordinator leave or failure.
+     *
+     * @param excluded Nodes to exclude from the search (optional).
+     * @return Coordinator node among remaining nodes or {@code null} if all nodes are excluded.
+     */
+    @Nullable public TcpDiscoveryNode coordinator(@Nullable Collection<TcpDiscoveryNode> excluded) {
+        Lock rLock = rwLock.readLock();
+
+        rLock.lock();
+
+        try {
+            Collection<TcpDiscoveryNode> candidates = serverNodes(excluded);
+
+            return F.isEmpty(candidates) ? null : Collections.min(candidates);
+        }
+        finally {
+            rLock.unlock();
+        }
+    }
+
+    /**
+     * Finds the node following the local node in the ring.
+     *
+     * @return Next node, or {@code null} if the topology has fewer than two nodes.
+     */
+    @Nullable public TcpDiscoveryNode nextNode() {
+        Lock rLock = rwLock.readLock();
+
+        rLock.lock();
+
+        try {
+            return nodes.size() < 2 ? null : nextNode(null);
+        }
+        finally {
+            rLock.unlock();
+        }
+    }
+
+    /**
+     * Finds the node following the local node in the ring, skipping excluded nodes.
+     * <p>
+     * The ring wraps around: if the local node is the last candidate (or is not among the
+     * candidates at all), the first candidate is returned. This may be used when detecting
+     * and handling nodes failure.
+     *
+     * @param excluded Nodes to exclude from the search (optional). If provided,
+     * cannot contain local node.
+     * @return Next node or {@code null} if all nodes were filtered out or
+     * topology contains less than two nodes.
+     */
+    @Nullable public TcpDiscoveryNode nextNode(@Nullable Collection<TcpDiscoveryNode> excluded) {
+        assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode);
+
+        Lock rLock = rwLock.readLock();
+
+        rLock.lock();
+
+        try {
+            Collection<TcpDiscoveryNode> candidates = serverNodes(excluded);
+
+            if (candidates.size() < 2)
+                return null;
+
+            boolean locSeen = false;
+
+            for (TcpDiscoveryNode node : candidates) {
+                // The first node after the local one is the answer.
+                if (locSeen)
+                    return node;
+
+                if (locNode.equals(node))
+                    locSeen = true;
+            }
+
+            return F.first(candidates);
+        }
+        finally {
+            rLock.unlock();
+        }
+    }
+
+    /**
+     * Returns the server node that precedes the local node in the ring, with nothing excluded.
+     *
+     * @return Previous node, or {@code null} if the topology holds fewer than two nodes.
+     */
+    @Nullable public TcpDiscoveryNode previousNode() {
+        rwLock.readLock().lock();
+
+        try {
+            return nodes.size() < 2 ? null : previousNode(null);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Finds previous node in the topology filtering excluded nodes from search.
+     * <p>
+     * The previous node is the server node immediately preceding the local node in ring order.
+     * If the local node is the first one (or is not among the candidates), the last candidate
+     * is returned, so the ring wraps around.
+     *
+     * @param excluded Nodes to exclude from the search (optional). If provided,
+     * cannot contain local node.
+     * @return Previous node or {@code null} if all nodes were filtered out or
+     * topology contains less than two nodes.
+     */
+    @Nullable public TcpDiscoveryNode previousNode(@Nullable Collection<TcpDiscoveryNode> excluded) {
+        assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode);
+
+        rwLock.readLock().lock();
+
+        try {
+            Collection<TcpDiscoveryNode> filtered = serverNodes(excluded);
+
+            if (filtered.size() < 2)
+                return null;
+
+            TcpDiscoveryNode prev = null;
+
+            // Track the node seen right before the local one. If the local node comes first,
+            // 'prev' is still null there, so the scan continues and ends on the last node (wrap-around).
+            for (TcpDiscoveryNode node : filtered) {
+                if (prev != null && locNode.equals(node))
+                    break;
+
+                prev = node;
+            }
+
+            return prev;
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Reads the current topology version under the ring's read lock.
+     *
+     * @return Current topology version.
+     */
+    public long topologyVersion() {
+        rwLock.readLock().lock();
+
+        try {
+            long ver = topVer;
+
+            return ver;
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Sets new topology version.
+     * <p>
+     * The version only moves forward: a value that is not strictly greater than the current
+     * one is ignored.
+     *
+     * @param topVer New topology version (should be greater than current, otherwise no-op).
+     * @return {@code True} if topology version has been changed.
+     */
+    public boolean topologyVersion(long topVer) {
+        rwLock.writeLock().lock();
+
+        try {
+            // Stale or duplicate version: silently ignore (previously emitted leftover debug output).
+            if (this.topVer >= topVer)
+                return false;
+
+            this.topVer = topVer;
+
+            return true;
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Bumps the topology version by one under the ring's write lock.
+     *
+     * @return Topology version after the increment.
+     */
+    public long incrementTopologyVersion() {
+        rwLock.writeLock().lock();
+
+        try {
+            topVer++;
+
+            return topVer;
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Generates the next node order value.
+     * <p>
+     * On first use (counter still {@code 0}), the counter is seeded from the internal order of
+     * the last node in the ring, so the returned value is greater than that node's order.
+     * NOTE(review): assumes the ring is non-empty at this point; {@code nodes.last()} on an
+     * empty sorted set throws rather than returning {@code null} -- confirm callers guarantee this.
+     *
+     * @return Next node order (incremented).
+     */
+    public long nextNodeOrder() {
+        rwLock.writeLock().lock();
+
+        try {
+            if (nodeOrder == 0) {
+                TcpDiscoveryNode last = nodes.last();
+
+                assert last != null;
+
+                nodeOrder = last.internalOrder();
+            }
+
+            return ++nodeOrder;
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Copies the ring's nodes that pass all given filters into a read-only collection.
+     *
+     * @param p Filters.
+     * @return Unmodifiable snapshot of matching nodes.
+     */
+    private Collection<TcpDiscoveryNode> nodes(IgnitePredicate<? super TcpDiscoveryNode>... p) {
+        List<TcpDiscoveryNode> snapshot;
+
+        rwLock.readLock().lock();
+
+        try {
+            snapshot = U.arrayList(nodes, p);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+
+        // The copy is local, so wrapping it needs no lock.
+        return Collections.unmodifiableCollection(snapshot);
+    }
+
+    /**
+     * Filters the ring down to non-client nodes that are not in {@code excluded}.
+     * <p>
+     * NOTE(review): built with {@code F.view}, presumably a lazy view over {@code nodes}
+     * rather than a copy -- callers appear to hold the ring lock while using it; confirm.
+     *
+     * @param excluded Nodes to leave out (optional).
+     * @return Collection of server nodes.
+     */
+    private Collection<TcpDiscoveryNode> serverNodes(@Nullable final Collection<TcpDiscoveryNode> excluded) {
+        final boolean noExclusions = F.isEmpty(excluded);
+
+        return F.view(nodes, new P1<TcpDiscoveryNode>() {
+            @Override public boolean apply(TcpDiscoveryNode node) {
+                if (node.isClient())
+                    return false;
+
+                return noExclusions || !excluded.contains(node);
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        String str;
+
+        rwLock.readLock().lock();
+
+        try {
+            str = S.toString(TcpDiscoveryNodesRing.class, this);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+
+        return str;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoverySpiState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoverySpiState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoverySpiState.java
new file mode 100644
index 0000000..693ec41
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoverySpiState.java
@@ -0,0 +1,45 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery.tcp.internal;
+
+/**
+ * State of the local node in {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi} (constant meanings below are inferred from names only; TODO confirm against the SPI).
+ */
+public enum TcpDiscoverySpiState {
+    /** Presumably not (or no longer) part of the topology; TODO confirm. */
+    DISCONNECTED,
+
+    /** Presumably in the process of joining the topology; TODO confirm. */
+    CONNECTING,
+
+    /** Presumably joined and participating in the topology; TODO confirm. */
+    CONNECTED,
+
+    /** Presumably in the process of leaving the topology; TODO confirm. */
+    DISCONNECTING,
+
+    /** Presumably the SPI is shutting down; TODO confirm. */
+    STOPPING,
+
+    /** Presumably the node has left the topology; TODO confirm. */
+    LEFT,
+
+    /** Presumably join rejected because the node ID is already in use; TODO confirm. */
+    DUPLICATE_ID,
+
+    /** Presumably join rejected because authentication failed; TODO confirm. */
+    AUTH_FAILED,
+
+    /** Presumably join rejected because a node consistency check failed; TODO confirm. */
+    CHECK_FAILED,
+
+    /** Presumably join rejected due to a loopback-address configuration problem; TODO confirm. */
+    LOOPBACK_PROBLEM
+}