You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/07/03 14:47:05 UTC
incubator-ignite git commit: IGNITE-901 Extracted reconnectClient
methods for testing.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-901 bf97d042a -> b14b73e85
IGNITE-901 Extracted reconnectClient methods for testing.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b14b73e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b14b73e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b14b73e8
Branch: refs/heads/ignite-901
Commit: b14b73e85ad016cb8db7b93b17baef6cb86e9a45
Parents: bf97d04
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Fri Jul 3 15:47:16 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Fri Jul 3 15:47:16 2015 +0300
----------------------------------------------------------------------
.../IgniteClientReconnectAbstractTest.java | 57 ++
.../IgniteClientReconnectAtomicsTest.java | 662 ++-----------------
2 files changed, 130 insertions(+), 589 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b14b73e8/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index e883fb5..a9ce136 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -20,9 +20,11 @@ package org.apache.ignite.internal;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.spi.*;
@@ -34,12 +36,16 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.spi.discovery.tcp.messages.*;
import org.apache.ignite.testframework.junits.common.*;
import org.eclipse.jetty.util.*;
+import org.jetbrains.annotations.*;
import java.io.*;
import java.net.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+
/**
*
*/
@@ -155,6 +161,57 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
}
/**
+ * Reconnect client node.
+ *
+ * @param client Client.
+ * @param srv Server.
+ * @param disconnectedClosure Closure which will be run when client node disconnected.
+ * @throws Exception If failed.
+ */
+ protected void reconnectClientNode(Ignite client, Ignite srv, @Nullable Runnable disconnectedClosure)
+ throws Exception {
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+ final TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ log.info("Block reconnect.");
+
+ clientSpi.writeLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+ if (disconnectedClosure != null)
+ disconnectedClosure.run();
+
+ log.info("Allow reconnect.");
+
+ clientSpi.writeLatch.countDown();
+
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+ }
+
+ /**
*
*/
protected static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b14b73e8/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
index c4078f8..a827671 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -18,17 +18,11 @@
package org.apache.ignite.internal;
import org.apache.ignite.*;
-import org.apache.ignite.events.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
import org.apache.ignite.testframework.*;
import java.util.concurrent.*;
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.events.EventType.*;
-
/**
* TODO IGNITE-901: test AtomicReference, AtomicStamped, usage after remove, test API block, fail current call on disconnect.
*/
@@ -53,53 +47,19 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
Ignite srv = clientRouter(client);
- TestTcpDiscoverySpi srvSpi = spi(srv);
-
IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeq", 0, true);
assertEquals(1L, clientAtomicSeq.incrementAndGet());
- IgniteAtomicSequence srvAtomicSeq = srv.atomicSequence("atomicSeq", 0, false);
+ final IgniteAtomicSequence srvAtomicSeq = srv.atomicSequence("atomicSeq", 0, false);
assertEquals(1001L, srvAtomicSeq.incrementAndGet());
- final CountDownLatch disconnectLatch = new CountDownLatch(1);
- final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
- final TestTcpDiscoverySpi clientSpi = spi(client);
-
- log.info("Block reconnect.");
-
- clientSpi.writeLatch = new CountDownLatch(1);
-
- client.events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
- info("Disconnected: " + evt);
-
- disconnectLatch.countDown();
- }
- else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
- info("Reconnected: " + evt);
-
- reconnectLatch.countDown();
- }
-
- return true;
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ assertEquals(1002L, srvAtomicSeq.incrementAndGet());
}
- }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
- srvSpi.failNode(client.cluster().localNode().id(), null);
-
- assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
- assertEquals(1002L, srvAtomicSeq.incrementAndGet());
-
- log.info("Allow reconnect.");
-
- clientSpi.writeLatch.countDown();
-
- assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+ });
assertEquals(2L, clientAtomicSeq.incrementAndGet());
@@ -118,8 +78,6 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
Ignite srv = clientRouter(client);
- TestTcpDiscoverySpi srvSpi = spi(srv);
-
final IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeqRmv", 0, true);
clientAtomicSeq.batchSize(1);
@@ -132,47 +90,13 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertEquals(1001L, srvAtomicSeq.incrementAndGet());
- final CountDownLatch disconnectLatch = new CountDownLatch(1);
- final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
- final TestTcpDiscoverySpi clientSpi = spi(client);
-
- log.info("Block reconnect.");
-
- clientSpi.writeLatch = new CountDownLatch(1);
-
- client.events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
- info("Disconnected: " + evt);
-
- disconnectLatch.countDown();
- }
- else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
- info("Reconnected: " + evt);
-
- reconnectLatch.countDown();
- }
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ srvAtomicSeq.close();
- return true;
+ assert srvAtomicSeq.removed();
}
- }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
- srvSpi.failNode(client.cluster().localNode().id(), null);
-
- assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
- srvAtomicSeq.close();
-
- assert srvAtomicSeq.removed();
-
- log.info("Allow reconnect.");
-
- clientSpi.writeLatch.countDown();
-
- assertTrue(reconnectLatch.await(5000, MILLISECONDS));
-
- U.sleep(1000);
+ });
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -194,8 +118,6 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
Ignite srv = clientRouter(client);
- TestTcpDiscoverySpi srvSpi = spi(srv);
-
BlockTpcCommunicationSpi commSpi = commSpi(srv);
final IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeqInProg", 0, true);
@@ -228,45 +150,13 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
commSpi.unblockMsg();
- final CountDownLatch disconnectLatch = new CountDownLatch(1);
- final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
- final TestTcpDiscoverySpi clientSpi = spi(client);
-
- log.info("Block reconnect.");
-
- clientSpi.writeLatch = new CountDownLatch(1);
-
- client.events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
- info("Disconnected: " + evt);
-
- disconnectLatch.countDown();
- }
- else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
- info("Reconnected: " + evt);
-
- reconnectLatch.countDown();
- }
-
- return true;
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ // Check that future failed.
+ assertNotNull(fut.error());
+ assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
}
- }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
- srvSpi.failNode(client.cluster().localNode().id(), null);
-
- assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
- // Check that future failed.
- assertNotNull(fut.error());
- assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
-
- log.info("Allow reconnect.");
-
- clientSpi.writeLatch.countDown();
-
- assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+ });
// Check that after reconnect working.
assert clientAtomicSeq.incrementAndGet() >= 0;
@@ -285,59 +175,25 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
Ignite srv = clientRouter(client);
- TestTcpDiscoverySpi srvSpi = spi(srv);
-
IgniteAtomicReference<String> clientAtomicRef = client.atomicReference("atomicRef", "1st value", true);
assertEquals("1st value", clientAtomicRef.get());
assertTrue(clientAtomicRef.compareAndSet("1st value", "2st value"));
assertEquals("2st value", clientAtomicRef.get());
- IgniteAtomicReference<String> srvAtomicRef = srv.atomicReference("atomicRef", "1st value", false);
+ final IgniteAtomicReference<String> srvAtomicRef = srv.atomicReference("atomicRef", "1st value", false);
assertEquals("2st value", srvAtomicRef.get());
assertTrue(srvAtomicRef.compareAndSet("2st value", "3st value"));
assertEquals("3st value", srvAtomicRef.get());
- final CountDownLatch disconnectLatch = new CountDownLatch(1);
- final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
- final TestTcpDiscoverySpi clientSpi = spi(client);
-
- log.info("Block reconnect.");
-
- clientSpi.writeLatch = new CountDownLatch(1);
-
- client.events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
- info("Disconnected: " + evt);
-
- disconnectLatch.countDown();
- }
- else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
- info("Reconnected: " + evt);
-
- reconnectLatch.countDown();
- }
-
- return true;
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ assertEquals("3st value", srvAtomicRef.get());
+ assertTrue(srvAtomicRef.compareAndSet("3st value", "4st value"));
+ assertEquals("4st value", srvAtomicRef.get());
}
- }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
- srvSpi.failNode(client.cluster().localNode().id(), null);
-
- assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
- assertEquals("3st value", srvAtomicRef.get());
- assertTrue(srvAtomicRef.compareAndSet("3st value", "4st value"));
- assertEquals("4st value", srvAtomicRef.get());
-
- log.info("Allow reconnect.");
-
- clientSpi.writeLatch.countDown();
-
- assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+ });
assertEquals("4st value", clientAtomicRef.get());
assertTrue(clientAtomicRef.compareAndSet("4st value", "5st value"));
@@ -360,8 +216,6 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
Ignite srv = clientRouter(client);
- TestTcpDiscoverySpi srvSpi = spi(srv);
-
final IgniteAtomicReference<String> clientAtomicRef =
client.atomicReference("atomicRefRemoved", "1st value", true);
@@ -369,49 +223,17 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertTrue(clientAtomicRef.compareAndSet("1st value", "2st value"));
assertEquals("2st value", clientAtomicRef.get());
- IgniteAtomicReference<String> srvAtomicRef = srv.atomicReference("atomicRefRemoved", "1st value", false);
+ final IgniteAtomicReference<String> srvAtomicRef = srv.atomicReference("atomicRefRemoved", "1st value", false);
assertEquals("2st value", srvAtomicRef.get());
assertTrue(srvAtomicRef.compareAndSet("2st value", "3st value"));
assertEquals("3st value", srvAtomicRef.get());
- final CountDownLatch disconnectLatch = new CountDownLatch(1);
- final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
- final TestTcpDiscoverySpi clientSpi = spi(client);
-
- log.info("Block reconnect.");
-
- clientSpi.writeLatch = new CountDownLatch(1);
-
- client.events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
- info("Disconnected: " + evt);
-
- disconnectLatch.countDown();
- }
- else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
- info("Reconnected: " + evt);
-
- reconnectLatch.countDown();
- }
-
- return true;
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ srvAtomicRef.close();
}
- }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
- srvSpi.failNode(client.cluster().localNode().id(), null);
-
- assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
- srvAtomicRef.close();
-
- log.info("Allow reconnect.");
-
- clientSpi.writeLatch.countDown();
-
- assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+ });
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -432,8 +254,6 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
Ignite srv = clientRouter(client);
- TestTcpDiscoverySpi srvSpi = spi(srv);
-
final IgniteAtomicReference<String> clientAtomicRef =
client.atomicReference("atomicRefInProg", "1st value", true);
@@ -468,45 +288,13 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
servCommSpi.unblockMsg();
- final CountDownLatch disconnectLatch = new CountDownLatch(1);
- final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
- final TestTcpDiscoverySpi clientSpi = spi(client);
-
- log.info("Block reconnect.");
-
- clientSpi.writeLatch = new CountDownLatch(1);
-
- client.events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
- info("Disconnected: " + evt);
-
- disconnectLatch.countDown();
- }
- else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
- info("Reconnected: " + evt);
-
- reconnectLatch.countDown();
- }
-
- return true;
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ // Check that future failed.
+ assertNotNull(fut.error());
+ assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
}
- }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
- srvSpi.failNode(client.cluster().localNode().id(), null);
-
- assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
- // Check that future failed.
- assertNotNull(fut.error());
- assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
-
- log.info("Allow reconnect.");
-
- clientSpi.writeLatch.countDown();
-
- assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+ });
// Check that after reconnect working.
assertEquals("3st value", clientAtomicRef.get());
@@ -530,59 +318,25 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
Ignite srv = clientRouter(client);
- TestTcpDiscoverySpi srvSpi = spi(srv);
-
IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStamped", 0, 0, true);
assertEquals(true, clientAtomicStamped.compareAndSet(0, 1, 0, 1));
assertEquals(1, clientAtomicStamped.value());
assertEquals(1, clientAtomicStamped.stamp());
- IgniteAtomicStamped srvAtomicStamped = srv.atomicStamped("atomicStamped", 0, 0, false);
+ final IgniteAtomicStamped srvAtomicStamped = srv.atomicStamped("atomicStamped", 0, 0, false);
assertEquals(true, srvAtomicStamped.compareAndSet(1, 2, 1, 2));
assertEquals(2, srvAtomicStamped.value());
assertEquals(2, srvAtomicStamped.stamp());
- final CountDownLatch disconnectLatch = new CountDownLatch(1);
- final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
- final TestTcpDiscoverySpi clientSpi = spi(client);
-
- log.info("Block reconnect.");
-
- clientSpi.writeLatch = new CountDownLatch(1);
-
- client.events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
- info("Disconnected: " + evt);
-
- disconnectLatch.countDown();
- }
- else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
- info("Reconnected: " + evt);
-
- reconnectLatch.countDown();
- }
-
- return true;
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ assertEquals(true, srvAtomicStamped.compareAndSet(2, 3, 2, 3));
+ assertEquals(3, srvAtomicStamped.value());
+ assertEquals(3, srvAtomicStamped.stamp());
}
- }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
- srvSpi.failNode(client.cluster().localNode().id(), null);
-
- assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
- assertEquals(true, srvAtomicStamped.compareAndSet(2, 3, 2, 3));
- assertEquals(3, srvAtomicStamped.value());
- assertEquals(3, srvAtomicStamped.stamp());
-
- log.info("Allow reconnect.");
-
- clientSpi.writeLatch.countDown();
-
- assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+ });
assertEquals(true, clientAtomicStamped.compareAndSet(3, 4, 3, 4));
assertEquals(4, clientAtomicStamped.value());
@@ -605,57 +359,23 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
Ignite srv = clientRouter(client);
- TestTcpDiscoverySpi srvSpi = spi(srv);
-
final IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStampedRemoved", 0, 0, true);
assertEquals(true, clientAtomicStamped.compareAndSet(0, 1, 0, 1));
assertEquals(1, clientAtomicStamped.value());
assertEquals(1, clientAtomicStamped.stamp());
- IgniteAtomicStamped srvAtomicStamped = srv.atomicStamped("atomicStampedRemoved", 0, 0, false);
+ final IgniteAtomicStamped srvAtomicStamped = srv.atomicStamped("atomicStampedRemoved", 0, 0, false);
assertEquals(true, srvAtomicStamped.compareAndSet(1, 2, 1, 2));
assertEquals(2, srvAtomicStamped.value());
assertEquals(2, srvAtomicStamped.stamp());
- final CountDownLatch disconnectLatch = new CountDownLatch(1);
- final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
- final TestTcpDiscoverySpi clientSpi = spi(client);
-
- log.info("Block reconnect.");
-
- clientSpi.writeLatch = new CountDownLatch(1);
-
- client.events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
- info("Disconnected: " + evt);
-
- disconnectLatch.countDown();
- }
- else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
- info("Reconnected: " + evt);
-
- reconnectLatch.countDown();
- }
-
- return true;
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ srvAtomicStamped.close();
}
- }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
- srvSpi.failNode(client.cluster().localNode().id(), null);
-
- assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
- srvAtomicStamped.close();
-
- log.info("Allow reconnect.");
-
- clientSpi.writeLatch.countDown();
-
- assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+ });
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -676,8 +396,6 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
Ignite srv = clientRouter(client);
- TestTcpDiscoverySpi srvSpi = spi(srv);
-
final IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStampedInProgress", 0, 0, true);
assertEquals(true, clientAtomicStamped.compareAndSet(0, 1, 0, 1));
@@ -711,45 +429,13 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
servCommSpi.unblockMsg();
- final CountDownLatch disconnectLatch = new CountDownLatch(1);
- final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
- final TestTcpDiscoverySpi clientSpi = spi(client);
-
- log.info("Block reconnect.");
-
- clientSpi.writeLatch = new CountDownLatch(1);
-
- client.events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
- info("Disconnected: " + evt);
-
- disconnectLatch.countDown();
- }
- else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
- info("Reconnected: " + evt);
-
- reconnectLatch.countDown();
- }
-
- return true;
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ // Check that future failed.
+ assertNotNull(fut.error());
+ assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
}
- }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
- srvSpi.failNode(client.cluster().localNode().id(), null);
-
- assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
- // Check that future failed.
- assertNotNull(fut.error());
- assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
-
- log.info("Allow reconnect.");
-
- clientSpi.writeLatch.countDown();
-
- assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+ });
// Check that after reconnect working.
assertEquals(true, clientAtomicStamped.compareAndSet(2, 3, 2, 3));
@@ -773,53 +459,19 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
Ignite srv = clientRouter(client);
- TestTcpDiscoverySpi srvSpi = spi(srv);
-
IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLong", 0, true);
assertEquals(0L, clientAtomicLong.getAndAdd(1));
- IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLong", 0, false);
+ final IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLong", 0, false);
assertEquals(1L, srvAtomicLong.getAndAdd(1));
- final CountDownLatch disconnectLatch = new CountDownLatch(1);
- final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
- final TestTcpDiscoverySpi clientSpi = spi(client);
-
- log.info("Block reconnect.");
-
- clientSpi.writeLatch = new CountDownLatch(1);
-
- client.events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
- info("Disconnected: " + evt);
-
- disconnectLatch.countDown();
- }
- else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
- info("Reconnected: " + evt);
-
- reconnectLatch.countDown();
- }
-
- return true;
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ assertEquals(2L, srvAtomicLong.getAndAdd(1));
}
- }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
- srvSpi.failNode(client.cluster().localNode().id(), null);
-
- assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
- assertEquals(2L, srvAtomicLong.getAndAdd(1));
-
- log.info("Allow reconnect.");
-
- clientSpi.writeLatch.countDown();
-
- assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+ });
assertEquals(3L, clientAtomicLong.getAndAdd(1));
@@ -838,53 +490,19 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
Ignite srv = clientRouter(client);
- TestTcpDiscoverySpi srvSpi = spi(srv);
-
final IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLongRmv", 0, true);
assertEquals(0L, clientAtomicLong.getAndAdd(1));
- IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLongRmv", 0, false);
+ final IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLongRmv", 0, false);
assertEquals(1L, srvAtomicLong.getAndAdd(1));
- final CountDownLatch disconnectLatch = new CountDownLatch(1);
- final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
- final TestTcpDiscoverySpi clientSpi = spi(client);
-
- log.info("Block reconnect.");
-
- clientSpi.writeLatch = new CountDownLatch(1);
-
- client.events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
- info("Disconnected: " + evt);
-
- disconnectLatch.countDown();
- }
- else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
- info("Reconnected: " + evt);
-
- reconnectLatch.countDown();
- }
-
- return true;
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ srvAtomicLong.close();
}
- }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
- srvSpi.failNode(client.cluster().localNode().id(), null);
-
- assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
- srvAtomicLong.close();
-
- log.info("Allow reconnect.");
-
- clientSpi.writeLatch.countDown();
-
- assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+ });
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -905,8 +523,6 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
Ignite srv = clientRouter(client);
- TestTcpDiscoverySpi srvSpi = spi(srv);
-
BlockTpcCommunicationSpi commSpi = commSpi(srv);
final IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLongInProggress", 0, true);
@@ -932,45 +548,13 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
commSpi.unblockMsg();
- final CountDownLatch disconnectLatch = new CountDownLatch(1);
- final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
- final TestTcpDiscoverySpi clientSpi = spi(client);
-
- log.info("Block reconnect.");
-
- clientSpi.writeLatch = new CountDownLatch(1);
-
- client.events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
- info("Disconnected: " + evt);
-
- disconnectLatch.countDown();
- }
- else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
- info("Reconnected: " + evt);
-
- reconnectLatch.countDown();
- }
-
- return true;
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ // Check that future failed.
+ assertNotNull(fut.error());
+ assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
}
- }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
- srvSpi.failNode(client.cluster().localNode().id(), null);
-
- assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
- // Check that future failed.
- assertNotNull(fut.error());
- assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
-
- log.info("Allow reconnect.");
-
- clientSpi.writeLatch.countDown();
-
- assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+ });
// Check that after reconnect working.
assertEquals(1, clientAtomicLong.addAndGet(1));
@@ -989,53 +573,19 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
Ignite srv = clientRouter(client);
- TestTcpDiscoverySpi srvSpi = spi(srv);
-
IgniteCountDownLatch clientLatch = client.countDownLatch("latch1", 3, false, true);
assertEquals(3, clientLatch.count());
- IgniteCountDownLatch srvLatch = srv.countDownLatch("latch1", 3, false, false);
+ final IgniteCountDownLatch srvLatch = srv.countDownLatch("latch1", 3, false, false);
assertEquals(3, srvLatch.count());
- final CountDownLatch disconnectLatch = new CountDownLatch(1);
- final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
- final TestTcpDiscoverySpi clientSpi = spi(client);
-
- log.info("Block reconnect.");
-
- clientSpi.writeLatch = new CountDownLatch(1);
-
- client.events().localListen(new IgnitePredicate<Event>() {
- @Override
- public boolean apply(Event evt) {
- if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
- info("Disconnected: " + evt);
-
- disconnectLatch.countDown();
- } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
- info("Reconnected: " + evt);
-
- reconnectLatch.countDown();
- }
-
- return true;
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ srvLatch.countDown();
}
- }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
- srvSpi.failNode(client.cluster().localNode().id(), null);
-
- assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
- srvLatch.countDown();
-
- log.info("Allow reconnect.");
-
- clientSpi.writeLatch.countDown();
-
- assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+ });
assertEquals(2, srvLatch.count());
assertEquals(2, clientLatch.count());
@@ -1053,70 +603,4 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertTrue(srvLatch.await(1000));
assertTrue(clientLatch.await(1000));
}
-
- /**
- * @throws Exception If failed.
- */
- public void _testLatchReconnect2() throws Exception {
- Ignite client = grid(serverCount());
-
- assertTrue(client.cluster().localNode().isClient());
-
- Ignite srv = clientRouter(client);
-
- TestTcpDiscoverySpi srvSpi = spi(srv);
-
- final IgniteCountDownLatch clientLatch = client.countDownLatch("latch2", 1, false, true);
-
- IgniteCountDownLatch srvLatch = srv.countDownLatch("latch2", 1, false, false);
-
- assertFalse(clientLatch.await(100));
-
- IgniteInternalFuture<Boolean> waitFut = GridTestUtils.runAsync(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- return clientLatch.await(60_000, MILLISECONDS);
- }
- });
-
- final CountDownLatch disconnectLatch = new CountDownLatch(1);
- final CountDownLatch reconnectLatch = new CountDownLatch(1);
-
- final TestTcpDiscoverySpi clientSpi = spi(client);
-
- log.info("Block reconnect.");
-
- clientSpi.writeLatch = new CountDownLatch(1);
-
- client.events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
- info("Disconnected: " + evt);
-
- disconnectLatch.countDown();
- } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
- info("Reconnected: " + evt);
-
- reconnectLatch.countDown();
- }
-
- return true;
- }
- }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
- srvSpi.failNode(client.cluster().localNode().id(), null);
-
- assertTrue(disconnectLatch.await(5000, MILLISECONDS));
-
- srvLatch.countDown();
-
- assertNotDone(waitFut);
-
- log.info("Allow reconnect.");
-
- clientSpi.writeLatch.countDown();
-
- assertTrue(reconnectLatch.await(5000, MILLISECONDS));
-
- assertTrue(waitFut.get(5000));
- }
}