You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/07/03 14:47:05 UTC

incubator-ignite git commit: IGNITE-901 Extracted reconnectClient methods for testing.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-901 bf97d042a -> b14b73e85


IGNITE-901 Extracted reconnectClient methods for testing.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b14b73e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b14b73e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b14b73e8

Branch: refs/heads/ignite-901
Commit: b14b73e85ad016cb8db7b93b17baef6cb86e9a45
Parents: bf97d04
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Fri Jul 3 15:47:16 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Fri Jul 3 15:47:16 2015 +0300

----------------------------------------------------------------------
 .../IgniteClientReconnectAbstractTest.java      |  57 ++
 .../IgniteClientReconnectAtomicsTest.java       | 662 ++-----------------
 2 files changed, 130 insertions(+), 589 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b14b73e8/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index e883fb5..a9ce136 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -20,9 +20,11 @@ package org.apache.ignite.internal;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.spi.*;
@@ -34,12 +36,16 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.apache.ignite.spi.discovery.tcp.messages.*;
 import org.apache.ignite.testframework.junits.common.*;
 import org.eclipse.jetty.util.*;
+import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.net.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+
 /**
  *
  */
@@ -155,6 +161,57 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
     }
 
     /**
+     * Fails the client node on the server, waits for it to disconnect, then lets it reconnect.
+     *
+     * @param client Client.
+     * @param srv Server.
+     * @param disconnectedClosure Closure to run once the client node is disconnected (may be {@code null}).
+     * @throws Exception If failed.
+     */
+    protected void reconnectClientNode(Ignite client, Ignite srv, @Nullable Runnable disconnectedClosure)
+        throws Exception {
+        final TestTcpDiscoverySpi clientSpi = spi(client);
+        final TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        log.info("Block reconnect.");
+
+        clientSpi.writeLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    info("Disconnected: " + evt);
+
+                    disconnectLatch.countDown();
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+        if (disconnectedClosure != null)
+            disconnectedClosure.run();
+
+        log.info("Allow reconnect.");
+
+        clientSpi.writeLatch.countDown();
+
+        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+    }
+
+    /**
      *
      */
     protected static class TestTcpDiscoverySpi extends TcpDiscoverySpi {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b14b73e8/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
index c4078f8..a827671 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -18,17 +18,11 @@
 package org.apache.ignite.internal;
 
 import org.apache.ignite.*;
-import org.apache.ignite.events.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
 import org.apache.ignite.testframework.*;
 
 import java.util.concurrent.*;
 
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.events.EventType.*;
-
 /**
  * TODO IGNITE-901: test AtomicReference, AtomicStamped, usage after remove, test API block, fail current call on disconnect.
  */
@@ -53,53 +47,19 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         Ignite srv = clientRouter(client);
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
-
         IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeq", 0, true);
 
         assertEquals(1L, clientAtomicSeq.incrementAndGet());
 
-        IgniteAtomicSequence srvAtomicSeq = srv.atomicSequence("atomicSeq", 0, false);
+        final IgniteAtomicSequence srvAtomicSeq = srv.atomicSequence("atomicSeq", 0, false);
 
         assertEquals(1001L, srvAtomicSeq.incrementAndGet());
 
-        final CountDownLatch disconnectLatch = new CountDownLatch(1);
-        final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
-        final TestTcpDiscoverySpi clientSpi = spi(client);
-
-        log.info("Block reconnect.");
-
-        clientSpi.writeLatch = new CountDownLatch(1);
-
-        client.events().localListen(new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event evt) {
-                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
-                    info("Disconnected: " + evt);
-
-                    disconnectLatch.countDown();
-                }
-                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
-                    info("Reconnected: " + evt);
-
-                    reconnectLatch.countDown();
-                }
-
-                return true;
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                assertEquals(1002L, srvAtomicSeq.incrementAndGet());
             }
-        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
-        srvSpi.failNode(client.cluster().localNode().id(), null);
-
-        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
-        assertEquals(1002L, srvAtomicSeq.incrementAndGet());
-
-        log.info("Allow reconnect.");
-
-        clientSpi.writeLatch.countDown();
-
-        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+        });
 
         assertEquals(2L, clientAtomicSeq.incrementAndGet());
 
@@ -118,8 +78,6 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         Ignite srv = clientRouter(client);
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
-
         final IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeqRmv", 0, true);
 
         clientAtomicSeq.batchSize(1);
@@ -132,47 +90,13 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         assertEquals(1001L, srvAtomicSeq.incrementAndGet());
 
-        final CountDownLatch disconnectLatch = new CountDownLatch(1);
-        final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
-        final TestTcpDiscoverySpi clientSpi = spi(client);
-
-        log.info("Block reconnect.");
-
-        clientSpi.writeLatch = new CountDownLatch(1);
-
-        client.events().localListen(new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event evt) {
-                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
-                    info("Disconnected: " + evt);
-
-                    disconnectLatch.countDown();
-                }
-                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
-                    info("Reconnected: " + evt);
-
-                    reconnectLatch.countDown();
-                }
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                srvAtomicSeq.close();
 
-                return true;
+                assert srvAtomicSeq.removed();
             }
-        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
-        srvSpi.failNode(client.cluster().localNode().id(), null);
-
-        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
-        srvAtomicSeq.close();
-
-        assert srvAtomicSeq.removed();
-
-        log.info("Allow reconnect.");
-
-        clientSpi.writeLatch.countDown();
-
-        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
-
-        U.sleep(1000);
+        });
 
         GridTestUtils.assertThrows(log, new Callable<Object>() {
             @Override public Object call() throws Exception {
@@ -194,8 +118,6 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         Ignite srv = clientRouter(client);
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
-
         BlockTpcCommunicationSpi commSpi = commSpi(srv);
 
         final IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeqInProg", 0, true);
@@ -228,45 +150,13 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         commSpi.unblockMsg();
 
-        final CountDownLatch disconnectLatch = new CountDownLatch(1);
-        final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
-        final TestTcpDiscoverySpi clientSpi = spi(client);
-
-        log.info("Block reconnect.");
-
-        clientSpi.writeLatch = new CountDownLatch(1);
-
-        client.events().localListen(new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event evt) {
-                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
-                    info("Disconnected: " + evt);
-
-                    disconnectLatch.countDown();
-                }
-                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
-                    info("Reconnected: " + evt);
-
-                    reconnectLatch.countDown();
-                }
-
-                return true;
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                // Check that future failed.
+                assertNotNull(fut.error());
+                assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
             }
-        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
-        srvSpi.failNode(client.cluster().localNode().id(), null);
-
-        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
-        // Check that future failed.
-        assertNotNull(fut.error());
-        assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
-
-        log.info("Allow reconnect.");
-
-        clientSpi.writeLatch.countDown();
-
-        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+        });
 
         // Check that after reconnect working.
         assert clientAtomicSeq.incrementAndGet() >= 0;
@@ -285,59 +175,25 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         Ignite srv = clientRouter(client);
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
-
         IgniteAtomicReference<String> clientAtomicRef = client.atomicReference("atomicRef", "1st value", true);
 
         assertEquals("1st value", clientAtomicRef.get());
         assertTrue(clientAtomicRef.compareAndSet("1st value", "2st value"));
         assertEquals("2st value", clientAtomicRef.get());
 
-        IgniteAtomicReference<String> srvAtomicRef = srv.atomicReference("atomicRef", "1st value", false);
+        final IgniteAtomicReference<String> srvAtomicRef = srv.atomicReference("atomicRef", "1st value", false);
 
         assertEquals("2st value", srvAtomicRef.get());
         assertTrue(srvAtomicRef.compareAndSet("2st value", "3st value"));
         assertEquals("3st value", srvAtomicRef.get());
 
-        final CountDownLatch disconnectLatch = new CountDownLatch(1);
-        final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
-        final TestTcpDiscoverySpi clientSpi = spi(client);
-
-        log.info("Block reconnect.");
-
-        clientSpi.writeLatch = new CountDownLatch(1);
-
-        client.events().localListen(new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event evt) {
-                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
-                    info("Disconnected: " + evt);
-
-                    disconnectLatch.countDown();
-                }
-                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
-                    info("Reconnected: " + evt);
-
-                    reconnectLatch.countDown();
-                }
-
-                return true;
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                assertEquals("3st value", srvAtomicRef.get());
+                assertTrue(srvAtomicRef.compareAndSet("3st value", "4st value"));
+                assertEquals("4st value", srvAtomicRef.get());
             }
-        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
-        srvSpi.failNode(client.cluster().localNode().id(), null);
-
-        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
-        assertEquals("3st value", srvAtomicRef.get());
-        assertTrue(srvAtomicRef.compareAndSet("3st value", "4st value"));
-        assertEquals("4st value", srvAtomicRef.get());
-
-        log.info("Allow reconnect.");
-
-        clientSpi.writeLatch.countDown();
-
-        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+        });
 
         assertEquals("4st value", clientAtomicRef.get());
         assertTrue(clientAtomicRef.compareAndSet("4st value", "5st value"));
@@ -360,8 +216,6 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         Ignite srv = clientRouter(client);
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
-
         final IgniteAtomicReference<String> clientAtomicRef =
             client.atomicReference("atomicRefRemoved", "1st value", true);
 
@@ -369,49 +223,17 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
         assertTrue(clientAtomicRef.compareAndSet("1st value", "2st value"));
         assertEquals("2st value", clientAtomicRef.get());
 
-        IgniteAtomicReference<String> srvAtomicRef = srv.atomicReference("atomicRefRemoved", "1st value", false);
+        final IgniteAtomicReference<String> srvAtomicRef = srv.atomicReference("atomicRefRemoved", "1st value", false);
 
         assertEquals("2st value", srvAtomicRef.get());
         assertTrue(srvAtomicRef.compareAndSet("2st value", "3st value"));
         assertEquals("3st value", srvAtomicRef.get());
 
-        final CountDownLatch disconnectLatch = new CountDownLatch(1);
-        final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
-        final TestTcpDiscoverySpi clientSpi = spi(client);
-
-        log.info("Block reconnect.");
-
-        clientSpi.writeLatch = new CountDownLatch(1);
-
-        client.events().localListen(new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event evt) {
-                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
-                    info("Disconnected: " + evt);
-
-                    disconnectLatch.countDown();
-                }
-                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
-                    info("Reconnected: " + evt);
-
-                    reconnectLatch.countDown();
-                }
-
-                return true;
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                srvAtomicRef.close();
             }
-        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
-        srvSpi.failNode(client.cluster().localNode().id(), null);
-
-        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
-        srvAtomicRef.close();
-
-        log.info("Allow reconnect.");
-
-        clientSpi.writeLatch.countDown();
-
-        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+        });
 
         GridTestUtils.assertThrows(log, new Callable<Object>() {
             @Override public Object call() throws Exception {
@@ -432,8 +254,6 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         Ignite srv = clientRouter(client);
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
-
         final IgniteAtomicReference<String> clientAtomicRef =
             client.atomicReference("atomicRefInProg", "1st value", true);
 
@@ -468,45 +288,13 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         servCommSpi.unblockMsg();
 
-        final CountDownLatch disconnectLatch = new CountDownLatch(1);
-        final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
-        final TestTcpDiscoverySpi clientSpi = spi(client);
-
-        log.info("Block reconnect.");
-
-        clientSpi.writeLatch = new CountDownLatch(1);
-
-        client.events().localListen(new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event evt) {
-                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
-                    info("Disconnected: " + evt);
-
-                    disconnectLatch.countDown();
-                }
-                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
-                    info("Reconnected: " + evt);
-
-                    reconnectLatch.countDown();
-                }
-
-                return true;
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                // Check that future failed.
+                assertNotNull(fut.error());
+                assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
             }
-        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
-        srvSpi.failNode(client.cluster().localNode().id(), null);
-
-        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
-        // Check that future failed.
-        assertNotNull(fut.error());
-        assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
-
-        log.info("Allow reconnect.");
-
-        clientSpi.writeLatch.countDown();
-
-        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+        });
 
         // Check that after reconnect working.
         assertEquals("3st value", clientAtomicRef.get());
@@ -530,59 +318,25 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         Ignite srv = clientRouter(client);
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
-
         IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStamped", 0, 0, true);
 
         assertEquals(true, clientAtomicStamped.compareAndSet(0, 1, 0, 1));
         assertEquals(1, clientAtomicStamped.value());
         assertEquals(1, clientAtomicStamped.stamp());
 
-        IgniteAtomicStamped srvAtomicStamped = srv.atomicStamped("atomicStamped", 0, 0, false);
+        final IgniteAtomicStamped srvAtomicStamped = srv.atomicStamped("atomicStamped", 0, 0, false);
 
         assertEquals(true, srvAtomicStamped.compareAndSet(1, 2, 1, 2));
         assertEquals(2, srvAtomicStamped.value());
         assertEquals(2, srvAtomicStamped.stamp());
 
-        final CountDownLatch disconnectLatch = new CountDownLatch(1);
-        final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
-        final TestTcpDiscoverySpi clientSpi = spi(client);
-
-        log.info("Block reconnect.");
-
-        clientSpi.writeLatch = new CountDownLatch(1);
-
-        client.events().localListen(new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event evt) {
-                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
-                    info("Disconnected: " + evt);
-
-                    disconnectLatch.countDown();
-                }
-                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
-                    info("Reconnected: " + evt);
-
-                    reconnectLatch.countDown();
-                }
-
-                return true;
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                assertEquals(true, srvAtomicStamped.compareAndSet(2, 3, 2, 3));
+                assertEquals(3, srvAtomicStamped.value());
+                assertEquals(3, srvAtomicStamped.stamp());
             }
-        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
-        srvSpi.failNode(client.cluster().localNode().id(), null);
-
-        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
-        assertEquals(true, srvAtomicStamped.compareAndSet(2, 3, 2, 3));
-        assertEquals(3, srvAtomicStamped.value());
-        assertEquals(3, srvAtomicStamped.stamp());
-
-        log.info("Allow reconnect.");
-
-        clientSpi.writeLatch.countDown();
-
-        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+        });
 
         assertEquals(true, clientAtomicStamped.compareAndSet(3, 4, 3, 4));
         assertEquals(4, clientAtomicStamped.value());
@@ -605,57 +359,23 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         Ignite srv = clientRouter(client);
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
-
         final IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStampedRemoved", 0, 0, true);
 
         assertEquals(true, clientAtomicStamped.compareAndSet(0, 1, 0, 1));
         assertEquals(1, clientAtomicStamped.value());
         assertEquals(1, clientAtomicStamped.stamp());
 
-        IgniteAtomicStamped srvAtomicStamped = srv.atomicStamped("atomicStampedRemoved", 0, 0, false);
+        final IgniteAtomicStamped srvAtomicStamped = srv.atomicStamped("atomicStampedRemoved", 0, 0, false);
 
         assertEquals(true, srvAtomicStamped.compareAndSet(1, 2, 1, 2));
         assertEquals(2, srvAtomicStamped.value());
         assertEquals(2, srvAtomicStamped.stamp());
 
-        final CountDownLatch disconnectLatch = new CountDownLatch(1);
-        final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
-        final TestTcpDiscoverySpi clientSpi = spi(client);
-
-        log.info("Block reconnect.");
-
-        clientSpi.writeLatch = new CountDownLatch(1);
-
-        client.events().localListen(new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event evt) {
-                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
-                    info("Disconnected: " + evt);
-
-                    disconnectLatch.countDown();
-                }
-                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
-                    info("Reconnected: " + evt);
-
-                    reconnectLatch.countDown();
-                }
-
-                return true;
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                srvAtomicStamped.close();
             }
-        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
-        srvSpi.failNode(client.cluster().localNode().id(), null);
-
-        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
-        srvAtomicStamped.close();
-
-        log.info("Allow reconnect.");
-
-        clientSpi.writeLatch.countDown();
-
-        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+        });
 
         GridTestUtils.assertThrows(log, new Callable<Object>() {
             @Override public Object call() throws Exception {
@@ -676,8 +396,6 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         Ignite srv = clientRouter(client);
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
-
         final IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStampedInProgress", 0, 0, true);
 
         assertEquals(true, clientAtomicStamped.compareAndSet(0, 1, 0, 1));
@@ -711,45 +429,13 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         servCommSpi.unblockMsg();
 
-        final CountDownLatch disconnectLatch = new CountDownLatch(1);
-        final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
-        final TestTcpDiscoverySpi clientSpi = spi(client);
-
-        log.info("Block reconnect.");
-
-        clientSpi.writeLatch = new CountDownLatch(1);
-
-        client.events().localListen(new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event evt) {
-                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
-                    info("Disconnected: " + evt);
-
-                    disconnectLatch.countDown();
-                }
-                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
-                    info("Reconnected: " + evt);
-
-                    reconnectLatch.countDown();
-                }
-
-                return true;
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                // Check that future failed.
+                assertNotNull(fut.error());
+                assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
             }
-        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
-        srvSpi.failNode(client.cluster().localNode().id(), null);
-
-        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
-        // Check that future failed.
-        assertNotNull(fut.error());
-        assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
-
-        log.info("Allow reconnect.");
-
-        clientSpi.writeLatch.countDown();
-
-        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+        });
 
         // Check that after reconnect working.
         assertEquals(true, clientAtomicStamped.compareAndSet(2, 3, 2, 3));
@@ -773,53 +459,19 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         Ignite srv = clientRouter(client);
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
-
         IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLong", 0, true);
 
         assertEquals(0L, clientAtomicLong.getAndAdd(1));
 
-        IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLong", 0, false);
+        final IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLong", 0, false);
 
         assertEquals(1L, srvAtomicLong.getAndAdd(1));
 
-        final CountDownLatch disconnectLatch = new CountDownLatch(1);
-        final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
-        final TestTcpDiscoverySpi clientSpi = spi(client);
-
-        log.info("Block reconnect.");
-
-        clientSpi.writeLatch = new CountDownLatch(1);
-
-        client.events().localListen(new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event evt) {
-                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
-                    info("Disconnected: " + evt);
-
-                    disconnectLatch.countDown();
-                }
-                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
-                    info("Reconnected: " + evt);
-
-                    reconnectLatch.countDown();
-                }
-
-                return true;
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                assertEquals(2L, srvAtomicLong.getAndAdd(1));
             }
-        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
-        srvSpi.failNode(client.cluster().localNode().id(), null);
-
-        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
-        assertEquals(2L, srvAtomicLong.getAndAdd(1));
-
-        log.info("Allow reconnect.");
-
-        clientSpi.writeLatch.countDown();
-
-        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+        });
 
         assertEquals(3L, clientAtomicLong.getAndAdd(1));
 
@@ -838,53 +490,19 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         Ignite srv = clientRouter(client);
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
-
         final IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLongRmv", 0, true);
 
         assertEquals(0L, clientAtomicLong.getAndAdd(1));
 
-        IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLongRmv", 0, false);
+        final IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLongRmv", 0, false);
 
         assertEquals(1L, srvAtomicLong.getAndAdd(1));
 
-        final CountDownLatch disconnectLatch = new CountDownLatch(1);
-        final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
-        final TestTcpDiscoverySpi clientSpi = spi(client);
-
-        log.info("Block reconnect.");
-
-        clientSpi.writeLatch = new CountDownLatch(1);
-
-        client.events().localListen(new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event evt) {
-                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
-                    info("Disconnected: " + evt);
-
-                    disconnectLatch.countDown();
-                }
-                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
-                    info("Reconnected: " + evt);
-
-                    reconnectLatch.countDown();
-                }
-
-                return true;
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                srvAtomicLong.close();
             }
-        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
-        srvSpi.failNode(client.cluster().localNode().id(), null);
-
-        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
-        srvAtomicLong.close();
-
-        log.info("Allow reconnect.");
-
-        clientSpi.writeLatch.countDown();
-
-        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+        });
 
         GridTestUtils.assertThrows(log, new Callable<Object>() {
             @Override public Object call() throws Exception {
@@ -905,8 +523,6 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         Ignite srv = clientRouter(client);
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
-
         BlockTpcCommunicationSpi commSpi = commSpi(srv);
 
         final IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLongInProggress", 0, true);
@@ -932,45 +548,13 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         commSpi.unblockMsg();
 
-        final CountDownLatch disconnectLatch = new CountDownLatch(1);
-        final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
-        final TestTcpDiscoverySpi clientSpi = spi(client);
-
-        log.info("Block reconnect.");
-
-        clientSpi.writeLatch = new CountDownLatch(1);
-
-        client.events().localListen(new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event evt) {
-                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
-                    info("Disconnected: " + evt);
-
-                    disconnectLatch.countDown();
-                }
-                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
-                    info("Reconnected: " + evt);
-
-                    reconnectLatch.countDown();
-                }
-
-                return true;
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                // Check that future failed.
+                assertNotNull(fut.error());
+                assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
             }
-        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
-        srvSpi.failNode(client.cluster().localNode().id(), null);
-
-        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
-        // Check that future failed.
-        assertNotNull(fut.error());
-        assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
-
-        log.info("Allow reconnect.");
-
-        clientSpi.writeLatch.countDown();
-
-        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+        });
 
         // Check that after reconnect working.
         assertEquals(1, clientAtomicLong.addAndGet(1));
@@ -989,53 +573,19 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         Ignite srv = clientRouter(client);
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
-
         IgniteCountDownLatch clientLatch = client.countDownLatch("latch1", 3, false, true);
 
         assertEquals(3, clientLatch.count());
 
-        IgniteCountDownLatch srvLatch = srv.countDownLatch("latch1", 3, false, false);
+        final IgniteCountDownLatch srvLatch = srv.countDownLatch("latch1", 3, false, false);
 
         assertEquals(3, srvLatch.count());
 
-        final CountDownLatch disconnectLatch = new CountDownLatch(1);
-        final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
-        final TestTcpDiscoverySpi clientSpi = spi(client);
-
-        log.info("Block reconnect.");
-
-        clientSpi.writeLatch = new CountDownLatch(1);
-
-        client.events().localListen(new IgnitePredicate<Event>() {
-            @Override
-            public boolean apply(Event evt) {
-                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
-                    info("Disconnected: " + evt);
-
-                    disconnectLatch.countDown();
-                } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
-                    info("Reconnected: " + evt);
-
-                    reconnectLatch.countDown();
-                }
-
-                return true;
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                srvLatch.countDown();
             }
-        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
-        srvSpi.failNode(client.cluster().localNode().id(), null);
-
-        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
-        srvLatch.countDown();
-
-        log.info("Allow reconnect.");
-
-        clientSpi.writeLatch.countDown();
-
-        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+        });
 
         assertEquals(2, srvLatch.count());
         assertEquals(2, clientLatch.count());
@@ -1053,70 +603,4 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
         assertTrue(srvLatch.await(1000));
         assertTrue(clientLatch.await(1000));
     }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void _testLatchReconnect2() throws Exception {
-        Ignite client = grid(serverCount());
-
-        assertTrue(client.cluster().localNode().isClient());
-
-        Ignite srv = clientRouter(client);
-
-        TestTcpDiscoverySpi srvSpi = spi(srv);
-
-        final IgniteCountDownLatch clientLatch = client.countDownLatch("latch2", 1, false, true);
-
-        IgniteCountDownLatch srvLatch = srv.countDownLatch("latch2", 1, false, false);
-
-        assertFalse(clientLatch.await(100));
-
-        IgniteInternalFuture<Boolean> waitFut = GridTestUtils.runAsync(new Callable<Boolean>() {
-            @Override public Boolean call() throws Exception {
-                return clientLatch.await(60_000, MILLISECONDS);
-            }
-        });
-
-        final CountDownLatch disconnectLatch = new CountDownLatch(1);
-        final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
-        final TestTcpDiscoverySpi clientSpi = spi(client);
-
-        log.info("Block reconnect.");
-
-        clientSpi.writeLatch = new CountDownLatch(1);
-
-        client.events().localListen(new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event evt) {
-                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
-                    info("Disconnected: " + evt);
-
-                    disconnectLatch.countDown();
-                } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
-                    info("Reconnected: " + evt);
-
-                    reconnectLatch.countDown();
-                }
-
-                return true;
-            }
-        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
-        srvSpi.failNode(client.cluster().localNode().id(), null);
-
-        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
-        srvLatch.countDown();
-
-        assertNotDone(waitFut);
-
-        log.info("Allow reconnect.");
-
-        clientSpi.writeLatch.countDown();
-
-        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
-
-        assertTrue(waitFut.get(5000));
-    }
 }