You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/12 12:44:16 UTC
[1/2] incubator-ignite git commit: # Rename TcpClientDiscoverySelfTest to TcpClientDiscoverySpiSelfTest
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-709_2 28498e99b -> 505a03e92
# Rename TcpClientDiscoverySelfTest to TcpClientDiscoverySpiSelfTest
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c05e368d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c05e368d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c05e368d
Branch: refs/heads/ignite-709_2
Commit: c05e368d9a10e77d39f72fcad22f625402102fda
Parents: 28498e9
Author: sevdokimov <se...@gridgain.com>
Authored: Tue May 12 13:37:44 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Tue May 12 13:37:44 2015 +0300
----------------------------------------------------------------------
.../tcp/TcpClientDiscoverySelfTest.java | 1028 ------------------
.../tcp/TcpClientDiscoverySpiSelfTest.java | 1028 ++++++++++++++++++
.../IgniteSpiDiscoverySelfTestSuite.java | 2 +-
3 files changed, 1029 insertions(+), 1029 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c05e368d/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
deleted file mode 100644
index 2a123ce..0000000
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
+++ /dev/null
@@ -1,1028 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.tcp;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.io.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.spi.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.spi.discovery.tcp.messages.*;
-import org.apache.ignite.testframework.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.events.EventType.*;
-
-/**
- * Client-based discovery tests.
- */
-public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest {
- /** */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
- /** */
- private static final AtomicInteger srvIdx = new AtomicInteger();
-
- /** */
- private static final AtomicInteger clientIdx = new AtomicInteger();
-
- /** */
- private static Collection<UUID> srvNodeIds;
-
- /** */
- private static Collection<UUID> clientNodeIds;
-
- /** */
- private static int clientsPerSrv;
-
- /** */
- private static CountDownLatch srvJoinedLatch;
-
- /** */
- private static CountDownLatch srvLeftLatch;
-
- /** */
- private static CountDownLatch srvFailedLatch;
-
- /** */
- private static CountDownLatch clientJoinedLatch;
-
- /** */
- private static CountDownLatch clientLeftLatch;
-
- /** */
- private static CountDownLatch clientFailedLatch;
-
- /** */
- private static CountDownLatch msgLatch;
-
- /** */
- private UUID nodeId;
-
- /** */
- private TcpDiscoveryVmIpFinder clientIpFinder;
-
- /** */
- private long joinTimeout = TcpClientDiscoverySpi.DFLT_JOIN_TIMEOUT;
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- cfg.setLocalHost("127.0.0.1");
-
- if (gridName.startsWith("server")) {
- TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
- disco.setIpFinder(IP_FINDER);
-
- cfg.setDiscoverySpi(disco);
- }
- else if (gridName.startsWith("client")) {
- TcpClientDiscoverySpi disco = new TestTcpClientDiscovery();
-
- disco.setJoinTimeout(joinTimeout);
-
- TcpDiscoveryVmIpFinder ipFinder;
-
- if (clientIpFinder != null)
- ipFinder = clientIpFinder;
- else {
- ipFinder = new TcpDiscoveryVmIpFinder();
-
- String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()).
- get((clientIdx.get() - 1) / clientsPerSrv).toString();
-
- if (addr.startsWith("/"))
- addr = addr.substring(1);
-
- ipFinder.setAddresses(Arrays.asList(addr));
- }
-
- disco.setIpFinder(ipFinder);
-
- cfg.setDiscoverySpi(disco);
-
- String nodeId = cfg.getNodeId().toString();
-
- nodeId = "cc" + nodeId.substring(2);
-
- cfg.setNodeId(UUID.fromString(nodeId));
- }
-
- if (nodeId != null)
- cfg.setNodeId(nodeId);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- Collection<InetSocketAddress> addrs = IP_FINDER.getRegisteredAddresses();
-
- if (!F.isEmpty(addrs))
- IP_FINDER.unregisterAddresses(addrs);
-
- srvIdx.set(0);
- clientIdx.set(0);
-
- srvNodeIds = new GridConcurrentHashSet<>();
- clientNodeIds = new GridConcurrentHashSet<>();
-
- clientsPerSrv = 2;
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllClients(true);
- stopAllServers(true);
-
- nodeId = null;
- clientIpFinder = null;
- joinTimeout = TcpClientDiscoverySpi.DFLT_JOIN_TIMEOUT;
-
- assert G.allGrids().isEmpty();
- }
-
- /**
- *
- * @throws Exception
- */
- public void testJoinTimeout() throws Exception {
- clientIpFinder = new TcpDiscoveryVmIpFinder();
- joinTimeout = 1000;
-
- try {
- startClientNodes(1);
-
- fail("Client cannot be start because no server nodes run");
- }
- catch (IgniteCheckedException e) {
- IgniteSpiException spiEx = e.getCause(IgniteSpiException.class);
-
- assert spiEx != null : e;
-
- assert spiEx.getMessage().contains("Join process timed out") : spiEx.getMessage();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testClientNodeJoin() throws Exception {
- startServerNodes(3);
- startClientNodes(3);
-
- checkNodes(3, 3);
-
- srvJoinedLatch = new CountDownLatch(3);
- clientJoinedLatch = new CountDownLatch(3);
-
- attachListeners(3, 3);
-
- startClientNodes(1);
-
- await(srvJoinedLatch);
- await(clientJoinedLatch);
-
- checkNodes(3, 4);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testClientNodeLeave() throws Exception {
- startServerNodes(3);
- startClientNodes(3);
-
- checkNodes(3, 3);
-
- srvLeftLatch = new CountDownLatch(3);
- clientLeftLatch = new CountDownLatch(2);
-
- attachListeners(3, 3);
-
- stopGrid("client-2");
-
- await(srvLeftLatch);
- await(clientLeftLatch);
-
- checkNodes(3, 2);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testClientNodeFail() throws Exception {
- startServerNodes(3);
- startClientNodes(3);
-
- checkNodes(3, 3);
-
- srvFailedLatch = new CountDownLatch(3);
- clientFailedLatch = new CountDownLatch(2);
-
- attachListeners(3, 3);
-
- failClient(2);
-
- await(srvFailedLatch);
- await(clientFailedLatch);
-
- checkNodes(3, 2);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testServerNodeJoin() throws Exception {
- startServerNodes(3);
- startClientNodes(3);
-
- checkNodes(3, 3);
-
- srvJoinedLatch = new CountDownLatch(3);
- clientJoinedLatch = new CountDownLatch(3);
-
- attachListeners(3, 3);
-
- startServerNodes(1);
-
- await(srvJoinedLatch);
- await(clientJoinedLatch);
-
- checkNodes(4, 3);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testServerNodeLeave() throws Exception {
- startServerNodes(3);
- startClientNodes(3);
-
- checkNodes(3, 3);
-
- srvLeftLatch = new CountDownLatch(2);
- clientLeftLatch = new CountDownLatch(3);
-
- attachListeners(3, 3);
-
- stopGrid("server-2");
-
- await(srvLeftLatch);
- await(clientLeftLatch);
-
- checkNodes(2, 3);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testServerNodeFail() throws Exception {
- startServerNodes(3);
- startClientNodes(3);
-
- checkNodes(3, 3);
-
- srvFailedLatch = new CountDownLatch(2);
- clientFailedLatch = new CountDownLatch(3);
-
- attachListeners(3, 3);
-
- assert U.<Map>field(G.ignite("server-2").configuration().getDiscoverySpi(), "clientMsgWorkers").isEmpty();
-
- failServer(2);
-
- await(srvFailedLatch);
- await(clientFailedLatch);
-
- checkNodes(2, 3);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testPing() throws Exception {
- startServerNodes(2);
- startClientNodes(1);
-
- Ignite srv0 = G.ignite("server-0");
- Ignite srv1 = G.ignite("server-1");
- Ignite client = G.ignite("client-0");
-
- assert ((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id());
- assert ((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id());
-
- assert ((IgniteEx)client).context().discovery().pingNode(srv0.cluster().localNode().id());
- assert ((IgniteEx)client).context().discovery().pingNode(srv1.cluster().localNode().id());
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testClientReconnectOnRouterFail() throws Exception {
- clientsPerSrv = 1;
-
- startServerNodes(3);
- startClientNodes(3);
-
- checkNodes(3, 3);
-
- setClientRouter(2, 0);
-
- srvFailedLatch = new CountDownLatch(2);
- clientFailedLatch = new CountDownLatch(3);
-
- attachListeners(2, 3);
-
- failServer(2);
-
- await(srvFailedLatch);
- await(clientFailedLatch);
-
- checkNodes(2, 3);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testClientReconnectOnNetworkProblem() throws Exception {
- clientsPerSrv = 1;
-
- startServerNodes(3);
- startClientNodes(3);
-
- checkNodes(3, 3);
-
- setClientRouter(2, 0);
-
- srvFailedLatch = new CountDownLatch(2);
- clientFailedLatch = new CountDownLatch(3);
-
- attachListeners(2, 3);
-
- ((TcpClientDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi()).brokeConnection();
-
- G.ignite("client-2").message().remoteListen(null, new MessageListener()); // Send some discovery message.
-
- checkNodes(3, 3);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testGetMissedMessagesOnReconnect() throws Exception {
- clientsPerSrv = 1;
-
- startServerNodes(3);
- startClientNodes(2);
-
- checkNodes(3, 2);
-
- clientLeftLatch = new CountDownLatch(1);
- srvLeftLatch = new CountDownLatch(2);
-
- attachListeners(2, 2);
-
- ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).pauseAll();
-
- stopGrid("server-2");
-
- await(srvLeftLatch);
- await(srvLeftLatch);
-
- Thread.sleep(500);
-
- assert G.ignite("client-0").cluster().nodes().size() == 4;
- assert G.ignite("client-1").cluster().nodes().size() == 5;
-
- clientLeftLatch = new CountDownLatch(1);
-
- ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).resume();
-
- await(clientLeftLatch);
-
- checkNodes(2, 2);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testClientSegmentation() throws Exception {
- clientsPerSrv = 1;
-
- startServerNodes(3);
- startClientNodes(3);
-
- checkNodes(3, 3);
-
-// setClientRouter(2, 2);
-
- srvFailedLatch = new CountDownLatch(2 + 2);
- clientFailedLatch = new CountDownLatch(2 + 2);
-
- attachListeners(2, 2);
-
- final CountDownLatch client2StoppedLatch = new CountDownLatch(1);
-
- IgnitionListener lsnr = new IgnitionListener() {
- @Override public void onStateChange(@Nullable String name, IgniteState state) {
- if (state == IgniteState.STOPPED_ON_SEGMENTATION)
- client2StoppedLatch.countDown();
- }
- };
- G.addListener(lsnr);
-
- try {
- failServer(2);
-
- await(srvFailedLatch);
- await(clientFailedLatch);
-
- await(client2StoppedLatch);
-
- checkNodes(2, 2);
- }
- finally {
- G.removeListener(lsnr);
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testClientNodeJoinOneServer() throws Exception {
- startServerNodes(1);
-
- srvJoinedLatch = new CountDownLatch(1);
-
- attachListeners(1, 0);
-
- startClientNodes(1);
-
- await(srvJoinedLatch);
-
- checkNodes(1, 1);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testClientNodeLeaveOneServer() throws Exception {
- startServerNodes(1);
- startClientNodes(1);
-
- checkNodes(1, 1);
-
- srvLeftLatch = new CountDownLatch(1);
-
- attachListeners(1, 0);
-
- stopGrid("client-0");
-
- await(srvLeftLatch);
-
- checkNodes(1, 0);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testClientNodeFailOneServer() throws Exception {
- startServerNodes(1);
- startClientNodes(1);
-
- checkNodes(1, 1);
-
- srvFailedLatch = new CountDownLatch(1);
-
- attachListeners(1, 0);
-
- failClient(0);
-
- await(srvFailedLatch);
-
- checkNodes(1, 0);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testMetrics() throws Exception {
- startServerNodes(3);
- startClientNodes(3);
-
- checkNodes(3, 3);
-
- attachListeners(3, 3);
-
- assertTrue(checkMetrics(3, 3, 0));
-
- G.ignite("client-0").compute().broadcast(F.noop());
-
- assertTrue(GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return checkMetrics(3, 3, 1);
- }
- }, 10000));
-
- checkMetrics(3, 3, 1);
-
- G.ignite("server-0").compute().broadcast(F.noop());
-
- assertTrue(GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return checkMetrics(3, 3, 2);
- }
- }, 10000));
- }
-
- /**
- * @param srvCnt Number of Number of server nodes.
- * @param clientCnt Number of client nodes.
- * @param execJobsCnt Expected number of executed jobs.
- * @return Whether metrics are correct.
- */
- private boolean checkMetrics(int srvCnt, int clientCnt, int execJobsCnt) {
- for (int i = 0; i < srvCnt; i++) {
- Ignite g = G.ignite("server-" + i);
-
- for (ClusterNode n : g.cluster().nodes()) {
- if (n.metrics().getTotalExecutedJobs() != execJobsCnt)
- return false;
- }
- }
-
- for (int i = 0; i < clientCnt; i++) {
- Ignite g = G.ignite("client-" + i);
-
- for (ClusterNode n : g.cluster().nodes()) {
- if (n.metrics().getTotalExecutedJobs() != execJobsCnt)
- return false;
- }
- }
-
- return true;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testDataExchangeFromServer() throws Exception {
- testDataExchange("server-0");
- }
-
- /**
- * TODO: IGNITE-587.
- *
- * @throws Exception If failed.
- */
- public void testDataExchangeFromClient() throws Exception {
- testDataExchange("client-0");
- }
-
- /**
- * @throws Exception If failed.
- */
- private void testDataExchange(String masterName) throws Exception {
- startServerNodes(2);
- startClientNodes(2);
-
- checkNodes(2, 2);
-
- IgniteMessaging msg = grid(masterName).message();
-
- UUID id = null;
-
- try {
- id = msg.remoteListen(null, new MessageListener());
-
- msgLatch = new CountDownLatch(4);
-
- msg.send(null, "Message 1");
-
- await(msgLatch);
-
- startServerNodes(1);
- startClientNodes(1);
-
- checkNodes(3, 3);
-
- msgLatch = new CountDownLatch(6);
-
- msg.send(null, "Message 2");
-
- await(msgLatch);
- }
- finally {
- if (id != null)
- msg.stopRemoteListen(id);
- }
- }
-
- /**
- * @throws Exception If any error occurs.
- */
- public void testDuplicateId() throws Exception {
- startServerNodes(2);
-
- nodeId = G.ignite("server-1").cluster().localNode().id();
-
- try {
- startGrid("client-0");
-
- assert false;
- }
- catch (IgniteCheckedException e) {
- IgniteSpiException spiEx = e.getCause(IgniteSpiException.class);
-
- assert spiEx != null : e;
- assert spiEx.getMessage().contains("same ID") : spiEx.getMessage();
- }
- }
-
- /**
- * @throws Exception If any error occurs.
- */
- public void testTimeoutWaitingNodeAddedMessage() throws Exception {
- startServerNodes(2);
-
- final CountDownLatch cnt = new CountDownLatch(1);
-
- ((TcpDiscoverySpi)G.ignite("server-1").configuration().getDiscoverySpi()).addSendMessageListener(
- new IgniteInClosure<TcpDiscoveryAbstractMessage>() {
- @Override public void apply(TcpDiscoveryAbstractMessage msg) {
- try {
- cnt.await(10, MINUTES);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IgniteInterruptedException(e);
- }
- }
- });
-
- try {
- startGrid("client-0");
-
- assert false;
- }
- catch (IgniteCheckedException e) {
- cnt.countDown();
-
- IgniteSpiException spiEx = e.getCause(IgniteSpiException.class);
-
- assert spiEx != null : e;
- assert spiEx.getMessage().contains("Join process timed out") : spiEx.getMessage();
- }
- }
-
- /**
- * @throws Exception If any error occurs.
- */
- public void testGridStartTime() throws Exception {
- startServerNodes(2);
-
- startClientNodes(2);
-
- long startTime = -1;
-
- for (Ignite g : G.allGrids()) {
- IgniteEx kernal = (IgniteKernal)g;
-
- assertTrue(kernal.context().discovery().gridStartTime() > 0);
-
- if (startTime == -1)
- startTime = kernal.context().discovery().gridStartTime();
- else
- assertEquals(startTime, kernal.context().discovery().gridStartTime());
- }
- }
-
- /**
- * @param clientIdx Index.
- * @throws Exception In case of error.
- */
- private void setClientRouter(int clientIdx, int srvIdx) throws Exception {
- TcpClientDiscoverySpi disco =
- (TcpClientDiscoverySpi)G.ignite("client-" + clientIdx).configuration().getDiscoverySpi();
-
- TcpDiscoveryVmIpFinder ipFinder = (TcpDiscoveryVmIpFinder)disco.getIpFinder();
-
- String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()).get(srvIdx).toString();
-
- if (addr.startsWith("/"))
- addr = addr.substring(1);
-
- ipFinder.setAddresses(Arrays.asList(addr));
- }
-
- /**
- * @param cnt Number of nodes.
- * @throws Exception In case of error.
- */
- private void startServerNodes(int cnt) throws Exception {
- for (int i = 0; i < cnt; i++) {
- Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
-
- srvNodeIds.add(g.cluster().localNode().id());
- }
- }
-
- /**
- * @param cnt Number of nodes.
- * @throws Exception In case of error.
- */
- private void startClientNodes(int cnt) throws Exception {
- for (int i = 0; i < cnt; i++) {
- Ignite g = startGrid("client-" + clientIdx.getAndIncrement());
-
- clientNodeIds.add(g.cluster().localNode().id());
- }
- }
-
- /**
- * @param idx Index.
- */
- private void failServer(int idx) {
- ((TcpDiscoverySpi)G.ignite("server-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure();
- }
-
- /**
- * @param idx Index.
- */
- private void failClient(int idx) {
- ((TcpClientDiscoverySpi)G.ignite("client-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure();
- }
-
- /**
- * @param srvCnt Number of server nodes.
- * @param clientCnt Number of client nodes.
- */
- private void attachListeners(int srvCnt, int clientCnt) throws Exception {
- if (srvJoinedLatch != null) {
- for (int i = 0; i < srvCnt; i++) {
- G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- info("Joined event fired on server: " + evt);
-
- srvJoinedLatch.countDown();
-
- return true;
- }
- }, EVT_NODE_JOINED);
- }
- }
-
- if (srvLeftLatch != null) {
- for (int i = 0; i < srvCnt; i++) {
- G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- info("Left event fired on server: " + evt);
-
- srvLeftLatch.countDown();
-
- return true;
- }
- }, EVT_NODE_LEFT);
- }
- }
-
- if (srvFailedLatch != null) {
- for (int i = 0; i < srvCnt; i++) {
- G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- info("Failed event fired on server: " + evt);
-
- srvFailedLatch.countDown();
-
- return true;
- }
- }, EVT_NODE_FAILED);
- }
- }
-
- if (clientJoinedLatch != null) {
- for (int i = 0; i < clientCnt; i++) {
- G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- info("Joined event fired on client: " + evt);
-
- clientJoinedLatch.countDown();
-
- return true;
- }
- }, EVT_NODE_JOINED);
- }
- }
-
- if (clientLeftLatch != null) {
- for (int i = 0; i < clientCnt; i++) {
- G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- info("Left event fired on client: " + evt);
-
- clientLeftLatch.countDown();
-
- return true;
- }
- }, EVT_NODE_LEFT);
- }
- }
-
- if (clientFailedLatch != null) {
- for (int i = 0; i < clientCnt; i++) {
- G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- info("Failed event fired on client: " + evt);
-
- clientFailedLatch.countDown();
-
- return true;
- }
- }, EVT_NODE_FAILED);
- }
- }
- }
-
- /**
- * @param srvCnt Number of server nodes.
- * @param clientCnt Number of client nodes.
- */
- private void checkNodes(int srvCnt, int clientCnt) {
- for (int i = 0; i < srvCnt; i++) {
- Ignite g = G.ignite("server-" + i);
-
- assertTrue(srvNodeIds.contains(g.cluster().localNode().id()));
-
- assertFalse(g.cluster().localNode().isClient());
-
- checkRemoteNodes(g, srvCnt + clientCnt - 1);
- }
-
- for (int i = 0; i < clientCnt; i++) {
- Ignite g = G.ignite("client-" + i);
-
- assertTrue(clientNodeIds.contains(g.cluster().localNode().id()));
-
- assertTrue(g.cluster().localNode().isClient());
-
- checkRemoteNodes(g, srvCnt + clientCnt - 1);
- }
- }
-
- /**
- * @param ignite Grid.
- * @param expCnt Expected nodes count.
- */
- @SuppressWarnings("TypeMayBeWeakened")
- private void checkRemoteNodes(Ignite ignite, int expCnt) {
- Collection<ClusterNode> nodes = ignite.cluster().forRemotes().nodes();
-
- assertEquals(expCnt, nodes.size());
-
- for (ClusterNode node : nodes) {
- UUID id = node.id();
-
- if (clientNodeIds.contains(id))
- assertTrue(node.isClient());
- else if (srvNodeIds.contains(id))
- assertFalse(node.isClient());
- else
- assert false : "Unexpected node ID: " + id;
- }
- }
-
- /**
- * @param latch Latch.
- * @throws InterruptedException If interrupted.
- */
- private void await(CountDownLatch latch) throws InterruptedException {
- assertTrue("Latch count: " + latch.getCount(), latch.await(10000, MILLISECONDS));
- }
-
- /**
- */
- private static class MessageListener implements IgniteBiPredicate<UUID, Object> {
- @IgniteInstanceResource
- private Ignite ignite;
-
- /** {@inheritDoc} */
- @Override public boolean apply(UUID uuid, Object msg) {
- X.println(">>> Received [locNodeId=" + ignite.configuration().getNodeId() + ", msg=" + msg + ']');
-
- msgLatch.countDown();
-
- return true;
- }
- }
-
- /**
- *
- */
- private static class TestTcpClientDiscovery extends TcpClientDiscoverySpi {
- /** */
- private final Object mux = new Object();
-
- /** */
- private final AtomicBoolean writeLock = new AtomicBoolean();
-
- /** */
- private final AtomicBoolean openSockLock = new AtomicBoolean();
-
- /**
- * @param lock Lock.
- */
- private void waitFor(AtomicBoolean lock) {
- try {
- synchronized (mux) {
- while (lock.get())
- mux.wait();
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new RuntimeException(e);
- }
- }
-
- /**
- * @param isPause Is lock.
- * @param locks Locks.
- */
- private void pauseResumeOperation(boolean isPause, AtomicBoolean... locks) {
- synchronized (mux) {
- for (AtomicBoolean lock : locks)
- lock.set(isPause);
-
- mux.notifyAll();
- }
- }
-
- /** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
- GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException {
- waitFor(writeLock);
-
- super.writeToSocket(sock, msg, bout);
- }
-
- /** {@inheritDoc} */
- @Override protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
- waitFor(openSockLock);
-
- return super.openSocket(sockAddr);
- }
-
- /**
- *
- */
- private void pauseAll() {
- pauseResumeOperation(true, openSockLock, writeLock);
-
- brokeConnection();
- }
-
- /**
- *
- */
- private void resume() {
- pauseResumeOperation(false, openSockLock, writeLock);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c05e368d/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
new file mode 100644
index 0000000..a06bfd9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -0,0 +1,1028 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.io.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Client-based discovery tests.
+ */
+public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final AtomicInteger srvIdx = new AtomicInteger();
+
+ /** */
+ private static final AtomicInteger clientIdx = new AtomicInteger();
+
+ /** */
+ private static Collection<UUID> srvNodeIds;
+
+ /** */
+ private static Collection<UUID> clientNodeIds;
+
+ /** */
+ private static int clientsPerSrv;
+
+ /** */
+ private static CountDownLatch srvJoinedLatch;
+
+ /** */
+ private static CountDownLatch srvLeftLatch;
+
+ /** */
+ private static CountDownLatch srvFailedLatch;
+
+ /** */
+ private static CountDownLatch clientJoinedLatch;
+
+ /** */
+ private static CountDownLatch clientLeftLatch;
+
+ /** */
+ private static CountDownLatch clientFailedLatch;
+
+ /** */
+ private static CountDownLatch msgLatch;
+
+ /** */
+ private UUID nodeId;
+
+ /** */
+ private TcpDiscoveryVmIpFinder clientIpFinder;
+
+ /** */
+ private long joinTimeout = TcpClientDiscoverySpi.DFLT_JOIN_TIMEOUT;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setLocalHost("127.0.0.1");
+
+ if (gridName.startsWith("server")) {
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(disco);
+ }
+ else if (gridName.startsWith("client")) {
+ TcpClientDiscoverySpi disco = new TestTcpClientDiscovery();
+
+ disco.setJoinTimeout(joinTimeout);
+
+ TcpDiscoveryVmIpFinder ipFinder;
+
+ if (clientIpFinder != null)
+ ipFinder = clientIpFinder;
+ else {
+ ipFinder = new TcpDiscoveryVmIpFinder();
+
+ String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()).
+ get((clientIdx.get() - 1) / clientsPerSrv).toString();
+
+ if (addr.startsWith("/"))
+ addr = addr.substring(1);
+
+ ipFinder.setAddresses(Arrays.asList(addr));
+ }
+
+ disco.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(disco);
+
+ String nodeId = cfg.getNodeId().toString();
+
+ nodeId = "cc" + nodeId.substring(2);
+
+ cfg.setNodeId(UUID.fromString(nodeId));
+ }
+
+ if (nodeId != null)
+ cfg.setNodeId(nodeId);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ Collection<InetSocketAddress> addrs = IP_FINDER.getRegisteredAddresses();
+
+ if (!F.isEmpty(addrs))
+ IP_FINDER.unregisterAddresses(addrs);
+
+ srvIdx.set(0);
+ clientIdx.set(0);
+
+ srvNodeIds = new GridConcurrentHashSet<>();
+ clientNodeIds = new GridConcurrentHashSet<>();
+
+ clientsPerSrv = 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllClients(true);
+ stopAllServers(true);
+
+ nodeId = null;
+ clientIpFinder = null;
+ joinTimeout = TcpClientDiscoverySpi.DFLT_JOIN_TIMEOUT;
+
+ assert G.allGrids().isEmpty();
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testJoinTimeout() throws Exception {
+ clientIpFinder = new TcpDiscoveryVmIpFinder();
+ joinTimeout = 1000;
+
+ try {
+ startClientNodes(1);
+
+ fail("Client cannot be start because no server nodes run");
+ }
+ catch (IgniteCheckedException e) {
+ IgniteSpiException spiEx = e.getCause(IgniteSpiException.class);
+
+ assert spiEx != null : e;
+
+ assert spiEx.getMessage().contains("Join process timed out") : spiEx.getMessage();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientNodeJoin() throws Exception {
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ srvJoinedLatch = new CountDownLatch(3);
+ clientJoinedLatch = new CountDownLatch(3);
+
+ attachListeners(3, 3);
+
+ startClientNodes(1);
+
+ await(srvJoinedLatch);
+ await(clientJoinedLatch);
+
+ checkNodes(3, 4);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientNodeLeave() throws Exception {
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ srvLeftLatch = new CountDownLatch(3);
+ clientLeftLatch = new CountDownLatch(2);
+
+ attachListeners(3, 3);
+
+ stopGrid("client-2");
+
+ await(srvLeftLatch);
+ await(clientLeftLatch);
+
+ checkNodes(3, 2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientNodeFail() throws Exception {
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ srvFailedLatch = new CountDownLatch(3);
+ clientFailedLatch = new CountDownLatch(2);
+
+ attachListeners(3, 3);
+
+ failClient(2);
+
+ await(srvFailedLatch);
+ await(clientFailedLatch);
+
+ checkNodes(3, 2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testServerNodeJoin() throws Exception {
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ srvJoinedLatch = new CountDownLatch(3);
+ clientJoinedLatch = new CountDownLatch(3);
+
+ attachListeners(3, 3);
+
+ startServerNodes(1);
+
+ await(srvJoinedLatch);
+ await(clientJoinedLatch);
+
+ checkNodes(4, 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testServerNodeLeave() throws Exception {
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ srvLeftLatch = new CountDownLatch(2);
+ clientLeftLatch = new CountDownLatch(3);
+
+ attachListeners(3, 3);
+
+ stopGrid("server-2");
+
+ await(srvLeftLatch);
+ await(clientLeftLatch);
+
+ checkNodes(2, 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testServerNodeFail() throws Exception {
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ srvFailedLatch = new CountDownLatch(2);
+ clientFailedLatch = new CountDownLatch(3);
+
+ attachListeners(3, 3);
+
+ assert U.<Map>field(G.ignite("server-2").configuration().getDiscoverySpi(), "clientMsgWorkers").isEmpty();
+
+ failServer(2);
+
+ await(srvFailedLatch);
+ await(clientFailedLatch);
+
+ checkNodes(2, 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPing() throws Exception {
+ startServerNodes(2);
+ startClientNodes(1);
+
+ Ignite srv0 = G.ignite("server-0");
+ Ignite srv1 = G.ignite("server-1");
+ Ignite client = G.ignite("client-0");
+
+ assert ((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id());
+ assert ((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id());
+
+ assert ((IgniteEx)client).context().discovery().pingNode(srv0.cluster().localNode().id());
+ assert ((IgniteEx)client).context().discovery().pingNode(srv1.cluster().localNode().id());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientReconnectOnRouterFail() throws Exception {
+ clientsPerSrv = 1;
+
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ setClientRouter(2, 0);
+
+ srvFailedLatch = new CountDownLatch(2);
+ clientFailedLatch = new CountDownLatch(3);
+
+ attachListeners(2, 3);
+
+ failServer(2);
+
+ await(srvFailedLatch);
+ await(clientFailedLatch);
+
+ checkNodes(2, 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientReconnectOnNetworkProblem() throws Exception {
+ clientsPerSrv = 1;
+
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ setClientRouter(2, 0);
+
+ srvFailedLatch = new CountDownLatch(2);
+ clientFailedLatch = new CountDownLatch(3);
+
+ attachListeners(2, 3);
+
+ ((TcpClientDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi()).brokeConnection();
+
+ G.ignite("client-2").message().remoteListen(null, new MessageListener()); // Send some discovery message.
+
+ checkNodes(3, 3);
+ }
+
+ /**
+ * Checks that a client whose discovery connection was paused catches up on the node-left event
+ * it missed once it is resumed.
+ *
+ * @throws Exception If failed.
+ */
+ public void testGetMissedMessagesOnReconnect() throws Exception {
+ clientsPerSrv = 1;
+
+ startServerNodes(3);
+ startClientNodes(2);
+
+ checkNodes(3, 2);
+
+ clientLeftLatch = new CountDownLatch(1);
+ srvLeftLatch = new CountDownLatch(2);
+
+ attachListeners(2, 2);
+
+ // Block socket opens/writes on client-1 and break its connection so it misses the next topology change.
+ ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).pauseAll();
+
+ stopGrid("server-2");
+
+ // The two remaining servers and the still-connected client-0 must all observe the node leaving.
+ await(srvLeftLatch);
+ await(clientLeftLatch);
+
+ Thread.sleep(500);
+
+ // client-0 sees 2 servers + 2 clients; paused client-1 still holds the stale 3 servers + 2 clients view.
+ assert G.ignite("client-0").cluster().nodes().size() == 4;
+ assert G.ignite("client-1").cluster().nodes().size() == 5;
+
+ // Fresh latch: the listeners read the static field, so client-1 counts this one down after it catches up.
+ clientLeftLatch = new CountDownLatch(1);
+
+ ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).resume();
+
+ await(clientLeftLatch);
+
+ checkNodes(2, 2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientSegmentation() throws Exception {
+ clientsPerSrv = 1;
+
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+// setClientRouter(2, 2);
+
+ srvFailedLatch = new CountDownLatch(2 + 2);
+ clientFailedLatch = new CountDownLatch(2 + 2);
+
+ attachListeners(2, 2);
+
+ final CountDownLatch client2StoppedLatch = new CountDownLatch(1);
+
+ IgnitionListener lsnr = new IgnitionListener() {
+ @Override public void onStateChange(@Nullable String name, IgniteState state) {
+ if (state == IgniteState.STOPPED_ON_SEGMENTATION)
+ client2StoppedLatch.countDown();
+ }
+ };
+ G.addListener(lsnr);
+
+ try {
+ failServer(2);
+
+ await(srvFailedLatch);
+ await(clientFailedLatch);
+
+ await(client2StoppedLatch);
+
+ checkNodes(2, 2);
+ }
+ finally {
+ G.removeListener(lsnr);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientNodeJoinOneServer() throws Exception {
+ startServerNodes(1);
+
+ srvJoinedLatch = new CountDownLatch(1);
+
+ attachListeners(1, 0);
+
+ startClientNodes(1);
+
+ await(srvJoinedLatch);
+
+ checkNodes(1, 1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientNodeLeaveOneServer() throws Exception {
+ startServerNodes(1);
+ startClientNodes(1);
+
+ checkNodes(1, 1);
+
+ srvLeftLatch = new CountDownLatch(1);
+
+ attachListeners(1, 0);
+
+ stopGrid("client-0");
+
+ await(srvLeftLatch);
+
+ checkNodes(1, 0);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientNodeFailOneServer() throws Exception {
+ startServerNodes(1);
+ startClientNodes(1);
+
+ checkNodes(1, 1);
+
+ srvFailedLatch = new CountDownLatch(1);
+
+ attachListeners(1, 0);
+
+ failClient(0);
+
+ await(srvFailedLatch);
+
+ checkNodes(1, 0);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMetrics() throws Exception {
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ attachListeners(3, 3);
+
+ assertTrue(checkMetrics(3, 3, 0));
+
+ G.ignite("client-0").compute().broadcast(F.noop());
+
+ assertTrue(GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return checkMetrics(3, 3, 1);
+ }
+ }, 10000));
+
+ checkMetrics(3, 3, 1);
+
+ G.ignite("server-0").compute().broadcast(F.noop());
+
+ assertTrue(GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return checkMetrics(3, 3, 2);
+ }
+ }, 10000));
+ }
+
+ /**
+ * @param srvCnt Number of server nodes.
+ * @param clientCnt Number of client nodes.
+ * @param execJobsCnt Expected number of executed jobs.
+ * @return Whether metrics are correct.
+ */
+ private boolean checkMetrics(int srvCnt, int clientCnt, int execJobsCnt) {
+ for (int i = 0; i < srvCnt; i++) {
+ Ignite g = G.ignite("server-" + i);
+
+ for (ClusterNode n : g.cluster().nodes()) {
+ if (n.metrics().getTotalExecutedJobs() != execJobsCnt)
+ return false;
+ }
+ }
+
+ for (int i = 0; i < clientCnt; i++) {
+ Ignite g = G.ignite("client-" + i);
+
+ for (ClusterNode n : g.cluster().nodes()) {
+ if (n.metrics().getTotalExecutedJobs() != execJobsCnt)
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDataExchangeFromServer() throws Exception {
+ testDataExchange("server-0");
+ }
+
+ /**
+ * TODO: IGNITE-587.
+ *
+ * @throws Exception If failed.
+ */
+ public void testDataExchangeFromClient() throws Exception {
+ testDataExchange("client-0");
+ }
+
+ /**
+ * @param masterName Name of the grid that registers the remote listener and sends the messages.
+ * @throws Exception If failed.
+ */
+ private void testDataExchange(String masterName) throws Exception {
+ startServerNodes(2);
+ startClientNodes(2);
+
+ checkNodes(2, 2);
+
+ IgniteMessaging msg = grid(masterName).message();
+
+ UUID id = null;
+
+ try {
+ id = msg.remoteListen(null, new MessageListener());
+
+ msgLatch = new CountDownLatch(4);
+
+ msg.send(null, "Message 1");
+
+ await(msgLatch);
+
+ startServerNodes(1);
+ startClientNodes(1);
+
+ checkNodes(3, 3);
+
+ msgLatch = new CountDownLatch(6);
+
+ msg.send(null, "Message 2");
+
+ await(msgLatch);
+ }
+ finally {
+ if (id != null)
+ msg.stopRemoteListen(id);
+ }
+ }
+
+ /**
+ * @throws Exception If any error occurs.
+ */
+ public void testDuplicateId() throws Exception {
+ startServerNodes(2);
+
+ nodeId = G.ignite("server-1").cluster().localNode().id();
+
+ try {
+ startGrid("client-0");
+
+ assert false;
+ }
+ catch (IgniteCheckedException e) {
+ IgniteSpiException spiEx = e.getCause(IgniteSpiException.class);
+
+ assert spiEx != null : e;
+ assert spiEx.getMessage().contains("same ID") : spiEx.getMessage();
+ }
+ }
+
+ /**
+ * Checks that starting a client fails with a join timeout while server-1 is blocked
+ * inside its send-message listener.
+ *
+ * @throws Exception If any error occurs.
+ */
+ public void testTimeoutWaitingNodeAddedMessage() throws Exception {
+ startServerNodes(2);
+
+ final CountDownLatch cnt = new CountDownLatch(1);
+
+ // Hold every outgoing discovery message on server-1 until the latch is released.
+ ((TcpDiscoverySpi)G.ignite("server-1").configuration().getDiscoverySpi()).addSendMessageListener(
+ new IgniteInClosure<TcpDiscoveryAbstractMessage>() {
+ @Override public void apply(TcpDiscoveryAbstractMessage msg) {
+ try {
+ cnt.await(10, MINUTES);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteInterruptedException(e);
+ }
+ }
+ });
+
+ try {
+ startGrid("client-0");
+
+ fail("Client start must time out while server-1 is blocked");
+ }
+ catch (IgniteCheckedException e) {
+ IgniteSpiException spiEx = e.getCause(IgniteSpiException.class);
+
+ assert spiEx != null : e;
+ assert spiEx.getMessage().contains("Join process timed out") : spiEx.getMessage();
+ }
+ finally {
+ // Release the listener on every exit path; otherwise it stays blocked for up to 10 minutes.
+ cnt.countDown();
+ }
+ }
+
+ /**
+ * @throws Exception If any error occurs.
+ */
+ public void testGridStartTime() throws Exception {
+ startServerNodes(2);
+
+ startClientNodes(2);
+
+ long startTime = -1;
+
+ for (Ignite g : G.allGrids()) {
+ IgniteEx kernal = (IgniteKernal)g;
+
+ assertTrue(kernal.context().discovery().gridStartTime() > 0);
+
+ if (startTime == -1)
+ startTime = kernal.context().discovery().gridStartTime();
+ else
+ assertEquals(startTime, kernal.context().discovery().gridStartTime());
+ }
+ }
+
+ /**
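+ * @param clientIdx Client index.
+ * @param srvIdx Index (within the shared IP finder's registered addresses) of the server address to assign to the client.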
+ * @throws Exception In case of error.
+ */
+ private void setClientRouter(int clientIdx, int srvIdx) throws Exception {
+ TcpClientDiscoverySpi disco =
+ (TcpClientDiscoverySpi)G.ignite("client-" + clientIdx).configuration().getDiscoverySpi();
+
+ TcpDiscoveryVmIpFinder ipFinder = (TcpDiscoveryVmIpFinder)disco.getIpFinder();
+
+ String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()).get(srvIdx).toString();
+
+ if (addr.startsWith("/"))
+ addr = addr.substring(1);
+
+ ipFinder.setAddresses(Arrays.asList(addr));
+ }
+
+ /**
+ * @param cnt Number of nodes.
+ * @throws Exception In case of error.
+ */
+ private void startServerNodes(int cnt) throws Exception {
+ for (int i = 0; i < cnt; i++) {
+ Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
+
+ srvNodeIds.add(g.cluster().localNode().id());
+ }
+ }
+
+ /**
+ * @param cnt Number of nodes.
+ * @throws Exception In case of error.
+ */
+ private void startClientNodes(int cnt) throws Exception {
+ for (int i = 0; i < cnt; i++) {
+ Ignite g = startGrid("client-" + clientIdx.getAndIncrement());
+
+ clientNodeIds.add(g.cluster().localNode().id());
+ }
+ }
+
+ /**
+ * @param idx Index.
+ */
+ private void failServer(int idx) {
+ ((TcpDiscoverySpi)G.ignite("server-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure();
+ }
+
+ /**
+ * @param idx Index.
+ */
+ private void failClient(int idx) {
+ ((TcpClientDiscoverySpi)G.ignite("client-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure();
+ }
+
+ /**
+ * @param srvCnt Number of server nodes.
+ * @param clientCnt Number of client nodes.
+ */
+ private void attachListeners(int srvCnt, int clientCnt) throws Exception {
+ if (srvJoinedLatch != null) {
+ for (int i = 0; i < srvCnt; i++) {
+ G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Joined event fired on server: " + evt);
+
+ srvJoinedLatch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_JOINED);
+ }
+ }
+
+ if (srvLeftLatch != null) {
+ for (int i = 0; i < srvCnt; i++) {
+ G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Left event fired on server: " + evt);
+
+ srvLeftLatch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_LEFT);
+ }
+ }
+
+ if (srvFailedLatch != null) {
+ for (int i = 0; i < srvCnt; i++) {
+ G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Failed event fired on server: " + evt);
+
+ srvFailedLatch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_FAILED);
+ }
+ }
+
+ if (clientJoinedLatch != null) {
+ for (int i = 0; i < clientCnt; i++) {
+ G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Joined event fired on client: " + evt);
+
+ clientJoinedLatch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_JOINED);
+ }
+ }
+
+ if (clientLeftLatch != null) {
+ for (int i = 0; i < clientCnt; i++) {
+ G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Left event fired on client: " + evt);
+
+ clientLeftLatch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_LEFT);
+ }
+ }
+
+ if (clientFailedLatch != null) {
+ for (int i = 0; i < clientCnt; i++) {
+ G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Failed event fired on client: " + evt);
+
+ clientFailedLatch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_FAILED);
+ }
+ }
+ }
+
+ /**
+ * @param srvCnt Number of server nodes.
+ * @param clientCnt Number of client nodes.
+ */
+ private void checkNodes(int srvCnt, int clientCnt) {
+ for (int i = 0; i < srvCnt; i++) {
+ Ignite g = G.ignite("server-" + i);
+
+ assertTrue(srvNodeIds.contains(g.cluster().localNode().id()));
+
+ assertFalse(g.cluster().localNode().isClient());
+
+ checkRemoteNodes(g, srvCnt + clientCnt - 1);
+ }
+
+ for (int i = 0; i < clientCnt; i++) {
+ Ignite g = G.ignite("client-" + i);
+
+ assertTrue(clientNodeIds.contains(g.cluster().localNode().id()));
+
+ assertTrue(g.cluster().localNode().isClient());
+
+ checkRemoteNodes(g, srvCnt + clientCnt - 1);
+ }
+ }
+
+ /**
+ * @param ignite Grid.
+ * @param expCnt Expected nodes count.
+ */
+ @SuppressWarnings("TypeMayBeWeakened")
+ private void checkRemoteNodes(Ignite ignite, int expCnt) {
+ Collection<ClusterNode> nodes = ignite.cluster().forRemotes().nodes();
+
+ assertEquals(expCnt, nodes.size());
+
+ for (ClusterNode node : nodes) {
+ UUID id = node.id();
+
+ if (clientNodeIds.contains(id))
+ assertTrue(node.isClient());
+ else if (srvNodeIds.contains(id))
+ assertFalse(node.isClient());
+ else
+ assert false : "Unexpected node ID: " + id;
+ }
+ }
+
+ /**
+ * @param latch Latch.
+ * @throws InterruptedException If interrupted.
+ */
+ private void await(CountDownLatch latch) throws InterruptedException {
+ assertTrue("Latch count: " + latch.getCount(), latch.await(10000, MILLISECONDS));
+ }
+
+ /**
+ */
+ private static class MessageListener implements IgniteBiPredicate<UUID, Object> {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID uuid, Object msg) {
+ X.println(">>> Received [locNodeId=" + ignite.configuration().getNodeId() + ", msg=" + msg + ']');
+
+ msgLatch.countDown();
+
+ return true;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestTcpClientDiscovery extends TcpClientDiscoverySpi {
+ /** */
+ private final Object mux = new Object();
+
+ /** */
+ private final AtomicBoolean writeLock = new AtomicBoolean();
+
+ /** */
+ private final AtomicBoolean openSockLock = new AtomicBoolean();
+
+ /**
+ * @param lock Lock.
+ */
+ private void waitFor(AtomicBoolean lock) {
+ try {
+ synchronized (mux) {
+ while (lock.get())
+ mux.wait();
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * @param isPause Is lock.
+ * @param locks Locks.
+ */
+ private void pauseResumeOperation(boolean isPause, AtomicBoolean... locks) {
+ synchronized (mux) {
+ for (AtomicBoolean lock : locks)
+ lock.set(isPause);
+
+ mux.notifyAll();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+ GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException {
+ waitFor(writeLock);
+
+ super.writeToSocket(sock, msg, bout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
+ waitFor(openSockLock);
+
+ return super.openSocket(sockAddr);
+ }
+
+ /**
+ *
+ */
+ private void pauseAll() {
+ pauseResumeOperation(true, openSockLock, writeLock);
+
+ brokeConnection();
+ }
+
+ /**
+ *
+ */
+ private void resume() {
+ pauseResumeOperation(false, openSockLock, writeLock);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c05e368d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index ebc7111..8bf8dbc 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -51,7 +51,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(GridTcpSpiForwardingSelfTest.class));
- suite.addTest(new TestSuite(TcpClientDiscoverySelfTest.class));
+ suite.addTest(new TestSuite(TcpClientDiscoverySpiSelfTest.class));
suite.addTest(new TestSuite(TcpClientDiscoverySpiConfigSelfTest.class));
suite.addTest(new TestSuite(TcpClientDiscoveryMarshallerCheckSelfTest.class));
[2/2] incubator-ignite git commit: # IGNITE-709 Improve ping from
client to server.
Posted by sb...@apache.org.
# IGNITE-709 Improve ping from client to server.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/505a03e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/505a03e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/505a03e9
Branch: refs/heads/ignite-709_2
Commit: 505a03e92db2747ed5beb07eb47cce17271d387c
Parents: c05e368
Author: sevdokimov <se...@gridgain.com>
Authored: Tue May 12 13:43:26 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Tue May 12 13:43:26 2015 +0300
----------------------------------------------------------------------
.../discovery/tcp/TcpClientDiscoverySpi.java | 56 +++++++++++++++-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 62 ++++++++++++++++--
.../messages/TcpDiscoveryClientPingRequest.java | 56 ++++++++++++++++
.../TcpDiscoveryClientPingResponse.java | 67 ++++++++++++++++++++
.../tcp/TcpClientDiscoverySpiSelfTest.java | 30 +++++++++
5 files changed, 264 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/505a03e9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index 6752bf5..d55d1c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.tcp;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
@@ -75,6 +76,9 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
/** Remote nodes. */
private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>();
+ /** Pending ping futures, keyed by ID of the node being pinged. */
+ private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> pingFuts = new ConcurrentHashMap8<>();
+
/** Socket writer. */
private SocketWriter sockWriter;
@@ -316,6 +320,9 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
}
}
+ for (GridFutureAdapter<Boolean> fut : pingFuts.values())
+ fut.onDone(false);
+
rmtNodes.clear();
U.interrupt(sockTimeoutWorker);
@@ -359,15 +366,46 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
}
/** {@inheritDoc} */
- @Override public boolean pingNode(UUID nodeId) {
- assert nodeId != null;
+ @Override public boolean pingNode(@NotNull final UUID nodeId) {
+ if (getSpiContext().isStopping())
+ return false;
if (nodeId.equals(getLocalNodeId()))
return true;
TcpDiscoveryNode node = rmtNodes.get(nodeId);
- return node != null && node.visible();
+ if (node == null || !node.visible())
+ return false;
+
+ GridFutureAdapter<Boolean> fut = pingFuts.get(nodeId);
+
+ if (fut == null) {
+ fut = new GridFutureAdapter<>();
+
+ GridFutureAdapter<Boolean> oldFut = pingFuts.putIfAbsent(nodeId, fut);
+
+ if (oldFut != null)
+ fut = oldFut;
+ else
+ sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
+ }
+
+ final GridFutureAdapter<Boolean> finalFut = fut;
+
+ timer.schedule(new TimerTask() {
+ @Override public void run() {
+ if (pingFuts.remove(nodeId, finalFut))
+ finalFut.onDone(false);
+ }
+ }, netTimeout);
+
+ try {
+ return fut.get();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteSpiException(e); // Should never occur.
+ }
}
/** {@inheritDoc} */
@@ -1069,6 +1107,8 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
else if (msg instanceof TcpDiscoveryCustomEventMessage)
processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
+ else if (msg instanceof TcpDiscoveryClientPingResponse)
+ processClientPingResponse((TcpDiscoveryClientPingResponse)msg);
stats.onMessageProcessingFinished(msg);
}
@@ -1366,6 +1406,16 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
}
/**
+ * @param msg Message.
+ */
+ private void processClientPingResponse(TcpDiscoveryClientPingResponse msg) {
+ GridFutureAdapter<Boolean> fut = pingFuts.remove(msg.nodeToPing());
+
+ if (fut != null)
+ fut.onDone(msg.result());
+ }
+
+ /**
* @param nodeId Node ID.
* @param metrics Metrics.
* @param cacheMetrics Cache metrics.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/505a03e9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 3624791..e00f798 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -203,6 +203,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private int reconCnt = DFLT_RECONNECT_CNT;
+ /** */
+ private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 10, 2000, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>());
+
/** Nodes ring. */
@GridToStringExclude
private final TcpDiscoveryNodesRing ring = new TcpDiscoveryNodesRing();
@@ -285,6 +289,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
private final CopyOnWriteArrayList<IgniteInClosure<TcpDiscoveryAbstractMessage>> sendMsgLsnrs =
new CopyOnWriteArrayList<>();
+ /** */
+ private final CopyOnWriteArrayList<IgniteInClosure<Socket>> incomeConnLsnrs =
+ new CopyOnWriteArrayList<>();
+
/** {@inheritDoc} */
@IgniteInstanceResource
@Override public void injectResources(Ignite ignite) {
@@ -2034,15 +2042,29 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
/**
* <strong>FOR TEST ONLY!!!</strong>
*/
- public void addSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> msg) {
- sendMsgLsnrs.add(msg);
+ public void addSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> lsnr) {
+ sendMsgLsnrs.add(lsnr);
+ }
+
+ /**
+ * <strong>FOR TEST ONLY!!!</strong>
+ */
+ public void removeSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> lsnr) {
+ sendMsgLsnrs.remove(lsnr);
+ }
+
+ /**
+ * <strong>FOR TEST ONLY!!!</strong>
+ */
+ public void addIncomeConnectionListener(IgniteInClosure<Socket> lsnr) {
+ incomeConnLsnrs.add(lsnr);
}
/**
* <strong>FOR TEST ONLY!!!</strong>
*/
- public void removeSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> msg) {
- sendMsgLsnrs.remove(msg);
+ public void removeIncomeConnectionListener(IgniteInClosure<Socket> lsnr) {
+ incomeConnLsnrs.remove(lsnr);
}
/**
@@ -2634,6 +2656,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
else if (msg instanceof TcpDiscoveryCustomEventMessage)
processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
+ else if (msg instanceof TcpDiscoveryClientPingRequest)
+ processClientPingRequest((TcpDiscoveryClientPingRequest)msg);
+
else
assert false : "Unknown message type: " + msg.getClass().getSimpleName();
@@ -4448,6 +4473,32 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
/**
* @param msg Message.
*/
+ private void processClientPingRequest(final TcpDiscoveryClientPingRequest msg) {
+ utilityPool.execute(new Runnable() {
+ @Override public void run() {
+ boolean res = pingNode(msg.nodeToPing());
+
+ final ClientMessageWorker worker = clientMsgWorkers.get(msg.creatorNodeId());
+
+ if (worker == null) {
+ if (log.isDebugEnabled())
+ log.debug("Ping request from dead client node, will be skipped: " + msg.creatorNodeId());
+ }
+ else {
+ TcpDiscoveryClientPingResponse pingRes = new TcpDiscoveryClientPingResponse(
+ getLocalNodeId(), msg.nodeToPing(), res);
+
+ pingRes.verify(getLocalNodeId());
+
+ worker.addMessage(pingRes);
+ }
+ }
+ });
+ }
+
+ /**
+ * @param msg Message.
+ */
private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
if (isLocalNodeCoordinator()) {
if (msg.verified()) {
@@ -4643,6 +4694,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
sock.setSoTimeout((int)netTimeout);
+ for (IgniteInClosure<Socket> connLsnr : incomeConnLsnrs)
+ connLsnr.apply(sock);
+
in = new BufferedInputStream(sock.getInputStream());
byte[] buf = new byte[4];
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/505a03e9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java
new file mode 100644
index 0000000..f9f164d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Ping request.
+ */
+public class TcpDiscoveryClientPingRequest extends TcpDiscoveryAbstractMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** ID of the node to ping. */
+ private final UUID nodeToPing;
+
+ /**
+ * @param creatorNodeId Creator node ID.
+ * @param nodeToPing ID of the node to ping.
+ */
+ public TcpDiscoveryClientPingRequest(UUID creatorNodeId, @Nullable UUID nodeToPing) {
+ super(creatorNodeId);
+
+ this.nodeToPing = nodeToPing;
+ }
+
+ /**
+ * @return ID of the node to ping.
+ */
+ @Nullable public UUID nodeToPing() {
+ return nodeToPing;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpDiscoveryClientPingRequest.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/505a03e9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java
new file mode 100644
index 0000000..26a2b00
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Ping response.
+ */
+public class TcpDiscoveryClientPingResponse extends TcpDiscoveryAbstractMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** ID of the node that was pinged. */
+ private final UUID nodeToPing;
+
+ /** Result of ping. */
+ private final boolean res;
+
+ /**
+ * @param creatorNodeId Creator node ID.
+ * @param nodeToPing ID of the node that was pinged.
+ */
+ public TcpDiscoveryClientPingResponse(UUID creatorNodeId, @Nullable UUID nodeToPing, boolean res) {
+ super(creatorNodeId);
+
+ this.nodeToPing = nodeToPing;
+ this.res = res;
+ }
+
+ /**
+ * @return ID of the node that was pinged.
+ */
+ @Nullable public UUID nodeToPing() {
+ return nodeToPing;
+ }
+
+ /**
+ * @return Result of ping.
+ */
+ public boolean result() {
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpDiscoveryClientPingResponse.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/505a03e9/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index a06bfd9..49ef4aa 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -353,6 +353,36 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testPingFailedNodeFromClient() throws Exception {
+ startServerNodes(2);
+ startClientNodes(1);
+
+ Ignite srv0 = G.ignite("server-0");
+ Ignite srv1 = G.ignite("server-1");
+ Ignite client = G.ignite("client-0");
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure<Socket>() {
+ @Override public void apply(Socket sock) {
+ try {
+ latch.await();
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ assert ((IgniteEx)client).context().discovery().pingNode(srv0.cluster().localNode().id());
+ assert !((IgniteEx)client).context().discovery().pingNode(srv1.cluster().localNode().id());
+
+ latch.countDown();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testClientReconnectOnRouterFail() throws Exception {
clientsPerSrv = 1;