You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/12 12:44:16 UTC

[1/2] incubator-ignite git commit: # Rename TcpClientDiscoverySelfTest to TcpClientDiscoverySpiSelfTest

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-709_2 28498e99b -> 505a03e92


# Rename TcpClientDiscoverySelfTest to TcpClientDiscoverySpiSelfTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c05e368d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c05e368d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c05e368d

Branch: refs/heads/ignite-709_2
Commit: c05e368d9a10e77d39f72fcad22f625402102fda
Parents: 28498e9
Author: sevdokimov <se...@gridgain.com>
Authored: Tue May 12 13:37:44 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Tue May 12 13:37:44 2015 +0300

----------------------------------------------------------------------
 .../tcp/TcpClientDiscoverySelfTest.java         | 1028 ------------------
 .../tcp/TcpClientDiscoverySpiSelfTest.java      | 1028 ++++++++++++++++++
 .../IgniteSpiDiscoverySelfTestSuite.java        |    2 +-
 3 files changed, 1029 insertions(+), 1029 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c05e368d/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
deleted file mode 100644
index 2a123ce..0000000
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
+++ /dev/null
@@ -1,1028 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.tcp;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.io.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.spi.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.spi.discovery.tcp.messages.*;
-import org.apache.ignite.testframework.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.events.EventType.*;
-
-/**
- * Client-based discovery tests.
- */
-public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest {
-    /** */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** */
-    private static final AtomicInteger srvIdx = new AtomicInteger();
-
-    /** */
-    private static final AtomicInteger clientIdx = new AtomicInteger();
-
-    /** */
-    private static Collection<UUID> srvNodeIds;
-
-    /** */
-    private static Collection<UUID> clientNodeIds;
-
-    /** */
-    private static int clientsPerSrv;
-
-    /** */
-    private static CountDownLatch srvJoinedLatch;
-
-    /** */
-    private static CountDownLatch srvLeftLatch;
-
-    /** */
-    private static CountDownLatch srvFailedLatch;
-
-    /** */
-    private static CountDownLatch clientJoinedLatch;
-
-    /** */
-    private static CountDownLatch clientLeftLatch;
-
-    /** */
-    private static CountDownLatch clientFailedLatch;
-
-    /** */
-    private static CountDownLatch msgLatch;
-
-    /** */
-    private UUID nodeId;
-
-    /** */
-    private TcpDiscoveryVmIpFinder clientIpFinder;
-
-    /** */
-    private long joinTimeout = TcpClientDiscoverySpi.DFLT_JOIN_TIMEOUT;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        cfg.setLocalHost("127.0.0.1");
-
-        if (gridName.startsWith("server")) {
-            TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
-            disco.setIpFinder(IP_FINDER);
-
-            cfg.setDiscoverySpi(disco);
-        }
-        else if (gridName.startsWith("client")) {
-            TcpClientDiscoverySpi disco = new TestTcpClientDiscovery();
-
-            disco.setJoinTimeout(joinTimeout);
-
-            TcpDiscoveryVmIpFinder ipFinder;
-
-            if (clientIpFinder != null)
-                ipFinder = clientIpFinder;
-            else {
-                ipFinder = new TcpDiscoveryVmIpFinder();
-
-                String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()).
-                    get((clientIdx.get() - 1) / clientsPerSrv).toString();
-
-                if (addr.startsWith("/"))
-                    addr = addr.substring(1);
-
-                ipFinder.setAddresses(Arrays.asList(addr));
-            }
-
-            disco.setIpFinder(ipFinder);
-
-            cfg.setDiscoverySpi(disco);
-
-            String nodeId = cfg.getNodeId().toString();
-
-            nodeId = "cc" + nodeId.substring(2);
-
-            cfg.setNodeId(UUID.fromString(nodeId));
-        }
-
-        if (nodeId != null)
-            cfg.setNodeId(nodeId);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        Collection<InetSocketAddress> addrs = IP_FINDER.getRegisteredAddresses();
-
-        if (!F.isEmpty(addrs))
-            IP_FINDER.unregisterAddresses(addrs);
-
-        srvIdx.set(0);
-        clientIdx.set(0);
-
-        srvNodeIds = new GridConcurrentHashSet<>();
-        clientNodeIds = new GridConcurrentHashSet<>();
-
-        clientsPerSrv = 2;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopAllClients(true);
-        stopAllServers(true);
-
-        nodeId = null;
-        clientIpFinder = null;
-        joinTimeout = TcpClientDiscoverySpi.DFLT_JOIN_TIMEOUT;
-
-        assert G.allGrids().isEmpty();
-    }
-
-    /**
-     *
-     * @throws Exception
-     */
-    public void testJoinTimeout() throws Exception {
-        clientIpFinder = new TcpDiscoveryVmIpFinder();
-        joinTimeout = 1000;
-
-        try {
-            startClientNodes(1);
-
-            fail("Client cannot be start because no server nodes run");
-        }
-        catch (IgniteCheckedException e) {
-            IgniteSpiException spiEx = e.getCause(IgniteSpiException.class);
-
-            assert spiEx != null : e;
-
-            assert spiEx.getMessage().contains("Join process timed out") : spiEx.getMessage();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testClientNodeJoin() throws Exception {
-        startServerNodes(3);
-        startClientNodes(3);
-
-        checkNodes(3, 3);
-
-        srvJoinedLatch = new CountDownLatch(3);
-        clientJoinedLatch = new CountDownLatch(3);
-
-        attachListeners(3, 3);
-
-        startClientNodes(1);
-
-        await(srvJoinedLatch);
-        await(clientJoinedLatch);
-
-        checkNodes(3, 4);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testClientNodeLeave() throws Exception {
-        startServerNodes(3);
-        startClientNodes(3);
-
-        checkNodes(3, 3);
-
-        srvLeftLatch = new CountDownLatch(3);
-        clientLeftLatch = new CountDownLatch(2);
-
-        attachListeners(3, 3);
-
-        stopGrid("client-2");
-
-        await(srvLeftLatch);
-        await(clientLeftLatch);
-
-        checkNodes(3, 2);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testClientNodeFail() throws Exception {
-        startServerNodes(3);
-        startClientNodes(3);
-
-        checkNodes(3, 3);
-
-        srvFailedLatch = new CountDownLatch(3);
-        clientFailedLatch = new CountDownLatch(2);
-
-        attachListeners(3, 3);
-
-        failClient(2);
-
-        await(srvFailedLatch);
-        await(clientFailedLatch);
-
-        checkNodes(3, 2);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testServerNodeJoin() throws Exception {
-        startServerNodes(3);
-        startClientNodes(3);
-
-        checkNodes(3, 3);
-
-        srvJoinedLatch = new CountDownLatch(3);
-        clientJoinedLatch = new CountDownLatch(3);
-
-        attachListeners(3, 3);
-
-        startServerNodes(1);
-
-        await(srvJoinedLatch);
-        await(clientJoinedLatch);
-
-        checkNodes(4, 3);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testServerNodeLeave() throws Exception {
-        startServerNodes(3);
-        startClientNodes(3);
-
-        checkNodes(3, 3);
-
-        srvLeftLatch = new CountDownLatch(2);
-        clientLeftLatch = new CountDownLatch(3);
-
-        attachListeners(3, 3);
-
-        stopGrid("server-2");
-
-        await(srvLeftLatch);
-        await(clientLeftLatch);
-
-        checkNodes(2, 3);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testServerNodeFail() throws Exception {
-        startServerNodes(3);
-        startClientNodes(3);
-
-        checkNodes(3, 3);
-
-        srvFailedLatch = new CountDownLatch(2);
-        clientFailedLatch = new CountDownLatch(3);
-
-        attachListeners(3, 3);
-
-        assert U.<Map>field(G.ignite("server-2").configuration().getDiscoverySpi(), "clientMsgWorkers").isEmpty();
-
-        failServer(2);
-
-        await(srvFailedLatch);
-        await(clientFailedLatch);
-
-        checkNodes(2, 3);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPing() throws Exception {
-        startServerNodes(2);
-        startClientNodes(1);
-
-        Ignite srv0 = G.ignite("server-0");
-        Ignite srv1 = G.ignite("server-1");
-        Ignite client = G.ignite("client-0");
-
-        assert ((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id());
-        assert ((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id());
-
-        assert ((IgniteEx)client).context().discovery().pingNode(srv0.cluster().localNode().id());
-        assert ((IgniteEx)client).context().discovery().pingNode(srv1.cluster().localNode().id());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testClientReconnectOnRouterFail() throws Exception {
-        clientsPerSrv = 1;
-
-        startServerNodes(3);
-        startClientNodes(3);
-
-        checkNodes(3, 3);
-
-        setClientRouter(2, 0);
-
-        srvFailedLatch = new CountDownLatch(2);
-        clientFailedLatch = new CountDownLatch(3);
-
-        attachListeners(2, 3);
-
-        failServer(2);
-
-        await(srvFailedLatch);
-        await(clientFailedLatch);
-
-        checkNodes(2, 3);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testClientReconnectOnNetworkProblem() throws Exception {
-        clientsPerSrv = 1;
-
-        startServerNodes(3);
-        startClientNodes(3);
-
-        checkNodes(3, 3);
-
-        setClientRouter(2, 0);
-
-        srvFailedLatch = new CountDownLatch(2);
-        clientFailedLatch = new CountDownLatch(3);
-
-        attachListeners(2, 3);
-
-        ((TcpClientDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi()).brokeConnection();
-
-        G.ignite("client-2").message().remoteListen(null, new MessageListener()); // Send some discovery message.
-
-        checkNodes(3, 3);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testGetMissedMessagesOnReconnect() throws Exception {
-        clientsPerSrv = 1;
-
-        startServerNodes(3);
-        startClientNodes(2);
-
-        checkNodes(3, 2);
-
-        clientLeftLatch = new CountDownLatch(1);
-        srvLeftLatch = new CountDownLatch(2);
-
-        attachListeners(2, 2);
-
-        ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).pauseAll();
-
-        stopGrid("server-2");
-
-        await(srvLeftLatch);
-        await(srvLeftLatch);
-
-        Thread.sleep(500);
-
-        assert G.ignite("client-0").cluster().nodes().size() == 4;
-        assert G.ignite("client-1").cluster().nodes().size() == 5;
-
-        clientLeftLatch = new CountDownLatch(1);
-
-        ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).resume();
-
-        await(clientLeftLatch);
-
-        checkNodes(2, 2);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testClientSegmentation() throws Exception {
-        clientsPerSrv = 1;
-
-        startServerNodes(3);
-        startClientNodes(3);
-
-        checkNodes(3, 3);
-
-//        setClientRouter(2, 2);
-
-        srvFailedLatch = new CountDownLatch(2 + 2);
-        clientFailedLatch = new CountDownLatch(2 + 2);
-
-        attachListeners(2, 2);
-
-        final CountDownLatch client2StoppedLatch = new CountDownLatch(1);
-
-        IgnitionListener lsnr = new IgnitionListener() {
-            @Override public void onStateChange(@Nullable String name, IgniteState state) {
-                if (state == IgniteState.STOPPED_ON_SEGMENTATION)
-                    client2StoppedLatch.countDown();
-            }
-        };
-        G.addListener(lsnr);
-
-        try {
-            failServer(2);
-
-            await(srvFailedLatch);
-            await(clientFailedLatch);
-
-            await(client2StoppedLatch);
-
-            checkNodes(2, 2);
-        }
-        finally {
-            G.removeListener(lsnr);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testClientNodeJoinOneServer() throws Exception {
-        startServerNodes(1);
-
-        srvJoinedLatch = new CountDownLatch(1);
-
-        attachListeners(1, 0);
-
-        startClientNodes(1);
-
-        await(srvJoinedLatch);
-
-        checkNodes(1, 1);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testClientNodeLeaveOneServer() throws Exception {
-        startServerNodes(1);
-        startClientNodes(1);
-
-        checkNodes(1, 1);
-
-        srvLeftLatch = new CountDownLatch(1);
-
-        attachListeners(1, 0);
-
-        stopGrid("client-0");
-
-        await(srvLeftLatch);
-
-        checkNodes(1, 0);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testClientNodeFailOneServer() throws Exception {
-        startServerNodes(1);
-        startClientNodes(1);
-
-        checkNodes(1, 1);
-
-        srvFailedLatch = new CountDownLatch(1);
-
-        attachListeners(1, 0);
-
-        failClient(0);
-
-        await(srvFailedLatch);
-
-        checkNodes(1, 0);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testMetrics() throws Exception {
-        startServerNodes(3);
-        startClientNodes(3);
-
-        checkNodes(3, 3);
-
-        attachListeners(3, 3);
-
-        assertTrue(checkMetrics(3, 3, 0));
-
-        G.ignite("client-0").compute().broadcast(F.noop());
-
-        assertTrue(GridTestUtils.waitForCondition(new PA() {
-            @Override public boolean apply() {
-                return checkMetrics(3, 3, 1);
-            }
-        }, 10000));
-
-        checkMetrics(3, 3, 1);
-
-        G.ignite("server-0").compute().broadcast(F.noop());
-
-        assertTrue(GridTestUtils.waitForCondition(new PA() {
-            @Override public boolean apply() {
-                return checkMetrics(3, 3, 2);
-            }
-        }, 10000));
-    }
-
-    /**
-     * @param srvCnt Number of Number of server nodes.
-     * @param clientCnt Number of client nodes.
-     * @param execJobsCnt Expected number of executed jobs.
-     * @return Whether metrics are correct.
-     */
-    private boolean checkMetrics(int srvCnt, int clientCnt, int execJobsCnt) {
-        for (int i = 0; i < srvCnt; i++) {
-            Ignite g = G.ignite("server-" + i);
-
-            for (ClusterNode n : g.cluster().nodes()) {
-                if (n.metrics().getTotalExecutedJobs() != execJobsCnt)
-                    return false;
-            }
-        }
-
-        for (int i = 0; i < clientCnt; i++) {
-            Ignite g = G.ignite("client-" + i);
-
-            for (ClusterNode n : g.cluster().nodes()) {
-                if (n.metrics().getTotalExecutedJobs() != execJobsCnt)
-                    return false;
-            }
-        }
-
-        return true;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testDataExchangeFromServer() throws Exception {
-        testDataExchange("server-0");
-    }
-
-    /**
-     * TODO: IGNITE-587.
-     *
-     * @throws Exception If failed.
-     */
-    public void testDataExchangeFromClient() throws Exception {
-        testDataExchange("client-0");
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void testDataExchange(String masterName) throws Exception {
-        startServerNodes(2);
-        startClientNodes(2);
-
-        checkNodes(2, 2);
-
-        IgniteMessaging msg = grid(masterName).message();
-
-        UUID id = null;
-
-        try {
-            id = msg.remoteListen(null, new MessageListener());
-
-            msgLatch = new CountDownLatch(4);
-
-            msg.send(null, "Message 1");
-
-            await(msgLatch);
-
-            startServerNodes(1);
-            startClientNodes(1);
-
-            checkNodes(3, 3);
-
-            msgLatch = new CountDownLatch(6);
-
-            msg.send(null, "Message 2");
-
-            await(msgLatch);
-        }
-        finally {
-            if (id != null)
-                msg.stopRemoteListen(id);
-        }
-    }
-
-    /**
-     * @throws Exception If any error occurs.
-     */
-    public void testDuplicateId() throws Exception {
-        startServerNodes(2);
-
-        nodeId = G.ignite("server-1").cluster().localNode().id();
-
-        try {
-            startGrid("client-0");
-
-            assert false;
-        }
-        catch (IgniteCheckedException e) {
-            IgniteSpiException spiEx = e.getCause(IgniteSpiException.class);
-
-            assert spiEx != null : e;
-            assert spiEx.getMessage().contains("same ID") : spiEx.getMessage();
-        }
-    }
-
-    /**
-     * @throws Exception If any error occurs.
-     */
-    public void testTimeoutWaitingNodeAddedMessage() throws Exception {
-        startServerNodes(2);
-
-        final CountDownLatch cnt = new CountDownLatch(1);
-
-        ((TcpDiscoverySpi)G.ignite("server-1").configuration().getDiscoverySpi()).addSendMessageListener(
-            new IgniteInClosure<TcpDiscoveryAbstractMessage>() {
-                @Override public void apply(TcpDiscoveryAbstractMessage msg) {
-                    try {
-                        cnt.await(10, MINUTES);
-                    }
-                    catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-
-                        throw new IgniteInterruptedException(e);
-                    }
-                }
-            });
-
-        try {
-            startGrid("client-0");
-
-            assert false;
-        }
-        catch (IgniteCheckedException e) {
-            cnt.countDown();
-
-            IgniteSpiException spiEx = e.getCause(IgniteSpiException.class);
-
-            assert spiEx != null : e;
-            assert spiEx.getMessage().contains("Join process timed out") : spiEx.getMessage();
-        }
-    }
-
-    /**
-     * @throws Exception If any error occurs.
-     */
-    public void testGridStartTime() throws Exception {
-        startServerNodes(2);
-
-        startClientNodes(2);
-
-        long startTime = -1;
-
-        for (Ignite g : G.allGrids()) {
-            IgniteEx kernal = (IgniteKernal)g;
-
-            assertTrue(kernal.context().discovery().gridStartTime() > 0);
-
-            if (startTime == -1)
-                startTime = kernal.context().discovery().gridStartTime();
-            else
-                assertEquals(startTime, kernal.context().discovery().gridStartTime());
-        }
-    }
-
-    /**
-     * @param clientIdx Index.
-     * @throws Exception In case of error.
-     */
-    private void setClientRouter(int clientIdx, int srvIdx) throws Exception {
-        TcpClientDiscoverySpi disco =
-            (TcpClientDiscoverySpi)G.ignite("client-" + clientIdx).configuration().getDiscoverySpi();
-
-        TcpDiscoveryVmIpFinder ipFinder = (TcpDiscoveryVmIpFinder)disco.getIpFinder();
-
-        String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()).get(srvIdx).toString();
-
-        if (addr.startsWith("/"))
-            addr = addr.substring(1);
-
-        ipFinder.setAddresses(Arrays.asList(addr));
-    }
-
-    /**
-     * @param cnt Number of nodes.
-     * @throws Exception In case of error.
-     */
-    private void startServerNodes(int cnt) throws Exception {
-        for (int i = 0; i < cnt; i++) {
-            Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
-
-            srvNodeIds.add(g.cluster().localNode().id());
-        }
-    }
-
-    /**
-     * @param cnt Number of nodes.
-     * @throws Exception In case of error.
-     */
-    private void startClientNodes(int cnt) throws Exception {
-        for (int i = 0; i < cnt; i++) {
-            Ignite g = startGrid("client-" + clientIdx.getAndIncrement());
-
-            clientNodeIds.add(g.cluster().localNode().id());
-        }
-    }
-
-    /**
-     * @param idx Index.
-     */
-    private void failServer(int idx) {
-        ((TcpDiscoverySpi)G.ignite("server-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure();
-    }
-
-    /**
-     * @param idx Index.
-     */
-    private void failClient(int idx) {
-        ((TcpClientDiscoverySpi)G.ignite("client-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure();
-    }
-
-    /**
-     * @param srvCnt Number of server nodes.
-     * @param clientCnt Number of client nodes.
-     */
-    private void attachListeners(int srvCnt, int clientCnt) throws Exception {
-        if (srvJoinedLatch != null) {
-            for (int i = 0; i < srvCnt; i++) {
-                G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
-                    @Override public boolean apply(Event evt) {
-                        info("Joined event fired on server: " + evt);
-
-                        srvJoinedLatch.countDown();
-
-                        return true;
-                    }
-                }, EVT_NODE_JOINED);
-            }
-        }
-
-        if (srvLeftLatch != null) {
-            for (int i = 0; i < srvCnt; i++) {
-                G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
-                    @Override public boolean apply(Event evt) {
-                        info("Left event fired on server: " + evt);
-
-                        srvLeftLatch.countDown();
-
-                        return true;
-                    }
-                }, EVT_NODE_LEFT);
-            }
-        }
-
-        if (srvFailedLatch != null) {
-            for (int i = 0; i < srvCnt; i++) {
-                G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
-                    @Override public boolean apply(Event evt) {
-                        info("Failed event fired on server: " + evt);
-
-                        srvFailedLatch.countDown();
-
-                        return true;
-                    }
-                }, EVT_NODE_FAILED);
-            }
-        }
-
-        if (clientJoinedLatch != null) {
-            for (int i = 0; i < clientCnt; i++) {
-                G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
-                    @Override public boolean apply(Event evt) {
-                        info("Joined event fired on client: " + evt);
-
-                        clientJoinedLatch.countDown();
-
-                        return true;
-                    }
-                }, EVT_NODE_JOINED);
-            }
-        }
-
-        if (clientLeftLatch != null) {
-            for (int i = 0; i < clientCnt; i++) {
-                G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
-                    @Override public boolean apply(Event evt) {
-                        info("Left event fired on client: " + evt);
-
-                        clientLeftLatch.countDown();
-
-                        return true;
-                    }
-                }, EVT_NODE_LEFT);
-            }
-        }
-
-        if (clientFailedLatch != null) {
-            for (int i = 0; i < clientCnt; i++) {
-                G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
-                    @Override public boolean apply(Event evt) {
-                        info("Failed event fired on client: " + evt);
-
-                        clientFailedLatch.countDown();
-
-                        return true;
-                    }
-                }, EVT_NODE_FAILED);
-            }
-        }
-    }
-
-    /**
-     * @param srvCnt Number of server nodes.
-     * @param clientCnt Number of client nodes.
-     */
-    private void checkNodes(int srvCnt, int clientCnt) {
-        for (int i = 0; i < srvCnt; i++) {
-            Ignite g = G.ignite("server-" + i);
-
-            assertTrue(srvNodeIds.contains(g.cluster().localNode().id()));
-
-            assertFalse(g.cluster().localNode().isClient());
-
-            checkRemoteNodes(g, srvCnt + clientCnt - 1);
-        }
-
-        for (int i = 0; i < clientCnt; i++) {
-            Ignite g = G.ignite("client-" + i);
-
-            assertTrue(clientNodeIds.contains(g.cluster().localNode().id()));
-
-            assertTrue(g.cluster().localNode().isClient());
-
-            checkRemoteNodes(g, srvCnt + clientCnt - 1);
-        }
-    }
-
-    /**
-     * @param ignite Grid.
-     * @param expCnt Expected nodes count.
-     */
-    @SuppressWarnings("TypeMayBeWeakened")
-    private void checkRemoteNodes(Ignite ignite, int expCnt) {
-        Collection<ClusterNode> nodes = ignite.cluster().forRemotes().nodes();
-
-        assertEquals(expCnt, nodes.size());
-
-        for (ClusterNode node : nodes) {
-            UUID id = node.id();
-
-            if (clientNodeIds.contains(id))
-                assertTrue(node.isClient());
-            else if (srvNodeIds.contains(id))
-                assertFalse(node.isClient());
-            else
-                assert false : "Unexpected node ID: " + id;
-        }
-    }
-
-    /**
-     * @param latch Latch.
-     * @throws InterruptedException If interrupted.
-     */
-    private void await(CountDownLatch latch) throws InterruptedException {
-        assertTrue("Latch count: " + latch.getCount(), latch.await(10000, MILLISECONDS));
-    }
-
-    /**
-     */
-    private static class MessageListener implements IgniteBiPredicate<UUID, Object> {
-        @IgniteInstanceResource
-        private Ignite ignite;
-
-        /** {@inheritDoc} */
-        @Override public boolean apply(UUID uuid, Object msg) {
-            X.println(">>> Received [locNodeId=" + ignite.configuration().getNodeId() + ", msg=" + msg + ']');
-
-            msgLatch.countDown();
-
-            return true;
-        }
-    }
-
-    /**
-     *
-     */
-    private static class TestTcpClientDiscovery extends TcpClientDiscoverySpi {
-        /** */
-        private final Object mux = new Object();
-
-        /** */
-        private final AtomicBoolean writeLock = new AtomicBoolean();
-
-        /** */
-        private final AtomicBoolean openSockLock = new AtomicBoolean();
-
-        /**
-         * @param lock Lock.
-         */
-        private void waitFor(AtomicBoolean lock) {
-            try {
-                synchronized (mux) {
-                    while (lock.get())
-                        mux.wait();
-                }
-            }
-            catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-
-                throw new RuntimeException(e);
-            }
-        }
-
-        /**
-         * @param isPause Is lock.
-         * @param locks Locks.
-         */
-        private void pauseResumeOperation(boolean isPause, AtomicBoolean... locks) {
-            synchronized (mux) {
-                for (AtomicBoolean lock : locks)
-                    lock.set(isPause);
-
-                mux.notifyAll();
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException {
-            waitFor(writeLock);
-
-            super.writeToSocket(sock, msg, bout);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
-            waitFor(openSockLock);
-
-            return super.openSocket(sockAddr);
-        }
-
-        /**
-         *
-         */
-        private void pauseAll() {
-            pauseResumeOperation(true, openSockLock, writeLock);
-
-            brokeConnection();
-        }
-
-        /**
-         *
-         */
-        private void resume() {
-            pauseResumeOperation(false, openSockLock, writeLock);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c05e368d/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
new file mode 100644
index 0000000..a06bfd9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -0,0 +1,1028 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.io.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Client-based discovery tests.
+ */
+public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final AtomicInteger srvIdx = new AtomicInteger();
+
+    /** */
+    private static final AtomicInteger clientIdx = new AtomicInteger();
+
+    /** */
+    private static Collection<UUID> srvNodeIds;
+
+    /** */
+    private static Collection<UUID> clientNodeIds;
+
+    /** */
+    private static int clientsPerSrv;
+
+    /** */
+    private static CountDownLatch srvJoinedLatch;
+
+    /** */
+    private static CountDownLatch srvLeftLatch;
+
+    /** */
+    private static CountDownLatch srvFailedLatch;
+
+    /** */
+    private static CountDownLatch clientJoinedLatch;
+
+    /** */
+    private static CountDownLatch clientLeftLatch;
+
+    /** */
+    private static CountDownLatch clientFailedLatch;
+
+    /** */
+    private static CountDownLatch msgLatch;
+
+    /** */
+    private UUID nodeId;
+
+    /** */
+    private TcpDiscoveryVmIpFinder clientIpFinder;
+
+    /** */
+    private long joinTimeout = TcpClientDiscoverySpi.DFLT_JOIN_TIMEOUT;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setLocalHost("127.0.0.1");
+
+        if (gridName.startsWith("server")) {
+            TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+            disco.setIpFinder(IP_FINDER);
+
+            cfg.setDiscoverySpi(disco);
+        }
+        else if (gridName.startsWith("client")) {
+            TcpClientDiscoverySpi disco = new TestTcpClientDiscovery();
+
+            disco.setJoinTimeout(joinTimeout);
+
+            TcpDiscoveryVmIpFinder ipFinder;
+
+            if (clientIpFinder != null)
+                ipFinder = clientIpFinder;
+            else {
+                ipFinder = new TcpDiscoveryVmIpFinder();
+
+                String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()).
+                    get((clientIdx.get() - 1) / clientsPerSrv).toString();
+
+                if (addr.startsWith("/"))
+                    addr = addr.substring(1);
+
+                ipFinder.setAddresses(Arrays.asList(addr));
+            }
+
+            disco.setIpFinder(ipFinder);
+
+            cfg.setDiscoverySpi(disco);
+
+            String nodeId = cfg.getNodeId().toString();
+
+            nodeId = "cc" + nodeId.substring(2);
+
+            cfg.setNodeId(UUID.fromString(nodeId));
+        }
+
+        if (nodeId != null)
+            cfg.setNodeId(nodeId);
+
+        return cfg;
+    }
+
+    /**
+     * Resets the shared static fixture before every test: unregisters addresses left in the
+     * shared IP finder, restarts the node index counters, recreates the node ID sets, restores
+     * the default clients-per-server ratio and drops event latches left by a previous test.
+     *
+     * @throws Exception If failed.
+     */
+    @Override protected void beforeTest() throws Exception {
+        Collection<InetSocketAddress> addrs = IP_FINDER.getRegisteredAddresses();
+
+        if (!F.isEmpty(addrs))
+            IP_FINDER.unregisterAddresses(addrs);
+
+        srvIdx.set(0);
+        clientIdx.set(0);
+
+        srvNodeIds = new GridConcurrentHashSet<>();
+        clientNodeIds = new GridConcurrentHashSet<>();
+
+        clientsPerSrv = 2;
+
+        // The latches are static and survive between tests. attachListeners() registers a
+        // listener for every non-null latch, so stale ones must not leak into the next test.
+        srvJoinedLatch = null;
+        srvLeftLatch = null;
+        srvFailedLatch = null;
+        clientJoinedLatch = null;
+        clientLeftLatch = null;
+        clientFailedLatch = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllClients(true);
+        stopAllServers(true);
+
+        nodeId = null;
+        clientIpFinder = null;
+        joinTimeout = TcpClientDiscoverySpi.DFLT_JOIN_TIMEOUT;
+
+        assert G.allGrids().isEmpty();
+    }
+
+    /**
+     * Starts a single client with a freshly created IP finder (no addresses are set on it here)
+     * and a join timeout of 1000, and expects startup to fail with an SPI exception whose
+     * message contains "Join process timed out".
+     *
+     * @throws Exception If failed.
+     */
+    public void testJoinTimeout() throws Exception {
+        clientIpFinder = new TcpDiscoveryVmIpFinder();
+        joinTimeout = 1000;
+
+        try {
+            startClientNodes(1);
+
+            fail("Client cannot be start because no server nodes run");
+        }
+        catch (IgniteCheckedException e) {
+            IgniteSpiException spiEx = e.getCause(IgniteSpiException.class);
+
+            assert spiEx != null : e;
+
+            assert spiEx.getMessage().contains("Join process timed out") : spiEx.getMessage();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientNodeJoin() throws Exception {
+        startServerNodes(3);
+        startClientNodes(3);
+
+        checkNodes(3, 3);
+
+        srvJoinedLatch = new CountDownLatch(3);
+        clientJoinedLatch = new CountDownLatch(3);
+
+        attachListeners(3, 3);
+
+        startClientNodes(1);
+
+        await(srvJoinedLatch);
+        await(clientJoinedLatch);
+
+        checkNodes(3, 4);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientNodeLeave() throws Exception {
+        startServerNodes(3);
+        startClientNodes(3);
+
+        checkNodes(3, 3);
+
+        srvLeftLatch = new CountDownLatch(3);
+        clientLeftLatch = new CountDownLatch(2);
+
+        attachListeners(3, 3);
+
+        stopGrid("client-2");
+
+        await(srvLeftLatch);
+        await(clientLeftLatch);
+
+        checkNodes(3, 2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientNodeFail() throws Exception {
+        startServerNodes(3);
+        startClientNodes(3);
+
+        checkNodes(3, 3);
+
+        srvFailedLatch = new CountDownLatch(3);
+        clientFailedLatch = new CountDownLatch(2);
+
+        attachListeners(3, 3);
+
+        failClient(2);
+
+        await(srvFailedLatch);
+        await(clientFailedLatch);
+
+        checkNodes(3, 2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testServerNodeJoin() throws Exception {
+        startServerNodes(3);
+        startClientNodes(3);
+
+        checkNodes(3, 3);
+
+        srvJoinedLatch = new CountDownLatch(3);
+        clientJoinedLatch = new CountDownLatch(3);
+
+        attachListeners(3, 3);
+
+        startServerNodes(1);
+
+        await(srvJoinedLatch);
+        await(clientJoinedLatch);
+
+        checkNodes(4, 3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testServerNodeLeave() throws Exception {
+        startServerNodes(3);
+        startClientNodes(3);
+
+        checkNodes(3, 3);
+
+        srvLeftLatch = new CountDownLatch(2);
+        clientLeftLatch = new CountDownLatch(3);
+
+        attachListeners(3, 3);
+
+        stopGrid("server-2");
+
+        await(srvLeftLatch);
+        await(clientLeftLatch);
+
+        checkNodes(2, 3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testServerNodeFail() throws Exception {
+        startServerNodes(3);
+        startClientNodes(3);
+
+        checkNodes(3, 3);
+
+        srvFailedLatch = new CountDownLatch(2);
+        clientFailedLatch = new CountDownLatch(3);
+
+        attachListeners(3, 3);
+
+        assert U.<Map>field(G.ignite("server-2").configuration().getDiscoverySpi(), "clientMsgWorkers").isEmpty();
+
+        failServer(2);
+
+        await(srvFailedLatch);
+        await(clientFailedLatch);
+
+        checkNodes(2, 3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPing() throws Exception {
+        startServerNodes(2);
+        startClientNodes(1);
+
+        Ignite srv0 = G.ignite("server-0");
+        Ignite srv1 = G.ignite("server-1");
+        Ignite client = G.ignite("client-0");
+
+        assert ((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id());
+        assert ((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id());
+
+        assert ((IgniteEx)client).context().discovery().pingNode(srv0.cluster().localNode().id());
+        assert ((IgniteEx)client).context().discovery().pingNode(srv1.cluster().localNode().id());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientReconnectOnRouterFail() throws Exception {
+        clientsPerSrv = 1;
+
+        startServerNodes(3);
+        startClientNodes(3);
+
+        checkNodes(3, 3);
+
+        setClientRouter(2, 0);
+
+        srvFailedLatch = new CountDownLatch(2);
+        clientFailedLatch = new CountDownLatch(3);
+
+        attachListeners(2, 3);
+
+        failServer(2);
+
+        await(srvFailedLatch);
+        await(clientFailedLatch);
+
+        checkNodes(2, 3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientReconnectOnNetworkProblem() throws Exception {
+        clientsPerSrv = 1;
+
+        startServerNodes(3);
+        startClientNodes(3);
+
+        checkNodes(3, 3);
+
+        setClientRouter(2, 0);
+
+        srvFailedLatch = new CountDownLatch(2);
+        clientFailedLatch = new CountDownLatch(3);
+
+        attachListeners(2, 3);
+
+        ((TcpClientDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi()).brokeConnection();
+
+        G.ignite("client-2").message().remoteListen(null, new MessageListener()); // Send some discovery message.
+
+        checkNodes(3, 3);
+    }
+
+    /**
+     * Checks that a client whose socket operations were paused learns, once resumed, about a
+     * server that left the topology in the meantime.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetMissedMessagesOnReconnect() throws Exception {
+        clientsPerSrv = 1;
+
+        startServerNodes(3);
+        startClientNodes(2);
+
+        checkNodes(3, 2);
+
+        clientLeftLatch = new CountDownLatch(1);
+        srvLeftLatch = new CountDownLatch(2);
+
+        attachListeners(2, 2);
+
+        ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).pauseAll();
+
+        stopGrid("server-2");
+
+        await(srvLeftLatch);
+
+        // client-0 is not paused and must observe the leave as well. Waiting for it here also
+        // guarantees its event cannot count down the latch created for client-1 below.
+        await(clientLeftLatch);
+
+        Thread.sleep(500);
+
+        // client-0 sees 2 servers + 2 clients; the paused client-1 still sees all 3 servers.
+        assert G.ignite("client-0").cluster().nodes().size() == 4;
+        assert G.ignite("client-1").cluster().nodes().size() == 5;
+
+        // Listeners read the static field when an event fires, so this new latch is the one
+        // counted down when client-1 catches up.
+        clientLeftLatch = new CountDownLatch(1);
+
+        ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).resume();
+
+        await(clientLeftLatch);
+
+        checkNodes(2, 2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientSegmentation() throws Exception {
+        clientsPerSrv = 1;
+
+        startServerNodes(3);
+        startClientNodes(3);
+
+        checkNodes(3, 3);
+
+//        setClientRouter(2, 2);
+
+        srvFailedLatch = new CountDownLatch(2 + 2);
+        clientFailedLatch = new CountDownLatch(2 + 2);
+
+        attachListeners(2, 2);
+
+        final CountDownLatch client2StoppedLatch = new CountDownLatch(1);
+
+        IgnitionListener lsnr = new IgnitionListener() {
+            @Override public void onStateChange(@Nullable String name, IgniteState state) {
+                if (state == IgniteState.STOPPED_ON_SEGMENTATION)
+                    client2StoppedLatch.countDown();
+            }
+        };
+        G.addListener(lsnr);
+
+        try {
+            failServer(2);
+
+            await(srvFailedLatch);
+            await(clientFailedLatch);
+
+            await(client2StoppedLatch);
+
+            checkNodes(2, 2);
+        }
+        finally {
+            G.removeListener(lsnr);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientNodeJoinOneServer() throws Exception {
+        startServerNodes(1);
+
+        srvJoinedLatch = new CountDownLatch(1);
+
+        attachListeners(1, 0);
+
+        startClientNodes(1);
+
+        await(srvJoinedLatch);
+
+        checkNodes(1, 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientNodeLeaveOneServer() throws Exception {
+        startServerNodes(1);
+        startClientNodes(1);
+
+        checkNodes(1, 1);
+
+        srvLeftLatch = new CountDownLatch(1);
+
+        attachListeners(1, 0);
+
+        stopGrid("client-0");
+
+        await(srvLeftLatch);
+
+        checkNodes(1, 0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientNodeFailOneServer() throws Exception {
+        startServerNodes(1);
+        startClientNodes(1);
+
+        checkNodes(1, 1);
+
+        srvFailedLatch = new CountDownLatch(1);
+
+        attachListeners(1, 0);
+
+        failClient(0);
+
+        await(srvFailedLatch);
+
+        checkNodes(1, 0);
+    }
+
+    /**
+     * Checks that the total executed jobs metric of every node, as seen from every server and
+     * client, follows the number of broadcasts issued: 0 initially, 1 after a broadcast from a
+     * client and 2 after a further broadcast from a server.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMetrics() throws Exception {
+        startServerNodes(3);
+        startClientNodes(3);
+
+        checkNodes(3, 3);
+
+        attachListeners(3, 3);
+
+        assertTrue(checkMetrics(3, 3, 0));
+
+        G.ignite("client-0").compute().broadcast(F.noop());
+
+        // The condition is polled for up to 10000; the previous unasserted re-check of the
+        // same condition after this wait discarded its result and has been removed.
+        assertTrue(GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return checkMetrics(3, 3, 1);
+            }
+        }, 10000));
+
+        G.ignite("server-0").compute().broadcast(F.noop());
+
+        assertTrue(GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return checkMetrics(3, 3, 2);
+            }
+        }, 10000));
+    }
+
+    /**
+     * Checks that every node, as seen from each of the first {@code srvCnt} servers and the
+     * first {@code clientCnt} clients, reports the expected total number of executed jobs.
+     *
+     * @param srvCnt Number of server nodes.
+     * @param clientCnt Number of client nodes.
+     * @param execJobsCnt Expected number of executed jobs.
+     * @return Whether metrics are correct.
+     */
+    private boolean checkMetrics(int srvCnt, int clientCnt, int execJobsCnt) {
+        for (int i = 0; i < srvCnt; i++) {
+            Ignite g = G.ignite("server-" + i);
+
+            for (ClusterNode n : g.cluster().nodes()) {
+                if (n.metrics().getTotalExecutedJobs() != execJobsCnt)
+                    return false;
+            }
+        }
+
+        for (int i = 0; i < clientCnt; i++) {
+            Ignite g = G.ignite("client-" + i);
+
+            for (ClusterNode n : g.cluster().nodes()) {
+                if (n.metrics().getTotalExecutedJobs() != execJobsCnt)
+                    return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDataExchangeFromServer() throws Exception {
+        testDataExchange("server-0");
+    }
+
+    /**
+     * TODO: IGNITE-587.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDataExchangeFromClient() throws Exception {
+        testDataExchange("client-0");
+    }
+
+    /**
+     * Registers a remote message listener through the given node, sends a message and waits for
+     * 4 deliveries (2 servers + 2 clients), then starts one more server and one more client and
+     * waits for 6 deliveries of a second message. The listener is unregistered at the end.
+     *
+     * @param masterName Name of the grid used to register the listener and send the messages.
+     * @throws Exception If failed.
+     */
+    private void testDataExchange(String masterName) throws Exception {
+        startServerNodes(2);
+        startClientNodes(2);
+
+        checkNodes(2, 2);
+
+        IgniteMessaging msg = grid(masterName).message();
+
+        UUID id = null;
+
+        try {
+            id = msg.remoteListen(null, new MessageListener());
+
+            // Must be assigned before sending: the listener counts down this static latch.
+            msgLatch = new CountDownLatch(4);
+
+            msg.send(null, "Message 1");
+
+            await(msgLatch);
+
+            startServerNodes(1);
+            startClientNodes(1);
+
+            checkNodes(3, 3);
+
+            msgLatch = new CountDownLatch(6);
+
+            msg.send(null, "Message 2");
+
+            await(msgLatch);
+        }
+        finally {
+            if (id != null)
+                msg.stopRemoteListen(id);
+        }
+    }
+
+    /**
+     * Checks that a client configured with the node ID of a running server is rejected with an
+     * SPI exception whose message contains "same ID".
+     *
+     * @throws Exception If any error occurs.
+     */
+    public void testDuplicateId() throws Exception {
+        startServerNodes(2);
+
+        nodeId = G.ignite("server-1").cluster().localNode().id();
+
+        try {
+            startGrid("client-0");
+
+            // fail() instead of "assert false": the latter is a no-op when the JVM runs without
+            // -ea, which would let the test pass even though the client started.
+            fail("Client node must not start with a node ID that is already used by a server");
+        }
+        catch (IgniteCheckedException e) {
+            IgniteSpiException spiEx = e.getCause(IgniteSpiException.class);
+
+            assert spiEx != null : e;
+            assert spiEx.getMessage().contains("same ID") : spiEx.getMessage();
+        }
+    }
+
+    /**
+     * Checks that a client join fails with "Join process timed out" when server-1 holds back
+     * the messages it sends. A send-message listener on server-1 waits on a latch (for up to 10
+     * minutes) for every message; the latch is released when the test finishes.
+     *
+     * @throws Exception If any error occurs.
+     */
+    public void testTimeoutWaitingNodeAddedMessage() throws Exception {
+        startServerNodes(2);
+
+        final CountDownLatch cnt = new CountDownLatch(1);
+
+        ((TcpDiscoverySpi)G.ignite("server-1").configuration().getDiscoverySpi()).addSendMessageListener(
+            new IgniteInClosure<TcpDiscoveryAbstractMessage>() {
+                @Override public void apply(TcpDiscoveryAbstractMessage msg) {
+                    try {
+                        cnt.await(10, MINUTES);
+                    }
+                    catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+
+                        throw new IgniteInterruptedException(e);
+                    }
+                }
+            });
+
+        try {
+            startGrid("client-0");
+
+            // fail() instead of "assert false": the latter is a no-op without -ea.
+            fail("Client node must not join while server-1 holds back discovery messages");
+        }
+        catch (IgniteCheckedException e) {
+            IgniteSpiException spiEx = e.getCause(IgniteSpiException.class);
+
+            assert spiEx != null : e;
+            assert spiEx.getMessage().contains("Join process timed out") : spiEx.getMessage();
+        }
+        finally {
+            // Release the listener on every outcome; otherwise an unexpected result would leave
+            // it waiting for up to 10 minutes and stall node shutdown in afterTest().
+            cnt.countDown();
+        }
+    }
+
+    /**
+     * @throws Exception If any error occurs.
+     */
+    public void testGridStartTime() throws Exception {
+        startServerNodes(2);
+
+        startClientNodes(2);
+
+        long startTime = -1;
+
+        for (Ignite g : G.allGrids()) {
+            IgniteEx kernal = (IgniteKernal)g;
+
+            assertTrue(kernal.context().discovery().gridStartTime() > 0);
+
+            if (startTime == -1)
+                startTime = kernal.context().discovery().gridStartTime();
+            else
+                assertEquals(startTime, kernal.context().discovery().gridStartTime());
+        }
+    }
+
+    /**
+     * Replaces the address list of the given client's IP finder with the single address found
+     * at position {@code srvIdx} among the addresses registered in the shared {@code IP_FINDER}.
+     * Note that both parameters shadow the static counters of the same name.
+     *
+     * @param clientIdx Index of the client node ({@code "client-" + clientIdx}).
+     * @param srvIdx Position of the address in the shared IP finder's registered addresses.
+     * @throws Exception In case of error.
+     */
+    private void setClientRouter(int clientIdx, int srvIdx) throws Exception {
+        TcpClientDiscoverySpi disco =
+            (TcpClientDiscoverySpi)G.ignite("client-" + clientIdx).configuration().getDiscoverySpi();
+
+        TcpDiscoveryVmIpFinder ipFinder = (TcpDiscoveryVmIpFinder)disco.getIpFinder();
+
+        String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()).get(srvIdx).toString();
+
+        // Strip the leading '/' of the address string form, if present.
+        if (addr.startsWith("/"))
+            addr = addr.substring(1);
+
+        ipFinder.setAddresses(Arrays.asList(addr));
+    }
+
+    /**
+     * @param cnt Number of nodes.
+     * @throws Exception In case of error.
+     */
+    private void startServerNodes(int cnt) throws Exception {
+        for (int i = 0; i < cnt; i++) {
+            Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
+
+            srvNodeIds.add(g.cluster().localNode().id());
+        }
+    }
+
+    /**
+     * @param cnt Number of nodes.
+     * @throws Exception In case of error.
+     */
+    private void startClientNodes(int cnt) throws Exception {
+        for (int i = 0; i < cnt; i++) {
+            Ignite g = startGrid("client-" + clientIdx.getAndIncrement());
+
+            clientNodeIds.add(g.cluster().localNode().id());
+        }
+    }
+
+    /**
+     * @param idx Index.
+     */
+    private void failServer(int idx) {
+        ((TcpDiscoverySpi)G.ignite("server-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure();
+    }
+
+    /**
+     * @param idx Index.
+     */
+    private void failClient(int idx) {
+        ((TcpClientDiscoverySpi)G.ignite("client-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure();
+    }
+
+    /**
+     * Registers local event listeners on the first {@code srvCnt} servers and the first
+     * {@code clientCnt} clients for every joined/left/failed latch that is non-null at the time
+     * of the call. Each listener logs the event and counts down the matching latch.
+     * <p>
+     * The listeners read the static latch field when the event fires instead of capturing the
+     * latch instance, so a test may assign a new latch after attaching and the listeners will
+     * count down the new one.
+     *
+     * @param srvCnt Number of server nodes.
+     * @param clientCnt Number of client nodes.
+     * @throws Exception If failed.
+     */
+    private void attachListeners(int srvCnt, int clientCnt) throws Exception {
+        if (srvJoinedLatch != null) {
+            for (int i = 0; i < srvCnt; i++) {
+                G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
+                    @Override public boolean apply(Event evt) {
+                        info("Joined event fired on server: " + evt);
+
+                        srvJoinedLatch.countDown();
+
+                        return true;
+                    }
+                }, EVT_NODE_JOINED);
+            }
+        }
+
+        if (srvLeftLatch != null) {
+            for (int i = 0; i < srvCnt; i++) {
+                G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
+                    @Override public boolean apply(Event evt) {
+                        info("Left event fired on server: " + evt);
+
+                        srvLeftLatch.countDown();
+
+                        return true;
+                    }
+                }, EVT_NODE_LEFT);
+            }
+        }
+
+        if (srvFailedLatch != null) {
+            for (int i = 0; i < srvCnt; i++) {
+                G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
+                    @Override public boolean apply(Event evt) {
+                        info("Failed event fired on server: " + evt);
+
+                        srvFailedLatch.countDown();
+
+                        return true;
+                    }
+                }, EVT_NODE_FAILED);
+            }
+        }
+
+        if (clientJoinedLatch != null) {
+            for (int i = 0; i < clientCnt; i++) {
+                G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
+                    @Override public boolean apply(Event evt) {
+                        info("Joined event fired on client: " + evt);
+
+                        clientJoinedLatch.countDown();
+
+                        return true;
+                    }
+                }, EVT_NODE_JOINED);
+            }
+        }
+
+        if (clientLeftLatch != null) {
+            for (int i = 0; i < clientCnt; i++) {
+                G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
+                    @Override public boolean apply(Event evt) {
+                        info("Left event fired on client: " + evt);
+
+                        clientLeftLatch.countDown();
+
+                        return true;
+                    }
+                }, EVT_NODE_LEFT);
+            }
+        }
+
+        if (clientFailedLatch != null) {
+            for (int i = 0; i < clientCnt; i++) {
+                G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
+                    @Override public boolean apply(Event evt) {
+                        info("Failed event fired on client: " + evt);
+
+                        clientFailedLatch.countDown();
+
+                        return true;
+                    }
+                }, EVT_NODE_FAILED);
+            }
+        }
+    }
+
+    /**
+     * @param srvCnt Number of server nodes.
+     * @param clientCnt Number of client nodes.
+     */
+    private void checkNodes(int srvCnt, int clientCnt) {
+        for (int i = 0; i < srvCnt; i++) {
+            Ignite g = G.ignite("server-" + i);
+
+            assertTrue(srvNodeIds.contains(g.cluster().localNode().id()));
+
+            assertFalse(g.cluster().localNode().isClient());
+
+            checkRemoteNodes(g, srvCnt + clientCnt - 1);
+        }
+
+        for (int i = 0; i < clientCnt; i++) {
+            Ignite g = G.ignite("client-" + i);
+
+            assertTrue(clientNodeIds.contains(g.cluster().localNode().id()));
+
+            assertTrue(g.cluster().localNode().isClient());
+
+            checkRemoteNodes(g, srvCnt + clientCnt - 1);
+        }
+    }
+
+    /**
+     * @param ignite Grid.
+     * @param expCnt Expected nodes count.
+     */
+    @SuppressWarnings("TypeMayBeWeakened")
+    private void checkRemoteNodes(Ignite ignite, int expCnt) {
+        Collection<ClusterNode> nodes = ignite.cluster().forRemotes().nodes();
+
+        assertEquals(expCnt, nodes.size());
+
+        for (ClusterNode node : nodes) {
+            UUID id = node.id();
+
+            if (clientNodeIds.contains(id))
+                assertTrue(node.isClient());
+            else if (srvNodeIds.contains(id))
+                assertFalse(node.isClient());
+            else
+                assert false : "Unexpected node ID: " + id;
+        }
+    }
+
+    /**
+     * @param latch Latch.
+     * @throws InterruptedException If interrupted.
+     */
+    private void await(CountDownLatch latch) throws InterruptedException {
+        assertTrue("Latch count: " + latch.getCount(), latch.await(10000, MILLISECONDS));
+    }
+
+    /**
+     * Message listener that prints every received message together with the local node ID and
+     * counts down the shared static {@code msgLatch}. The latch is dereferenced without a null
+     * check, so it has to be assigned before a message is delivered.
+     */
+    private static class MessageListener implements IgniteBiPredicate<UUID, Object> {
+        /** Ignite instance; the field is annotated for resource injection. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(UUID uuid, Object msg) {
+            X.println(">>> Received [locNodeId=" + ignite.configuration().getNodeId() + ", msg=" + msg + ']');
+
+            msgLatch.countDown();
+
+            return true;
+        }
+    }
+
+    /**
+     * Client discovery SPI for tests whose socket opening and socket writes can be paused and
+     * resumed. While paused, threads entering {@code openSocket} or {@code writeToSocket} block
+     * until {@code resume()} is called.
+     */
+    private static class TestTcpClientDiscovery extends TcpClientDiscoverySpi {
+        /** Monitor guarding the pause flags; waiting threads are woken up through it. */
+        private final Object mux = new Object();
+
+        /** While {@code true}, {@code writeToSocket} blocks. */
+        private final AtomicBoolean writeLock = new AtomicBoolean();
+
+        /** While {@code true}, {@code openSocket} blocks. */
+        private final AtomicBoolean openSockLock = new AtomicBoolean();
+
+        /**
+         * Blocks the calling thread while the given flag is {@code true}. If the thread is
+         * interrupted, its interrupt status is restored and a {@link RuntimeException} wrapping
+         * the {@link InterruptedException} is thrown.
+         *
+         * @param lock Lock.
+         */
+        private void waitFor(AtomicBoolean lock) {
+            try {
+                synchronized (mux) {
+                    // Loop guards against wake-ups that happen while the flag is still set.
+                    while (lock.get())
+                        mux.wait();
+                }
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+
+                throw new RuntimeException(e);
+            }
+        }
+
+        /**
+         * Sets all given flags to {@code isPause} and wakes up every thread waiting on the
+         * monitor so that it re-checks its flag.
+         *
+         * @param isPause Is lock.
+         * @param locks Locks.
+         */
+        private void pauseResumeOperation(boolean isPause, AtomicBoolean... locks) {
+            synchronized (mux) {
+                for (AtomicBoolean lock : locks)
+                    lock.set(isPause);
+
+                mux.notifyAll();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+            GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException {
+            waitFor(writeLock);
+
+            super.writeToSocket(sock, msg, bout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
+            waitFor(openSockLock);
+
+            return super.openSocket(sockAddr);
+        }
+
+        /**
+         * Pauses both socket opening and socket writes, then calls {@code brokeConnection()}.
+         */
+        private void pauseAll() {
+            pauseResumeOperation(true, openSockLock, writeLock);
+
+            brokeConnection();
+        }
+
+        /**
+         * Clears both pause flags and releases the threads blocked on them.
+         */
+        private void resume() {
+            pauseResumeOperation(false, openSockLock, writeLock);
+        }
+    }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c05e368d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index ebc7111..8bf8dbc 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -51,7 +51,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
 
         suite.addTest(new TestSuite(GridTcpSpiForwardingSelfTest.class));
 
-        suite.addTest(new TestSuite(TcpClientDiscoverySelfTest.class));
+        suite.addTest(new TestSuite(TcpClientDiscoverySpiSelfTest.class));
         suite.addTest(new TestSuite(TcpClientDiscoverySpiConfigSelfTest.class));
         suite.addTest(new TestSuite(TcpClientDiscoveryMarshallerCheckSelfTest.class));
 


[2/2] incubator-ignite git commit: # IGNITE-709 Improve ping from client to server.

Posted by sb...@apache.org.
# IGNITE-709 Improve ping from client to server.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/505a03e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/505a03e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/505a03e9

Branch: refs/heads/ignite-709_2
Commit: 505a03e92db2747ed5beb07eb47cce17271d387c
Parents: c05e368
Author: sevdokimov <se...@gridgain.com>
Authored: Tue May 12 13:43:26 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Tue May 12 13:43:26 2015 +0300

----------------------------------------------------------------------
 .../discovery/tcp/TcpClientDiscoverySpi.java    | 56 +++++++++++++++-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 62 ++++++++++++++++--
 .../messages/TcpDiscoveryClientPingRequest.java | 56 ++++++++++++++++
 .../TcpDiscoveryClientPingResponse.java         | 67 ++++++++++++++++++++
 .../tcp/TcpClientDiscoverySpiSelfTest.java      | 30 +++++++++
 5 files changed, 264 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/505a03e9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index 6752bf5..d55d1c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.tcp;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
@@ -75,6 +76,9 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
     /** Remote nodes. */
     private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>();
 
+    /** Pending ping futures, keyed by ID of the node being pinged. */
+    private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> pingFuts = new ConcurrentHashMap8<>();
+
     /** Socket writer. */
     private SocketWriter sockWriter;
 
@@ -316,6 +320,9 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
             }
         }
 
+        for (GridFutureAdapter<Boolean> fut : pingFuts.values())
+            fut.onDone(false);
+
         rmtNodes.clear();
 
         U.interrupt(sockTimeoutWorker);
@@ -359,15 +366,46 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
     }
 
     /** {@inheritDoc} */
-    @Override public boolean pingNode(UUID nodeId) {
-        assert nodeId != null;
+    @Override public boolean pingNode(@NotNull final UUID nodeId) {
+        if (getSpiContext().isStopping())
+            return false;
 
         if (nodeId.equals(getLocalNodeId()))
             return true;
 
         TcpDiscoveryNode node = rmtNodes.get(nodeId);
 
-        return node != null && node.visible();
+        if (node == null || !node.visible())
+            return false;
+
+        GridFutureAdapter<Boolean> fut = pingFuts.get(nodeId);
+
+        if (fut == null) {
+            fut = new GridFutureAdapter<>();
+
+            GridFutureAdapter<Boolean> oldFut = pingFuts.putIfAbsent(nodeId, fut);
+
+            if (oldFut != null)
+                fut = oldFut; // A ping to this node is already pending; share its future.
+            else
+                sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
+        }
+
+        final GridFutureAdapter<Boolean> finalFut = fut;
+
+        timer.schedule(new TimerTask() { // Fails the ping if the future is still registered after netTimeout.
+            @Override public void run() {
+                if (pingFuts.remove(nodeId, finalFut))
+                    finalFut.onDone(false);
+            }
+        }, netTimeout);
+
+        try {
+            return fut.get();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteSpiException(e); // Should never occur.
+        }
     }
 
     /** {@inheritDoc} */
@@ -1069,6 +1107,8 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
                 processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
             else if (msg instanceof TcpDiscoveryCustomEventMessage)
                 processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
+            else if (msg instanceof TcpDiscoveryClientPingResponse)
+                processClientPingResponse((TcpDiscoveryClientPingResponse)msg);
 
             stats.onMessageProcessingFinished(msg);
         }
@@ -1366,6 +1406,16 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
         }
 
         /**
+         * @param msg Message.
+         */
+        private void processClientPingResponse(TcpDiscoveryClientPingResponse msg) {
+            GridFutureAdapter<Boolean> fut = pingFuts.remove(msg.nodeToPing()); // Unregister before completing.
+
+            if (fut != null) // Null when no ping is registered for this node (e.g. already timed out).
+                fut.onDone(msg.result());
+        }
+
+        /**
          * @param nodeId Node ID.
          * @param metrics Metrics.
          * @param cacheMetrics Cache metrics.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/505a03e9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 3624791..e00f798 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -203,6 +203,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
     private int reconCnt = DFLT_RECONNECT_CNT;
 
+    /** Executor on which client ping requests are processed. */
+    private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 10, 2000, TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<Runnable>());
+
     /** Nodes ring. */
     @GridToStringExclude
     private final TcpDiscoveryNodesRing ring = new TcpDiscoveryNodesRing();
@@ -285,6 +289,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
     private final CopyOnWriteArrayList<IgniteInClosure<TcpDiscoveryAbstractMessage>> sendMsgLsnrs =
         new CopyOnWriteArrayList<>();
 
+    /** Listeners applied to each incoming connection socket (for tests only). */
+    private final CopyOnWriteArrayList<IgniteInClosure<Socket>> incomeConnLsnrs =
+        new CopyOnWriteArrayList<>();
+
     /** {@inheritDoc} */
     @IgniteInstanceResource
     @Override public void injectResources(Ignite ignite) {
@@ -2034,15 +2042,29 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
     /**
      * <strong>FOR TEST ONLY!!!</strong>
      */
-    public void addSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> msg) {
-        sendMsgLsnrs.add(msg);
+    public void addSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> lsnr) {
+        sendMsgLsnrs.add(lsnr);
+    }
+
+    /**
+     * <strong>FOR TEST ONLY!!!</strong>
+     */
+    public void removeSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> lsnr) {
+        sendMsgLsnrs.remove(lsnr);
+    }
+
+    /**
+     * <strong>FOR TEST ONLY!!!</strong>
+     */
+    public void addIncomeConnectionListener(IgniteInClosure<Socket> lsnr) {
+        incomeConnLsnrs.add(lsnr);
     }
 
     /**
      * <strong>FOR TEST ONLY!!!</strong>
      */
-    public void removeSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> msg) {
-        sendMsgLsnrs.remove(msg);
+    public void removeIncomeConnectionListener(IgniteInClosure<Socket> lsnr) {
+        incomeConnLsnrs.remove(lsnr);
     }
 
     /**
@@ -2634,6 +2656,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
             else if (msg instanceof TcpDiscoveryCustomEventMessage)
                 processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
 
+            else if (msg instanceof TcpDiscoveryClientPingRequest)
+                processClientPingRequest((TcpDiscoveryClientPingRequest)msg);
+
             else
                 assert false : "Unknown message type: " + msg.getClass().getSimpleName();
 
@@ -4448,6 +4473,32 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
         /**
          * @param msg Message.
          */
+        private void processClientPingRequest(final TcpDiscoveryClientPingRequest msg) {
+            utilityPool.execute(new Runnable() { // Ping runs on utilityPool rather than on the calling thread.
+                @Override public void run() {
+                    boolean res = pingNode(msg.nodeToPing());
+
+                    final ClientMessageWorker worker = clientMsgWorkers.get(msg.creatorNodeId());
+
+                    if (worker == null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Ping request from dead client node, will be skipped: " + msg.creatorNodeId());
+                    }
+                    else {
+                        TcpDiscoveryClientPingResponse pingRes = new TcpDiscoveryClientPingResponse(
+                            getLocalNodeId(), msg.nodeToPing(), res);
+
+                        pingRes.verify(getLocalNodeId());
+
+                        worker.addMessage(pingRes); // Reply goes to the requesting client via its message worker.
+                    }
+                }
+            });
+        }
+
+        /**
+         * @param msg Message.
+         */
         private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
             if (isLocalNodeCoordinator()) {
                 if (msg.verified()) {
@@ -4643,6 +4694,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
 
                     sock.setSoTimeout((int)netTimeout);
 
+                    for (IgniteInClosure<Socket> connLsnr : incomeConnLsnrs)
+                        connLsnr.apply(sock);
+
                     in = new BufferedInputStream(sock.getInputStream());
 
                     byte[] buf = new byte[4];

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/505a03e9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java
new file mode 100644
index 0000000..f9f164d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Request sent by a client node asking a server node to ping the given node.
+ */
+public class TcpDiscoveryClientPingRequest extends TcpDiscoveryAbstractMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** ID of the node to ping. */
+    private final UUID nodeToPing;
+
+    /**
+     * @param creatorNodeId Creator node ID.
+     * @param nodeToPing ID of the node to ping.
+     */
+    public TcpDiscoveryClientPingRequest(UUID creatorNodeId, @Nullable UUID nodeToPing) {
+        super(creatorNodeId);
+
+        this.nodeToPing = nodeToPing;
+    }
+
+    /**
+     * @return ID of the node to ping.
+     */
+    @Nullable public UUID nodeToPing() {
+        return nodeToPing;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TcpDiscoveryClientPingRequest.class, this, "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/505a03e9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java
new file mode 100644
index 0000000..26a2b00
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Ping response: carries the result of a ping requested via {@link TcpDiscoveryClientPingRequest}.
+ */
+public class TcpDiscoveryClientPingResponse extends TcpDiscoveryAbstractMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** ID of the pinged node. */
+    private final UUID nodeToPing;
+
+    /** Ping result. */
+    private final boolean res;
+
+    /**
+     * @param creatorNodeId Creator node ID.
+     * @param nodeToPing ID of the pinged node.
+     */
+    public TcpDiscoveryClientPingResponse(UUID creatorNodeId, @Nullable UUID nodeToPing, boolean res) {
+        super(creatorNodeId);
+
+        this.nodeToPing = nodeToPing;
+        this.res = res;
+    }
+
+    /**
+     * @return ID of the pinged node.
+     */
+    @Nullable public UUID nodeToPing() {
+        return nodeToPing;
+    }
+
+    /**
+     * @return Result of ping.
+     */
+    public boolean result() {
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TcpDiscoveryClientPingResponse.class, this, "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/505a03e9/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index a06bfd9..49ef4aa 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -353,6 +353,36 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testPingFailedNodeFromClient() throws Exception {
+        startServerNodes(2);
+        startClientNodes(1);
+
+        Ignite srv0 = G.ignite("server-0");
+        Ignite srv1 = G.ignite("server-1");
+        Ignite client = G.ignite("client-0");
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure<Socket>() {
+            @Override public void apply(Socket sock) {
+                try {
+                    latch.await(); // Hold each incoming connection on srv1 until the test releases the latch.
+                }
+                catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
+        assert ((IgniteEx)client).context().discovery().pingNode(srv0.cluster().localNode().id());
+        assert !((IgniteEx)client).context().discovery().pingNode(srv1.cluster().localNode().id());
+
+        latch.countDown(); // NOTE(review): skipped if an assert above fails; consider try/finally.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testClientReconnectOnRouterFail() throws Exception {
         clientsPerSrv = 1;