You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/10 16:11:34 UTC

[13/28] incubator-ignite git commit: ignite-545: merge from sprint-6

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
deleted file mode 100644
index 802da02..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
+++ /dev/null
@@ -1,1160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.tcp;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.io.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.*;
-import org.apache.ignite.marshaller.jdk.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.spi.*;
-import org.apache.ignite.spi.discovery.*;
-import org.apache.ignite.spi.discovery.tcp.internal.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.messages.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.*;
-
-/**
- * Base class for TCP discovery SPIs.
- */
-abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements DiscoverySpi {
-    /** Default port to listen (value is <tt>47500</tt>). */
-    public static final int DFLT_PORT = 47500;
-
-    /** Default socket operations timeout in milliseconds (value is <tt>200ms</tt>). */
-    public static final long DFLT_SOCK_TIMEOUT = 200;
-
-    /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>50ms</tt>). */
-    public static final long DFLT_ACK_TIMEOUT = 50;
-
-    /** Default network timeout in milliseconds (value is <tt>5000ms</tt>). */
-    public static final long DFLT_NETWORK_TIMEOUT = 5000;
-
-    /** Default value for thread priority (value is <tt>10</tt>). */
-    public static final int DFLT_THREAD_PRI = 10;
-
-    /** Default heartbeat messages issuing frequency (value is <tt>100ms</tt>). */
-    public static final long DFLT_HEARTBEAT_FREQ = 100;
-
-    /** Default size of topology snapshots history. */
-    public static final int DFLT_TOP_HISTORY_SIZE = 1000;
-
-    /** Response OK. */
-    protected static final int RES_OK = 1;
-
-    /** Response CONTINUE JOIN. */
-    protected static final int RES_CONTINUE_JOIN = 100;
-
-    /** Response WAIT. */
-    protected static final int RES_WAIT = 200;
-
-    /** Local address. */
-    protected String locAddr;
-
-    /** IP finder. */
-    protected TcpDiscoveryIpFinder ipFinder;
-
-    /** Socket operations timeout. */
-    protected long sockTimeout = DFLT_SOCK_TIMEOUT;
-
-    /** Message acknowledgement timeout. */
-    protected long ackTimeout = DFLT_ACK_TIMEOUT;
-
-    /** Network timeout. */
-    protected long netTimeout = DFLT_NETWORK_TIMEOUT;
-
-    /** Thread priority for all threads started by SPI. */
-    protected int threadPri = DFLT_THREAD_PRI;
-
-    /** Heartbeat messages issuing frequency. */
-    protected long hbFreq = DFLT_HEARTBEAT_FREQ;
-
-    /** Size of topology snapshots history. */
-    protected int topHistSize = DFLT_TOP_HISTORY_SIZE;
-
-    /** Grid discovery listener. */
-    protected volatile DiscoverySpiListener lsnr;
-
-    /** Data exchange. */
-    protected DiscoverySpiDataExchange exchange;
-
-    /** Metrics provider. */
-    protected DiscoveryMetricsProvider metricsProvider;
-
-    /** Local node attributes. */
-    protected Map<String, Object> locNodeAttrs;
-
-    /** Local node version. */
-    protected IgniteProductVersion locNodeVer;
-
-    /** Local node. */
-    protected TcpDiscoveryNode locNode;
-
-    /** Local host. */
-    protected InetAddress locHost;
-
-    /** Internal and external addresses of local node. */
-    protected Collection<InetSocketAddress> locNodeAddrs;
-
-    /** Socket timeout worker. */
-    protected SocketTimeoutWorker sockTimeoutWorker;
-
-    /** Discovery state. */
-    protected TcpDiscoverySpiState spiState = DISCONNECTED;
-
-    /** Start time of the very first grid node. */
-    protected volatile long gridStartTime;
-
-    /** Marshaller. */
-    protected final Marshaller marsh = new JdkMarshaller();
-
-    /** Statistics. */
-    protected final TcpDiscoveryStatistics stats = new TcpDiscoveryStatistics();
-
-    /** Logger. */
-    @LoggerResource
-    protected IgniteLogger log;
-
-    /**
-     * Injects the Ignite instance and offers its configured local host to this SPI
-     * as the local address.
-     *
-     * @param ignite Ignite instance; when {@code null} the local address is left untouched.
-     */
-    @IgniteInstanceResource
-    @Override protected void injectResources(Ignite ignite) {
-        super.injectResources(ignite);
-
-        if (ignite == null)
-            return;
-
-        // setLocalAddress(..) keeps any address that has already been stored.
-        setLocalAddress(ignite.configuration().getLocalHost());
-    }
-
-    /**
-     * Sets local host IP address that discovery SPI uses.
-     * <p>
-     * If not provided, by default a first found non-loopback address
-     * will be used. If there is no non-loopback address available,
-     * then {@link InetAddress#getLocalHost()} will be used.
-     * <p>
-     * Only the first non-{@code null} value is retained: once an address is stored,
-     * later calls are ignored.
-     *
-     * @param locAddr IP address.
-     */
-    @IgniteSpiConfiguration(optional = true)
-    public void setLocalAddress(String locAddr) {
-        // Injection should not override value already set by Spring or user.
-        if (this.locAddr != null)
-            return;
-
-        this.locAddr = locAddr;
-    }
-
-    /**
-     * Gets local address held by this SPI, i.e. the value stored by
-     * {@link #setLocalAddress(String)}.
-     *
-     * @return Local address, or {@code null} if no address has been stored yet.
-     */
-    public String getLocalAddress() {
-        return locAddr;
-    }
-
-    /**
-     * Gets IP finder used for sharing and storing IP addresses.
-     *
-     * @return IP finder currently assigned to this SPI (the field has no initial value,
-     *      so this may be {@code null} until one is assigned).
-     */
-    public TcpDiscoveryIpFinder getIpFinder() {
-        return ipFinder;
-    }
-
-    /**
-     * Sets IP finder used for sharing and storing IP addresses.
-     * <p>
-     * If not provided,
-     * {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder}
-     * will be used by default.
-     * <p>
-     * The reference is stored as is; a previously assigned finder is simply replaced.
-     *
-     * @param ipFinder IP finder.
-     */
-    @IgniteSpiConfiguration(optional = true)
-    public void setIpFinder(TcpDiscoveryIpFinder ipFinder) {
-        this.ipFinder = ipFinder;
-    }
-
-    /**
-     * Sets socket operations timeout. This timeout is used to limit connection time and
-     * write-to-socket time.
-     * <p>
-     * Note that when running Ignite on Amazon EC2, socket timeout must be set to a value
-     * significantly greater than the default (e.g. to {@code 30000}).
-     * <p>
-     * If not specified, default is {@link #DFLT_SOCK_TIMEOUT}.
-     * <p>
-     * The value is stored without validation. It is narrowed to {@code int} when a socket
-     * is connected and added to the current time to compute write deadlines.
-     *
-     * @param sockTimeout Socket operations timeout in milliseconds.
-     */
-    @IgniteSpiConfiguration(optional = true)
-    public void setSocketTimeout(long sockTimeout) {
-        this.sockTimeout = sockTimeout;
-    }
-
-    /**
-     * Sets timeout for receiving acknowledgement for sent message.
-     * <p>
-     * If acknowledgement is not received within this timeout, sending is considered as failed
-     * and SPI tries to repeat message sending.
-     * <p>
-     * If not specified, default is {@link #DFLT_ACK_TIMEOUT}.
-     * <p>
-     * The value is stored without validation.
-     *
-     * @param ackTimeout Acknowledgement timeout in milliseconds.
-     */
-    @IgniteSpiConfiguration(optional = true)
-    public void setAckTimeout(long ackTimeout) {
-        this.ackTimeout = ackTimeout;
-    }
-
-    /**
-     * Sets maximum network timeout to use for network operations.
-     * <p>
-     * If not specified, default is {@link #DFLT_NETWORK_TIMEOUT}.
-     * <p>
-     * The value is stored without validation.
-     *
-     * @param netTimeout Network timeout in milliseconds.
-     */
-    @IgniteSpiConfiguration(optional = true)
-    public void setNetworkTimeout(long netTimeout) {
-        this.netTimeout = netTimeout;
-    }
-
-    /**
-     * Sets thread priority. All threads within SPI will be started with it.
-     * <p>
-     * If not provided, default value is {@link #DFLT_THREAD_PRI}.
-     * <p>
-     * The value is stored without validation; worker threads pass it to
-     * {@code setPriority(..)} in their constructors.
-     *
-     * @param threadPri Thread priority.
-     */
-    @IgniteSpiConfiguration(optional = true)
-    public void setThreadPriority(int threadPri) {
-        this.threadPri = threadPri;
-    }
-
-    /**
-     * Sets delay between issuing of heartbeat messages. SPI sends heartbeat messages
-     * at a configurable time interval to other nodes to notify them about its state.
-     * <p>
-     * If not provided, default value is {@link #DFLT_HEARTBEAT_FREQ}.
-     * <p>
-     * The value is stored without validation.
-     *
-     * @param hbFreq Heartbeat frequency in milliseconds.
-     */
-    @IgniteSpiConfiguration(optional = true)
-    public void setHeartbeatFrequency(long hbFreq) {
-        this.hbFreq = hbFreq;
-    }
-
-    /**
-     * Gets size of topology snapshots history.
-     * <p>
-     * The value is kept in an {@code int} field and widened to {@code long} on return.
-     *
-     * @return Size of topology snapshots history.
-     */
-    public long getTopHistorySize() {
-        return topHistSize;
-    }
-
-    /**
-     * Sets size of topology snapshots history. Specified size should be greater than or equal to default size
-     * {@link #DFLT_TOP_HISTORY_SIZE}.
-     * <p>
-     * A smaller value is rejected: a warning is logged and the current size is kept.
-     *
-     * @param topHistSize Size of topology snapshots history.
-     */
-    @IgniteSpiConfiguration(optional = true)
-    public void setTopHistorySize(int topHistSize) {
-        if (topHistSize >= DFLT_TOP_HISTORY_SIZE) {
-            this.topHistSize = topHistSize;
-
-            return;
-        }
-
-        // Too small: keep the current value and report all three sizes.
-        U.warn(log, "Topology history size should be greater than or equal to default size. " +
-            "Specified size will not be set [curSize=" + this.topHistSize + ", specifiedSize=" + topHistSize +
-            ", defaultSize=" + DFLT_TOP_HISTORY_SIZE + ']');
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Stores the attributes and version of the local node. Both fields are asserted to be
-     * unset on entry, and the received values are logged at debug level.
-     */
-    @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) {
-        assert locNodeAttrs == null;
-        assert locNodeVer == null;
-
-        if (log.isDebugEnabled()) {
-            log.debug("Node attributes to set: " + attrs);
-            log.debug("Node version to set: " + ver);
-        }
-
-        locNodeAttrs = attrs;
-        locNodeVer = ver;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * After delegating to the superclass, forwards the SPI context to the IP finder.
-     * <p>
-     * NOTE(review): {@code ipFinder} is dereferenced here without a {@code null} check,
-     * unlike in {@link #onContextDestroyed0()}; presumably a finder is always assigned
-     * before the context is initialized - confirm.
-     */
-    @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
-        super.onContextInitialized0(spiCtx);
-
-        ipFinder.onSpiContextInitialized(spiCtx);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * After delegating to the superclass, notifies the IP finder (if one is assigned)
-     * that the SPI context has been destroyed.
-     */
-    @Override protected void onContextDestroyed0() {
-        super.onContextDestroyed0();
-
-        if (ipFinder != null)
-            ipFinder.onSpiContextDestroyed();
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Returns the {@code locNode} field as is.
-     */
-    @Override public ClusterNode getLocalNode() {
-        return locNode;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * The listener is kept in a {@code volatile} field; {@code null} is accepted.
-     */
-    @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
-        this.lsnr = lsnr;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * The stored handler is the one invoked by {@code onExchange(..)} of this class.
-     */
-    @Override public void setDataExchange(DiscoverySpiDataExchange exchange) {
-        this.exchange = exchange;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * The provider reference is stored as is.
-     */
-    @Override public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider) {
-        this.metricsProvider = metricsProvider;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Asserts that the start time has already been assigned (the field is {@code 0} until then).
-     */
-    @Override public long getGridStartTime() {
-        assert gridStartTime != 0;
-
-        return gridStartTime;
-    }
-
-    /**
-     * Opens a socket to the given remote address and sends the Ignite header over it.
-     * <p>
-     * The address is resolved first if necessary. The socket is bound to the local host
-     * on an ephemeral port, {@code TCP_NODELAY} is enabled and the connection attempt is
-     * limited by the socket timeout. If any step after socket creation fails, the socket
-     * is closed before the exception is propagated, so no descriptor is leaked.
-     *
-     * @param sockAddr Remote address.
-     * @return Opened socket.
-     * @throws IOException If failed.
-     */
-    protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
-        assert sockAddr != null;
-
-        InetSocketAddress resolved = sockAddr.isUnresolved() ?
-            new InetSocketAddress(InetAddress.getByName(sockAddr.getHostName()), sockAddr.getPort()) : sockAddr;
-
-        InetAddress addr = resolved.getAddress();
-
-        assert addr != null;
-
-        Socket sock = new Socket();
-
-        try {
-            sock.bind(new InetSocketAddress(locHost, 0));
-
-            sock.setTcpNoDelay(true);
-
-            sock.connect(resolved, (int)sockTimeout);
-
-            writeToSocket(sock, U.IGNITE_HEADER);
-        }
-        catch (IOException | RuntimeException e) {
-            // Nobody else holds a reference to the socket yet, so release it here.
-            U.closeQuiet(sock);
-
-            throw e;
-        }
-
-        return sock;
-    }
-
-    /**
-     * Sends raw bytes over the socket. The write is guarded by the socket timeout worker,
-     * which closes the socket if the write is not finished by the deadline.
-     *
-     * @param sock Socket.
-     * @param data Raw data to write.
-     * @throws IOException If IO failed or write timed out.
-     */
-    @SuppressWarnings("ThrowFromFinallyBlock")
-    protected void writeToSocket(Socket sock, byte[] data) throws IOException {
-        assert sock != null;
-        assert data != null;
-
-        SocketTimeoutObject timeoutObj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
-
-        sockTimeoutWorker.addTimeoutObject(timeoutObj);
-
-        IOException failure = null;
-
-        try {
-            OutputStream os = sock.getOutputStream();
-
-            os.write(data);
-            os.flush();
-        }
-        catch (IOException e) {
-            failure = e;
-        }
-        finally {
-            // If cancel() returns false, the timeout has already been processed.
-            boolean timedOut = !timeoutObj.cancel();
-
-            if (!timedOut)
-                sockTimeoutWorker.removeTimeoutObject(timeoutObj);
-
-            // The original I/O error takes precedence over the timeout report.
-            if (failure != null)
-                throw failure;
-
-            if (timedOut)
-                throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
-        }
-    }
-
-    /**
-     * Writes message to the socket.
-     * <p>
-     * Convenience overload: allocates a fresh 8 KB marshalling buffer for this call and
-     * delegates to {@link #writeToSocket(Socket, TcpDiscoveryAbstractMessage, GridByteArrayOutputStream)}.
-     *
-     * @param sock Socket.
-     * @param msg Message.
-     * @throws IOException If IO failed or write timed out.
-     * @throws IgniteCheckedException If marshalling failed.
-     */
-    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) throws IOException, IgniteCheckedException {
-        writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024)); // 8K.
-    }
-
-    /**
-     * Marshals the message into the supplied buffer and sends the buffer over the socket.
-     * The write is guarded by the socket timeout worker, which closes the socket if the
-     * write is not finished by the deadline.
-     *
-     * @param sock Socket.
-     * @param msg Message.
-     * @param bout Byte array output stream.
-     * @throws IOException If IO failed or write timed out.
-     * @throws IgniteCheckedException If marshalling failed.
-     */
-    @SuppressWarnings("ThrowFromFinallyBlock")
-    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout)
-        throws IOException, IgniteCheckedException {
-        assert sock != null;
-        assert msg != null;
-        assert bout != null;
-
-        // Marshalling happens before the deadline is computed, so only the write is timed.
-        marsh.marshal(msg, bout);
-
-        SocketTimeoutObject timeoutObj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
-
-        sockTimeoutWorker.addTimeoutObject(timeoutObj);
-
-        IOException failure = null;
-
-        try {
-            OutputStream os = sock.getOutputStream();
-
-            bout.writeTo(os);
-            os.flush();
-        }
-        catch (IOException e) {
-            failure = e;
-        }
-        finally {
-            // If cancel() returns false, the timeout has already been processed.
-            boolean timedOut = !timeoutObj.cancel();
-
-            if (!timedOut)
-                sockTimeoutWorker.removeTimeoutObject(timeoutObj);
-
-            // The original I/O error takes precedence over the timeout report.
-            if (failure != null)
-                throw failure;
-
-            if (timedOut)
-                throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
-        }
-    }
-
-    /**
-     * Writes response to the socket. The write is guarded by the socket timeout worker,
-     * which closes the socket if the write is not finished by the deadline.
-     * <p>
-     * The output stream is obtained inside the guarded section, so a failure to obtain it
-     * still cancels and removes the registered timeout object.
-     *
-     * @param sock Socket.
-     * @param res Integer response (written with {@link OutputStream#write(int)}).
-     * @throws IOException If IO failed or write timed out.
-     */
-    @SuppressWarnings("ThrowFromFinallyBlock")
-    protected void writeToSocket(Socket sock, int res) throws IOException {
-        assert sock != null;
-
-        SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
-
-        sockTimeoutWorker.addTimeoutObject(obj);
-
-        IOException err = null;
-
-        try {
-            // Must stay inside 'try': if this throws, the timeout object has to be cancelled.
-            OutputStream out = sock.getOutputStream();
-
-            out.write(res);
-
-            out.flush();
-        }
-        catch (IOException e) {
-            err = e;
-        }
-        finally {
-            boolean cancelled = obj.cancel();
-
-            if (cancelled)
-                sockTimeoutWorker.removeTimeoutObject(obj);
-
-            // Throw original exception.
-            if (err != null)
-                throw err;
-
-            if (!cancelled)
-                throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
-        }
-    }
-
-    /**
-     * Reads and unmarshals one message from the socket. The socket read timeout is set to
-     * the given value for the duration of the call and restored afterwards.
-     *
-     * @param sock Socket.
-     * @param in Input stream (in case socket stream was wrapped); when {@code null},
-     *      the socket's own input stream is used.
-     * @param timeout Socket timeout for this operation.
-     * @return Message.
-     * @throws IOException If IO failed or read timed out.
-     * @throws IgniteCheckedException If unmarshalling failed.
-     */
-    protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, IgniteCheckedException {
-        assert sock != null;
-
-        int prevTimeout = sock.getSoTimeout();
-
-        try {
-            sock.setSoTimeout((int)timeout);
-
-            InputStream src = in;
-
-            if (src == null)
-                src = sock.getInputStream();
-
-            return marsh.unmarshal(src, U.gridClassLoader());
-        }
-        catch (IOException | IgniteCheckedException e) {
-            boolean timedOut = X.hasCause(e, SocketTimeoutException.class);
-
-            if (timedOut)
-                LT.warn(log, null, "Timed out waiting for message to be read (most probably, the reason is " +
-                    "in long GC pauses on remote node. Current timeout: " + timeout + '.');
-
-            throw e;
-        }
-        finally {
-            // Best effort: a failure to restore the previous timeout is ignored.
-            try {
-                sock.setSoTimeout(prevTimeout);
-            }
-            catch (SocketException ignored) {
-                // No-op.
-            }
-        }
-    }
-
-    /**
-     * Reads a single-byte message delivery receipt from the socket. The socket read timeout
-     * is set to the given value for the duration of the call and restored afterwards.
-     *
-     * @param sock Socket.
-     * @param timeout Socket timeout for this operation.
-     * @return Receipt.
-     * @throws IOException If IO failed, the stream ended or the read timed out.
-     */
-    protected int readReceipt(Socket sock, long timeout) throws IOException {
-        assert sock != null;
-
-        int prevTimeout = sock.getSoTimeout();
-
-        try {
-            sock.setSoTimeout((int)timeout);
-
-            int receipt = sock.getInputStream().read();
-
-            if (receipt != -1)
-                return receipt;
-
-            // End of stream reached before any receipt byte arrived.
-            throw new EOFException();
-        }
-        catch (SocketTimeoutException e) {
-            LT.warn(log, null, "Timed out waiting for message delivery receipt (most probably, the reason is " +
-                "in long GC pauses on remote node; consider tuning GC and increasing 'ackTimeout' " +
-                "configuration property). Will retry to send message with increased timeout. " +
-                "Current timeout: " + timeout + '.');
-
-            stats.onAckTimeout();
-
-            throw e;
-        }
-        finally {
-            // Best effort: a failure to restore the previous timeout is ignored.
-            try {
-                sock.setSoTimeout(prevTimeout);
-            }
-            catch (SocketException ignored) {
-                // No-op.
-            }
-        }
-    }
-
-    /**
-     * Resolves the addresses registered in the IP finder and returns them in random order,
-     * leaving out addresses of the local node.
-     * <p>
-     * Fetching the registered addresses is retried every 2000 ms until it succeeds.
-     * Addresses whose host cannot be resolved are still included, unresolved.
-     *
-     * @return Resolved addresses without the local node addresses (potentially
-     *      empty but never null).
-     * @throws org.apache.ignite.spi.IgniteSpiException If the thread is interrupted while waiting to retry.
-     */
-    protected Collection<InetSocketAddress> resolvedAddresses() throws IgniteSpiException {
-        Collection<InetSocketAddress> registered;
-
-        // Get consistent addresses collection.
-        for (;;) {
-            try {
-                registered = registeredAddresses();
-
-                break;
-            }
-            catch (IgniteSpiException e) {
-                LT.error(log, e, "Failed to get registered addresses from IP finder on start " +
-                    "(retrying every 2000 ms).");
-
-                try {
-                    U.sleep(2000);
-                }
-                catch (IgniteInterruptedCheckedException interrupted) {
-                    throw new IgniteSpiException("Thread has been interrupted.", interrupted);
-                }
-            }
-        }
-
-        List<InetSocketAddress> res = new ArrayList<>();
-
-        for (InetSocketAddress addr : registered) {
-            assert addr != null;
-
-            InetSocketAddress resolved;
-
-            try {
-                if (addr.isUnresolved())
-                    resolved = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort());
-                else
-                    resolved = addr;
-            }
-            catch (UnknownHostException ignored) {
-                LT.warn(log, null, "Failed to resolve address from IP finder (host is unknown): " + addr);
-
-                // Add address in any case.
-                res.add(addr);
-
-                continue;
-            }
-
-            boolean loc = locNodeAddrs != null && locNodeAddrs.contains(resolved);
-
-            if (!loc)
-                res.add(resolved);
-        }
-
-        if (res.isEmpty())
-            return res;
-
-        Collections.shuffle(res);
-
-        return res;
-    }
-
-    /**
-     * Gets addresses registered in the IP finder. An address with port {@code 0} gets the
-     * local node's discovery port, or {@link #DFLT_PORT} if that port is {@code 0} too.
-     *
-     * @return Registered addresses.
-     * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs.
-     */
-    protected Collection<InetSocketAddress> registeredAddresses() throws IgniteSpiException {
-        Collection<InetSocketAddress> res = new ArrayList<>();
-
-        for (InetSocketAddress addr : ipFinder.getRegisteredAddresses()) {
-            if (addr.getPort() != 0) {
-                res.add(addr);
-
-                continue;
-            }
-
-            // TcpDiscoveryNode.discoveryPort() returns an correct port for a server node and 0 for client node.
-            int port = DFLT_PORT;
-
-            if (locNode.discoveryPort() != 0)
-                port = locNode.discoveryPort();
-
-            // Keep an unresolved address unresolved; only the port is replaced.
-            if (addr.isUnresolved())
-                res.add(new InetSocketAddress(addr.getHostName(), port));
-            else
-                res.add(new InetSocketAddress(addr.getAddress(), port));
-        }
-
-        return res;
-    }
-
-    /**
-     * Builds the error reported when the local node ID clashes with a node already in topology.
-     *
-     * @param msg Duplicate ID message carrying the existing node.
-     * @return Error.
-     */
-    protected IgniteSpiException duplicateIdError(TcpDiscoveryDuplicateIdMessage msg) {
-        assert msg != null;
-
-        String errMsg = "Local node has the same ID as existing node in topology " +
-            "(fix configuration and restart local node) [localNode=" + locNode +
-            ", existingNode=" + msg.node() + ']';
-
-        return new IgniteSpiException(errMsg);
-    }
-
-    /**
-     * Builds the error reported when authentication fails; the returned exception
-     * wraps an {@link IgniteAuthenticationException}.
-     *
-     * @param msg Authentication failed message.
-     * @return Error.
-     */
-    protected IgniteSpiException authenticationFailedError(TcpDiscoveryAuthFailedMessage msg) {
-        assert msg != null;
-
-        IgniteAuthenticationException cause = new IgniteAuthenticationException("Authentication failed [nodeId=" +
-            msg.creatorNodeId() + ", addr=" + msg.address().getHostAddress() + ']');
-
-        return new IgniteSpiException(cause);
-    }
-
-    /**
-     * Converts a failed-check message into an exception; version incompatibility is
-     * reported as {@link IgniteSpiVersionCheckException}.
-     *
-     * @param msg Check failed message.
-     * @return Error.
-     */
-    protected IgniteSpiException checkFailedError(TcpDiscoveryCheckFailedMessage msg) {
-        assert msg != null;
-
-        if (versionCheckFailed(msg))
-            return new IgniteSpiVersionCheckException(msg.error());
-
-        return new IgniteSpiException(msg.error());
-    }
-
-    /**
-     * Checks whether the message class is annotated with {@link TcpDiscoveryEnsureDelivery}.
-     *
-     * @param msg Message.
-     * @return Whether delivery of the message is ensured.
-     */
-    protected boolean ensured(TcpDiscoveryAbstractMessage msg) {
-        TcpDiscoveryEnsureDelivery ann = U.getAnnotation(msg.getClass(), TcpDiscoveryEnsureDelivery.class);
-
-        return ann != null;
-    }
-
-    /**
-     * Detects a version incompatibility failure by looking for the text
-     * "versions are not compatible" in the error string of the message.
-     *
-     * @param msg Failed message.
-     * @return {@code True} if specified failed message relates to version incompatibility, {@code false} otherwise.
-     * @deprecated Parsing of error message was used for preserving backward compatibility. We should remove it
-     *      and create separate message for failed version check with next major release.
-     */
-    @Deprecated
-    private static boolean versionCheckFailed(TcpDiscoveryCheckFailedMessage msg) {
-        return msg.error().contains("versions are not compatible");
-    }
-
-    /**
-     * Unmarshals per-component discovery data and hands it to the data exchange.
-     * Components whose data fails to unmarshal are logged and left out.
-     *
-     * @param joiningNodeID Joining node ID.
-     * @param nodeId Remote node ID for which data is provided.
-     * @param data Collection of marshalled discovery data objects from different components.
-     * @param clsLdr Class loader for discovery data unmarshalling.
-     */
-    protected void onExchange(UUID joiningNodeID,
-        UUID nodeId,
-        Map<Integer, byte[]> data,
-        ClassLoader clsLdr)
-    {
-        Map<Integer, Serializable> unmarshalled = U.newHashMap(data.size());
-
-        for (Map.Entry<Integer, byte[]> e : data.entrySet()) {
-            Integer cmpId = e.getKey();
-
-            try {
-                Serializable cmpData = marsh.unmarshal(e.getValue(), clsLdr);
-
-                unmarshalled.put(cmpId, cmpData);
-            }
-            catch (IgniteCheckedException ex) {
-                U.error(log, "Failed to unmarshal discovery data for component: "  + cmpId, ex);
-            }
-        }
-
-        exchange.onExchange(joiningNodeID, nodeId, unmarshalled);
-    }
-
-    /**
-     * Handles sockets timeouts.
-     * <p>
-     * The thread keeps registered {@link SocketTimeoutObject}s ordered by end time. In a loop
-     * it fires {@code onTimeout()} on every object whose end time has passed and then waits
-     * on an internal mutex until the earliest remaining end time (or for 5000 ms at a time
-     * when nothing is registered). Registering an object that becomes the earliest one
-     * wakes the thread up.
-     */
-    protected class SocketTimeoutWorker extends IgniteSpiThread {
-        /**
-         * Time-based sorted set for timeout objects. Objects are ordered by end time;
-         * objects with equal end times are ordered by ID.
-         */
-        private final GridConcurrentSkipListSet<SocketTimeoutObject> timeoutObjs =
-            new GridConcurrentSkipListSet<>(new Comparator<SocketTimeoutObject>() {
-                @Override public int compare(SocketTimeoutObject o1, SocketTimeoutObject o2) {
-                    long time1 = o1.endTime();
-                    long time2 = o2.endTime();
-
-                    long id1 = o1.id();
-                    long id2 = o2.id();
-
-                    return time1 < time2 ? -1 : time1 > time2 ? 1 :
-                        id1 < id2 ? -1 : id1 > id2 ? 1 : 0;
-                }
-            });
-
-        /** Mutex the worker thread waits on; notified when a new earliest object is added. */
-        private final Object mux0 = new Object();
-
-        /**
-         * Creates the worker thread named {@code tcp-disco-sock-timeout-worker} and applies
-         * the configured SPI thread priority to it.
-         */
-        SocketTimeoutWorker() {
-            super(gridName, "tcp-disco-sock-timeout-worker", log);
-
-            setPriority(threadPri);
-        }
-
-        /**
-         * Registers a timeout object. If it ends up first in the set, threads waiting on the
-         * mutex are notified so that the wait time gets recomputed.
-         *
-         * @param timeoutObj Timeout object to add.
-         */
-        @SuppressWarnings({"NakedNotify"})
-        public void addTimeoutObject(SocketTimeoutObject timeoutObj) {
-            assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE;
-
-            timeoutObjs.add(timeoutObj);
-
-            if (timeoutObjs.firstx() == timeoutObj) {
-                synchronized (mux0) {
-                    mux0.notifyAll();
-                }
-            }
-        }
-
-        /**
-         * Removes a timeout object from the set.
-         *
-         * @param timeoutObj Timeout object to remove.
-         */
-        public void removeTimeoutObject(SocketTimeoutObject timeoutObj) {
-            assert timeoutObj != null;
-
-            timeoutObjs.remove(timeoutObj);
-        }
-
-        /**
-         * {@inheritDoc}
-         * <p>
-         * Each pass first removes and fires all expired objects (iteration stops at the first
-         * object that has not expired yet), then waits under the mutex until the earliest
-         * remaining object is due.
-         */
-        @Override protected void body() throws InterruptedException {
-            if (log.isDebugEnabled())
-                log.debug("Socket timeout worker has been started.");
-
-            while (!isInterrupted()) {
-                long now = U.currentTimeMillis();
-
-                for (Iterator<SocketTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) {
-                    SocketTimeoutObject timeoutObj = iter.next();
-
-                    if (timeoutObj.endTime() <= now) {
-                        iter.remove();
-
-                        if (timeoutObj.onTimeout()) { // False if the object was cancelled in the meantime.
-                            LT.warn(log, null, "Socket write has timed out (consider increasing " +
-                                "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']');
-
-                            stats.onSocketTimeout();
-                        }
-                    }
-                    else
-                        break;
-                }
-
-                synchronized (mux0) {
-                    while (true) {
-                        // Access of the first element must be inside of
-                        // synchronization block, so we don't miss out
-                        // on thread notification events sent from
-                        // 'addTimeoutObject(..)' method.
-                        SocketTimeoutObject first = timeoutObjs.firstx();
-
-                        if (first != null) {
-                            long waitTime = first.endTime() - U.currentTimeMillis();
-
-                            if (waitTime > 0)
-                                mux0.wait(waitTime);
-                            else
-                                break; // Earliest object is due: go back to the expiry pass.
-                        }
-                        else
-                            mux0.wait(5000); // Nothing registered: wait, then re-check.
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Socket timeout object.
-     */
-    private static class SocketTimeoutObject {
-        /** */
-        private static final AtomicLong idGen = new AtomicLong();
-
-        /** */
-        private final long id = idGen.incrementAndGet();
-
-        /** */
-        private final Socket sock;
-
-        /** */
-        private final long endTime;
-
-        /** */
-        private final AtomicBoolean done = new AtomicBoolean();
-
-        /**
-         * @param sock Socket.
-         * @param endTime End time.
-         */
-        SocketTimeoutObject(Socket sock, long endTime) {
-            assert sock != null;
-            assert endTime > 0;
-
-            this.sock = sock;
-            this.endTime = endTime;
-        }
-
-        /**
-         * @return {@code True} if object has not yet been processed.
-         */
-        boolean cancel() {
-            return done.compareAndSet(false, true);
-        }
-
-        /**
-         * @return {@code True} if object has not yet been canceled.
-         */
-        boolean onTimeout() {
-            if (done.compareAndSet(false, true)) {
-                // Close socket - timeout occurred.
-                U.closeQuiet(sock);
-
-                return true;
-            }
-
-            return false;
-        }
-
-        /**
-         * @return End time.
-         */
-        long endTime() {
-            return endTime;
-        }
-
-        /**
-         * @return ID.
-         */
-        long id() {
-            return id;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(SocketTimeoutObject.class, this);
-        }
-    }
-
-    /**
-     * Base class for message workers.
-     */
-    protected abstract class MessageWorkerAdapter extends IgniteSpiThread {
-        /** Pre-allocated output stream (100K). */
-        private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024);
-
-        /** Message queue. */
-        private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>();
-
-        /** Backed interrupted flag. */
-        private volatile boolean interrupted;
-
-        /**
-         * @param name Thread name.
-         */
-        protected MessageWorkerAdapter(String name) {
-            super(gridName, name, log);
-
-            setPriority(threadPri);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException {
-            if (log.isDebugEnabled())
-                log.debug("Message worker started [locNodeId=" + getLocalNodeId() + ']');
-
-            while (!isInterrupted()) {
-                TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS);
-
-                if (msg == null)
-                    continue;
-
-                processMessage(msg);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void interrupt() {
-            interrupted = true;
-
-            super.interrupt();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isInterrupted() {
-            return interrupted || super.isInterrupted();
-        }
-
-        /**
-         * @return Current queue size.
-         */
-        int queueSize() {
-            return queue.size();
-        }
-
-        /**
-         * Adds message to queue.
-         *
-         * @param msg Message to add.
-         */
-        void addMessage(TcpDiscoveryAbstractMessage msg) {
-            assert msg != null;
-
-            if (msg instanceof TcpDiscoveryHeartbeatMessage)
-                queue.addFirst(msg);
-            else
-                queue.add(msg);
-
-            if (log.isDebugEnabled())
-                log.debug("Message has been added to queue: " + msg);
-        }
-
-        /**
-         * @param msg Message.
-         */
-        protected abstract void processMessage(TcpDiscoveryAbstractMessage msg);
-
-        /**
-         * @param sock Socket.
-         * @param msg Message.
-         * @throws IOException If IO failed.
-         * @throws IgniteCheckedException If marshalling failed.
-         */
-        protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg)
-            throws IOException, IgniteCheckedException {
-            bout.reset();
-
-            TcpDiscoverySpiAdapter.this.writeToSocket(sock, msg, bout);
-        }
-    }
-
-    /**
-     *
-     */
-    protected class SocketMultiConnector implements AutoCloseable {
-        /** */
-        private int connInProgress;
-
-        /** */
-        private final ExecutorService executor;
-
-        /** */
-        private final CompletionService<GridTuple3<InetSocketAddress, Socket, Exception>> completionSrvc;
-
-        /**
-         * @param addrs Addresses.
-         * @param retryCnt Retry count.
-         */
-        public SocketMultiConnector(Collection<InetSocketAddress> addrs, final int retryCnt) {
-            connInProgress = addrs.size();
-
-            executor = Executors.newFixedThreadPool(Math.min(1, addrs.size()));
-
-            completionSrvc = new ExecutorCompletionService<>(executor);
-
-            for (final InetSocketAddress addr : addrs) {
-                completionSrvc.submit(new Callable<GridTuple3<InetSocketAddress, Socket, Exception>>() {
-                    @Override public GridTuple3<InetSocketAddress, Socket, Exception> call() {
-                        Exception ex = null;
-                        Socket sock = null;
-
-                        for (int i = 0; i < retryCnt; i++) {
-                            if (Thread.currentThread().isInterrupted())
-                                return null; // Executor is shutdown.
-
-                            try {
-                                sock = openSocket(addr);
-
-                                break;
-                            }
-                            catch (Exception e) {
-                                ex = e;
-                            }
-                        }
-
-                        return new GridTuple3<>(addr, sock, ex);
-                    }
-                });
-            }
-        }
-
-        /**
-         *
-         */
-        @Nullable public GridTuple3<InetSocketAddress, Socket, Exception> next() {
-            if (connInProgress == 0)
-                return null;
-
-            try {
-                Future<GridTuple3<InetSocketAddress, Socket, Exception>> fut = completionSrvc.take();
-
-                connInProgress--;
-
-                return fut.get();
-            }
-            catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-
-                throw new IgniteSpiException("Thread has been interrupted.", e);
-            }
-            catch (ExecutionException e) {
-                throw new IgniteSpiException(e);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() {
-            List<Runnable> unstartedTasks = executor.shutdownNow();
-
-            connInProgress -= unstartedTasks.size();
-
-            if (connInProgress > 0) {
-                Thread thread = new Thread(new Runnable() {
-                    @Override public void run() {
-                        try {
-                            executor.awaitTermination(5, TimeUnit.MINUTES);
-
-                            Future<GridTuple3<InetSocketAddress, Socket, Exception>> fut;
-
-                            while ((fut = completionSrvc.poll()) != null) {
-                                try {
-                                    GridTuple3<InetSocketAddress, Socket, Exception> tuple3 = fut.get();
-
-                                    if (tuple3 != null)
-                                        IgniteUtils.closeQuiet(tuple3.get2());
-                                }
-                                catch (ExecutionException ignore) {
-
-                                }
-                            }
-                        }
-                        catch (InterruptedException e) {
-                            Thread.currentThread().interrupt();
-
-                            throw new RuntimeException(e);
-                        }
-                    }
-                });
-
-                thread.setDaemon(true);
-
-                thread.start();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
index df9d0f4..f338fab 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
@@ -272,4 +272,13 @@ public interface TcpDiscoverySpiMBean extends IgniteSpiManagementMBean {
      */
     @MXBeanDescription("Dump debug info.")
     public void dumpDebugInfo();
+
+    /**
+     * Whether or not discovery is started in client mode.
+     *
+     * @return {@code true} if node is in client mode.
+     * @throws IllegalStateException If discovery SPI is not started.
+     */
+    @MXBeanDescription("Client mode.")
+    public boolean isClientMode() throws IllegalStateException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index bb8f051..cc61c9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -448,11 +448,8 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
 
         ClusterMetrics metrics = this.metrics;
 
-        if (metrics != null) {
-            mtr = new byte[ClusterMetricsSnapshot.METRICS_SIZE];
-
-            ClusterMetricsSnapshot.serialize(mtr, 0, metrics);
-        }
+        if (metrics != null)
+            mtr = ClusterMetricsSnapshot.serialize(metrics);
 
         U.writeByteArray(out, mtr);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
index e866504..e9eaa1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -32,7 +32,7 @@ import java.util.concurrent.locks.*;
  */
 public class TcpDiscoveryNodesRing {
     /** Visible nodes filter. */
-    private static final IgnitePredicate<TcpDiscoveryNode> VISIBLE_NODES = new P1<TcpDiscoveryNode>() {
+    public static final IgnitePredicate<TcpDiscoveryNode> VISIBLE_NODES = new P1<TcpDiscoveryNode>() {
         @Override public boolean apply(TcpDiscoveryNode node) {
             return node.visible();
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java
index 51ad7b4..95758e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java
@@ -31,7 +31,7 @@ public interface TcpDiscoveryIpFinder {
      * method is completed, SPI context can be stored for future access.
      *
      * @param spiCtx Spi context.
-     * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+     * @throws IgniteSpiException In case of error.
      */
     public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException;
 
@@ -46,7 +46,7 @@ public interface TcpDiscoveryIpFinder {
      * Initializes addresses discovery SPI binds to.
      *
      * @param addrs Addresses discovery SPI binds to.
-     * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+     * @throws IgniteSpiException In case of error.
      */
     public void initializeLocalAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException;
 
@@ -54,7 +54,7 @@ public interface TcpDiscoveryIpFinder {
      * Gets all addresses registered in this finder.
      *
      * @return All known addresses, potentially empty, but never {@code null}.
-     * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+     * @throws IgniteSpiException In case of error.
      */
     public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException;
 
@@ -76,7 +76,7 @@ public interface TcpDiscoveryIpFinder {
      * is already registered.
      *
      * @param addrs Addresses to register. Not {@code null} and not empty.
-     * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+     * @throws IgniteSpiException In case of error.
      */
     public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException;
 
@@ -87,7 +87,7 @@ public interface TcpDiscoveryIpFinder {
      * registered quietly (just no-op).
      *
      * @param addrs Addresses to unregister. Not {@code null} and not empty.
-     * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+     * @throws IgniteSpiException In case of error.
      */
     public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
index 6cf06ab..a992620 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
@@ -26,6 +26,8 @@ import org.apache.ignite.marshaller.*;
 import org.apache.ignite.marshaller.jdk.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.jetbrains.annotations.*;
 
@@ -254,6 +256,20 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
                 "(it is recommended in production to specify at least one address in " +
                 "TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)");
 
+        boolean clientMode;
+
+        if (ignite != null) { // Can be null if used in tests without starting Ignite.
+            DiscoverySpi discoSpi = ignite.configuration().getDiscoverySpi();
+
+            if (!(discoSpi instanceof TcpDiscoverySpi))
+                throw new IgniteSpiException("TcpDiscoveryMulticastIpFinder should be used with " +
+                    "TcpDiscoverySpi: " + discoSpi);
+
+            clientMode = ((TcpDiscoverySpi)discoSpi).isClientMode();
+        }
+        else
+            clientMode = false;
+
         InetAddress mcastAddr;
 
         try {
@@ -296,7 +312,8 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
 
             if (!addr.isLoopbackAddress()) {
                 try {
-                    addrSnds.add(new AddressSender(mcastAddr, addr, addrs));
+                    if (!clientMode)
+                        addrSnds.add(new AddressSender(mcastAddr, addr, addrs));
 
                     reqItfs.add(addr);
                 }
@@ -309,20 +326,24 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
             }
         }
 
-        if (addrSnds.isEmpty()) {
-            try {
-                // Create non-bound socket if local host is loopback or failed to create sockets explicitly
-                // bound to interfaces.
-                addrSnds.add(new AddressSender(mcastAddr, null, addrs));
-            }
-            catch (IOException e) {
-                throw new IgniteSpiException("Failed to create multicast socket [mcastAddr=" + mcastAddr +
-                    ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ']', e);
+        if (!clientMode) {
+            if (addrSnds.isEmpty()) {
+                try {
+                    // Create non-bound socket if local host is loopback or failed to create sockets explicitly
+                    // bound to interfaces.
+                    addrSnds.add(new AddressSender(mcastAddr, null, addrs));
+                }
+                catch (IOException e) {
+                    throw new IgniteSpiException("Failed to create multicast socket [mcastAddr=" + mcastAddr +
+                        ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ']', e);
+                }
             }
-        }
 
-        for (AddressSender addrSnd :addrSnds)
-            addrSnd.start();
+            for (AddressSender addrSnd : addrSnds)
+                addrSnd.start();
+        }
+        else
+            assert addrSnds.isEmpty() : addrSnds;
 
         Collection<InetSocketAddress> ret;
 
@@ -495,11 +516,13 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
 
     /** {@inheritDoc} */
     @Override public void close() {
-        for (AddressSender addrSnd : addrSnds)
-            U.interrupt(addrSnd);
+        if (addrSnds != null) {
+            for (AddressSender addrSnd : addrSnds)
+                U.interrupt(addrSnd);
 
-        for (AddressSender addrSnd : addrSnds)
-            U.join(addrSnd, log);
+            for (AddressSender addrSnd : addrSnds)
+                U.join(addrSnd, log);
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 1a00359..145b518 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -52,9 +52,6 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
     /** Topology version. */
     private long topVer;
 
-    /** Destination client node ID. */
-    private UUID destClientNodeId;
-
     /** Flags. */
     @GridToStringExclude
     private int flags;
@@ -178,20 +175,6 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
     }
 
     /**
-     * @return Destination client node ID.
-     */
-    public UUID destinationClientNodeId() {
-        return destClientNodeId;
-    }
-
-    /**
-     * @param destClientNodeId Destination client node ID.
-     */
-    public void destinationClientNodeId(UUID destClientNodeId) {
-        this.destClientNodeId = destClientNodeId;
-    }
-
-    /**
      * @return Pending message index.
      */
     public short pendingIndex() {
@@ -232,6 +215,13 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
             flags &= ~mask;
     }
 
+    /**
+     * @return {@code true} if message must be added to head of queue.
+     */
+    public boolean highPriority() {
+        return false;
+    }
+
     /** {@inheritDoc} */
     @Override public final boolean equals(Object obj) {
         if (this == obj)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java
new file mode 100644
index 0000000..95ac340
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+
+/**
+ * Heartbeat message.
+ * <p>
+ * Client sends its heartbeats in this message.
+ */
+public class TcpDiscoveryClientHeartbeatMessage extends TcpDiscoveryAbstractMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final byte[] metrics;
+
+    /**
+     * Constructor.
+     *
+     * @param creatorNodeId Creator node.
+     */
+    public TcpDiscoveryClientHeartbeatMessage(UUID creatorNodeId, ClusterMetrics metrics) {
+        super(creatorNodeId);
+
+        this.metrics = ClusterMetricsSnapshot.serialize(metrics);
+    }
+
+    /**
+     * Gets metrics.
+     *
+     * @return Metrics deserialized from the byte array carried by this message.
+     */
+    public ClusterMetrics metrics() {
+        return ClusterMetricsSnapshot.deserialize(metrics, 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean highPriority() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TcpDiscoveryClientHeartbeatMessage.class, this, "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java
new file mode 100644
index 0000000..f9f164d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Ping request.
+ */
+public class TcpDiscoveryClientPingRequest extends TcpDiscoveryAbstractMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Pinged client node ID. */
+    private final UUID nodeToPing;
+
+    /**
+     * @param creatorNodeId Creator node ID.
+     * @param nodeToPing Pinged client node ID.
+     */
+    public TcpDiscoveryClientPingRequest(UUID creatorNodeId, @Nullable UUID nodeToPing) {
+        super(creatorNodeId);
+
+        this.nodeToPing = nodeToPing;
+    }
+
+    /**
+     * @return Pinged client node ID.
+     */
+    @Nullable public UUID nodeToPing() {
+        return nodeToPing;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TcpDiscoveryClientPingRequest.class, this, "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java
new file mode 100644
index 0000000..26a2b00
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Ping response.
+ */
+public class TcpDiscoveryClientPingResponse extends TcpDiscoveryAbstractMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Pinged client node ID. */
+    private final UUID nodeToPing;
+
+    /** */
+    private final boolean res;
+
+    /**
+     * @param creatorNodeId Creator node ID.
+     * @param nodeToPing Pinged client node ID.
+     */
+    public TcpDiscoveryClientPingResponse(UUID creatorNodeId, @Nullable UUID nodeToPing, boolean res) {
+        super(creatorNodeId);
+
+        this.nodeToPing = nodeToPing;
+        this.res = res;
+    }
+
+    /**
+     * @return Pinged client node ID.
+     */
+    @Nullable public UUID nodeToPing() {
+        return nodeToPing;
+    }
+
+    /**
+     * @return Result of ping.
+     */
+    public boolean result() {
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TcpDiscoveryClientPingResponse.class, this, "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
index 4e42f2d..0739c1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
@@ -18,29 +18,34 @@
 package org.apache.ignite.spi.discovery.tcp.messages;
 
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.spi.discovery.*;
+import org.jetbrains.annotations.*;
 
-import java.io.*;
 import java.util.*;
 
 /**
  * Wrapped for custom message.
  */
+@TcpDiscoveryRedirectToClient
 @TcpDiscoveryEnsureDelivery
 public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** */
-    private transient Serializable msg;
+    private transient volatile DiscoverySpiCustomMessage msg;
 
     /** */
-    private final byte[] msgBytes;
+    private byte[] msgBytes;
 
     /**
      * @param creatorNodeId Creator node id.
+     * @param msg Message.
      * @param msgBytes Serialized message.
      */
-    public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, Serializable msg, byte[] msgBytes) {
+    public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, @Nullable DiscoverySpiCustomMessage msg,
+        @NotNull byte[] msgBytes) {
         super(creatorNodeId);
 
         this.msg = msg;
@@ -48,17 +53,33 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage
     }
 
     /**
-     * @return Message.
+     * @return Serialized message.
      */
-    public Serializable message() {
-        return msg;
+    public byte[] messageBytes() {
+        return msgBytes;
     }
 
     /**
-     * @return Serialized message.
+     * @param msg Message.
+     * @param msgBytes Serialized message.
      */
-    public byte[] messageBytes() {
-        return msgBytes;
+    public void message(@Nullable DiscoverySpiCustomMessage msg, @NotNull byte[] msgBytes) {
+        this.msg = msg;
+        this.msgBytes = msgBytes;
+    }
+
+    /**
+     * @return Deserialized message.
+     * @throws java.lang.Throwable If unmarshalling failed.
+     */
+    @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh) throws Throwable {
+        if (msg == null) {
+            msg = marsh.unmarshal(msgBytes, U.gridClassLoader());
+
+            assert msg != null;
+        }
+
+        return msg;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
index bafde9f..f721401 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
@@ -58,13 +58,6 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage {
     private final Map<UUID, Map<Integer, CacheMetrics>> cacheMetrics = new HashMap<>();
 
     /**
-     * Public default no-arg constructor for {@link Externalizable} interface.
-     */
-    public TcpDiscoveryHeartbeatMessage() {
-        // No-op.
-    }
-
-    /**
      * Constructor.
      *
      * @param creatorNodeId Creator node.
@@ -211,22 +204,13 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage {
     }
 
     /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(TcpDiscoveryHeartbeatMessage.class, this, "super", super.toString());
+    @Override public boolean highPriority() {
+        return true;
     }
 
-    /**
-     * @param metrics Metrics.
-     * @return Serialized metrics.
-     */
-    private static byte[] serializeMetrics(ClusterMetrics metrics) {
-        assert metrics != null;
-
-        byte[] buf = new byte[ClusterMetricsSnapshot.METRICS_SIZE];
-
-        ClusterMetricsSnapshot.serialize(buf, 0, metrics);
-
-        return buf;
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TcpDiscoveryHeartbeatMessage.class, this, "super", super.toString());
     }
 
     /**
@@ -273,7 +257,7 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage {
         public MetricsSet(ClusterMetrics metrics) {
             assert metrics != null;
 
-            this.metrics = serializeMetrics(metrics);
+            this.metrics = ClusterMetricsSnapshot.serialize(metrics);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
index 5a71eb3..1d974e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.spi.discovery.tcp.messages;
 
+import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
 
 import java.util.*;
 
@@ -34,6 +36,17 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
     private final UUID nodeId;
 
     /**
+     * A client node cannot get discovery data from TcpDiscoveryNodeAddedMessage, so discovery data is passed in
+     * TcpDiscoveryNodeAddFinishedMessage.
+     */
+    @GridToStringExclude
+    private Map<UUID, Map<Integer, byte[]>> clientDiscoData;
+
+    /** */
+    @GridToStringExclude
+    private Map<String, Object> clientNodeAttrs;
+
+    /**
      * Constructor.
      *
      * @param creatorNodeId ID of the creator node (coordinator).
@@ -54,6 +67,36 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
         return nodeId;
     }
 
+    /**
+     * @return Discovery data for joined client.
+     */
+    public Map<UUID, Map<Integer, byte[]>> clientDiscoData() {
+        return clientDiscoData;
+    }
+
+    /**
+     * @param clientDiscoData Discovery data for joined client.
+     */
+    public void clientDiscoData(@Nullable Map<UUID, Map<Integer, byte[]>> clientDiscoData) {
+        this.clientDiscoData = clientDiscoData;
+
+        assert clientDiscoData == null || !clientDiscoData.containsKey(nodeId);
+    }
+
+    /**
+     * @return Client node attributes.
+     */
+    public Map<String, Object> clientNodeAttributes() {
+        return clientNodeAttrs;
+    }
+
+    /**
+     * @param clientNodeAttrs New client node attributes.
+     */
+    public void clientNodeAttributes(Map<String, Object> clientNodeAttrs) {
+        this.clientNodeAttrs = clientNodeAttrs;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TcpDiscoveryNodeAddFinishedMessage.class, this, "super", super.toString());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index a9303f3..2a14158 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -148,7 +148,7 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
      *
      * @return Map with topology snapshots history.
      */
-    @Nullable public Map<Long, Collection<ClusterNode>> topologyHistory() {
+    public Map<Long, Collection<ClusterNode>> topologyHistory() {
         return topHist;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java
index de5b0a7..f17c91b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spi.discovery.tcp.messages;
 
+import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
@@ -47,4 +48,9 @@ public class TcpDiscoveryPingRequest extends TcpDiscoveryAbstractMessage {
     @Nullable public UUID clientNodeId() {
         return clientNodeId;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TcpDiscoveryPingRequest.class, this, "super", super.toString());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java
index 6396764..02b2d48 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java
@@ -17,7 +17,8 @@
 
 package org.apache.ignite.spi.discovery.tcp.messages;
 
-import java.io.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
 import java.util.*;
 
 /**
@@ -31,13 +32,6 @@ public class TcpDiscoveryPingResponse extends TcpDiscoveryAbstractMessage {
     private boolean clientExists;
 
     /**
-     * For {@link Externalizable}.
-     */
-    public TcpDiscoveryPingResponse() {
-        // No-op.
-    }
-
-    /**
      * @param creatorNodeId Creator node ID.
      */
     public TcpDiscoveryPingResponse(UUID creatorNodeId) {
@@ -57,4 +51,9 @@ public class TcpDiscoveryPingResponse extends TcpDiscoveryAbstractMessage {
     public boolean clientExists() {
         return clientExists;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TcpDiscoveryPingResponse.class, this, "super", super.toString());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
index e7db285..7a88426 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
@@ -387,15 +387,13 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
 
         Space space = space(spaceName, false);
 
-        if (space == null)
-            return;
-
-        byte[] val = space.remove(key, c != null);
+        byte[] val = space == null ? null : space.remove(key, c != null);
 
         if (c != null)
             c.apply(val);
 
-        notifyListener(EVT_SWAP_SPACE_DATA_REMOVED, spaceName);
+        if (space != null)
+             notifyListener(EVT_SWAP_SPACE_DATA_REMOVED, spaceName);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
new file mode 100644
index 0000000..467349f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.fair.*;
+import org.apache.ignite.cache.affinity.rendezvous.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Verifies that a client-mode node is never returned as a primary or backup node by cache affinity.
+ */
+public class IgniteClientNodeAffinityTest extends GridCommonAbstractTest {
+    /** IP finder set on the discovery SPI of every node configured by this test. */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of nodes started; the last one (index {@code NODE_CNT - 1}) is configured in client mode. */
+    private static final int NODE_CNT = 4;
+
+    /** Name of the cache with 1 backup, rendezvous affinity and {@link TestNodesFilter}. */
+    private static final String CACHE1 = "cache1";
+
+    /** Name of the cache with 1 backup and rendezvous affinity; no node filter is set for it. */
+    private static final String CACHE2 = "cache2";
+
+    /** Name of the cache with 1 backup, fair affinity and {@link TestNodesFilter}. */
+    private static final String CACHE3 = "cache3";
+
+    /** Name of the replicated cache with {@link TestNodesFilter}. */
+    private static final String CACHE4 = "cache4";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        if (gridName.equals(getTestGridName(NODE_CNT - 1))) // Only the last node becomes a client.
+            cfg.setClientMode(true);
+
+        CacheConfiguration ccfg1 = new CacheConfiguration();
+
+        ccfg1.setBackups(1);
+        ccfg1.setName(CACHE1);
+        ccfg1.setAffinity(new RendezvousAffinityFunction());
+        ccfg1.setNodeFilter(new TestNodesFilter());
+
+        CacheConfiguration ccfg2 = new CacheConfiguration();
+
+        ccfg2.setBackups(1);
+        ccfg2.setName(CACHE2);
+        ccfg2.setAffinity(new RendezvousAffinityFunction());
+
+        CacheConfiguration ccfg3 = new CacheConfiguration();
+
+        ccfg3.setBackups(1);
+        ccfg3.setName(CACHE3);
+        ccfg3.setAffinity(new FairAffinityFunction());
+        ccfg3.setNodeFilter(new TestNodesFilter());
+
+        CacheConfiguration ccfg4 = new CacheConfiguration();
+
+        ccfg4.setCacheMode(REPLICATED);
+        ccfg4.setName(CACHE4);
+        ccfg4.setNodeFilter(new TestNodesFilter());
+
+        cfg.setCacheConfiguration(ccfg1, ccfg2, ccfg3, ccfg4);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(NODE_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientNodeNotInAffinity() throws Exception {
+        checkCache(CACHE1, 2); // Configured with 1 backup: presumably primary + backup.
+
+        checkCache(CACHE2, 2);
+
+        checkCache(CACHE3, 2);
+
+        checkCache(CACHE4, 3); // Replicated: presumably one copy per non-client node (NODE_CNT - 1).
+
+        Ignite client = ignite(NODE_CNT - 1);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setBackups(0);
+
+        ccfg.setNodeFilter(new TestNodesFilter());
+
+        try (IgniteCache<Integer, Integer> cache = client.createCache(ccfg)) {
+            checkCache(null, 1); // No name is set on ccfg, so the cache is checked under the null name.
+        }
+
+        try (IgniteCache<Integer, Integer> cache = client.createCache(ccfg, new NearCacheConfiguration())) {
+            checkCache(null, 1); // Same check for the cache created with a near cache configuration.
+        }
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param expNodes Expected number of nodes per partition.
+     */
+    private void checkCache(String cacheName, int expNodes) {
+        log.info("Test cache: " + cacheName);
+
+        Ignite client = ignite(NODE_CNT - 1);
+
+        assertTrue(client.configuration().isClientMode());
+
+        ClusterNode clientNode = client.cluster().localNode();
+
+        for (int i = 0; i < NODE_CNT; i++) { // Affinity is checked from every node, the client included.
+            Ignite ignite = ignite(i);
+
+            Affinity<Integer> aff = ignite.affinity(cacheName);
+
+            for (int part = 0; part < aff.partitions(); part++) {
+                Collection<ClusterNode> nodes = aff.mapPartitionToPrimaryAndBackups(part);
+
+                assertEquals(expNodes, nodes.size());
+
+                assertFalse(nodes.contains(clientNode));
+            }
+        }
+    }
+
+    /**
+     * Node filter that accepts every node it is applied to, asserting the node has client mode set to false.
+     */
+    private static class TestNodesFilter implements IgnitePredicate<ClusterNode> {
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode clusterNode) {
+            Boolean attr = clusterNode.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE);
+
+            assertNotNull(attr);
+
+            assertFalse(attr);
+
+            return true;
+        }
+    }
+}