You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/07/03 13:03:09 UTC
incubator-ignite git commit: IGNITE-901 Added tests for Sequence.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-901 c221e773a -> 97a62245f
IGNITE-901 Added tests for Sequence.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/97a62245
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/97a62245
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/97a62245
Branch: refs/heads/ignite-901
Commit: 97a62245f17af1f83070e1390f4af8525d698960
Parents: c221e77
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Fri Jul 3 14:03:21 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Fri Jul 3 14:03:21 2015 +0300
----------------------------------------------------------------------
.../IgniteClientReconnectAtomicsTest.java | 232 +++++++++++++++++++
1 file changed, 232 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/97a62245/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
index 1a5b795..d7f6170 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -46,6 +46,238 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
/**
* @throws Exception If failed.
*/
+ public void testAtomicSeqReconnect() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeq", 0, true);
+
+ assertEquals(1L, clientAtomicSeq.incrementAndGet());
+
+ IgniteAtomicSequence srvAtomicSeq = srv.atomicSequence("atomicSeq", 0, false);
+
+ assertEquals(1001L, srvAtomicSeq.incrementAndGet());
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+
+ log.info("Block reconnect.");
+
+ clientSpi.writeLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+ assertEquals(1002L, srvAtomicSeq.incrementAndGet());
+
+ log.info("Allow reconnect.");
+
+ clientSpi.writeLatch.countDown();
+
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+ assertEquals(2L, clientAtomicSeq.incrementAndGet());
+
+ assertEquals(1003L, srvAtomicSeq.incrementAndGet());
+
+ assertEquals(3L, clientAtomicSeq.incrementAndGet());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicSeqReconnectRemoved() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ final IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeqRmv", 0, true);
+
+ clientAtomicSeq.batchSize(1);
+
+ assertEquals(1L, clientAtomicSeq.incrementAndGet());
+
+ final IgniteAtomicSequence srvAtomicSeq = srv.atomicSequence("atomicSeqRmv", 0, false);
+
+ srvAtomicSeq.batchSize(1);
+
+ assertEquals(1001L, srvAtomicSeq.incrementAndGet());
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+
+ log.info("Block reconnect.");
+
+ clientSpi.writeLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+ srvAtomicSeq.close();
+
+ assert srvAtomicSeq.removed();
+
+ log.info("Allow reconnect.");
+
+ clientSpi.writeLatch.countDown();
+
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+ U.sleep(1000);
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ for (int i = 0; i < 2000; i++)
+ clientAtomicSeq.incrementAndGet();
+
+ return null;
+ }
+ }, IgniteException.class, null);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicSeqReconnectInProgress() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+ final IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeqInProg", 0, true);
+
+ clientAtomicSeq.batchSize(1);
+
+ final IgniteAtomicSequence srvAtomicSeq = srv.atomicSequence("atomicSeqInProg", 0, false);
+
+ srvAtomicSeq.batchSize(1);
+
+ commSpi.msgClass = GridNearLockResponse.class;
+
+ final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ for (int i = 0; i < 3000; i++)
+ clientAtomicSeq.incrementAndGet();
+
+ return null;
+ }
+ });
+
+ // Check that client waiting operation.
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return fut.get(200);
+ }
+ }, IgniteFutureTimeoutCheckedException.class, null);
+
+ assertNotDone(fut);
+
+ commSpi.unblockMsg();
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+
+ log.info("Block reconnect.");
+
+ clientSpi.writeLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+ // Check that future failed.
+ assertNotNull(fut.error());
+ assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
+
+ log.info("Allow reconnect.");
+
+ clientSpi.writeLatch.countDown();
+
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+ // Check that after reconnect working.
+ assert clientAtomicSeq.incrementAndGet() >= 0;
+ assert srvAtomicSeq.incrementAndGet() >= 0;
+
+ clientAtomicSeq.close();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testAtomicReferenceReconnect() throws Exception {
Ignite client = grid(serverCount());