You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/07/03 13:03:09 UTC

incubator-ignite git commit: IGNITE-901 Added tests for Sequence.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-901 c221e773a -> 97a62245f


IGNITE-901 Added tests for Sequence.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/97a62245
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/97a62245
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/97a62245

Branch: refs/heads/ignite-901
Commit: 97a62245f17af1f83070e1390f4af8525d698960
Parents: c221e77
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Fri Jul 3 14:03:21 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Fri Jul 3 14:03:21 2015 +0300

----------------------------------------------------------------------
 .../IgniteClientReconnectAtomicsTest.java       | 232 +++++++++++++++++++
 1 file changed, 232 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/97a62245/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
index 1a5b795..d7f6170 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -46,6 +46,238 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
     /**
      * @throws Exception If failed.
      */
+    public void testAtomicSeqReconnect() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeq", 0, true);
+
+        assertEquals(1L, clientAtomicSeq.incrementAndGet());
+
+        IgniteAtomicSequence srvAtomicSeq = srv.atomicSequence("atomicSeq", 0, false);
+
+        assertEquals(1001L, srvAtomicSeq.incrementAndGet());
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        final TestTcpDiscoverySpi clientSpi = spi(client);
+
+        log.info("Block reconnect.");
+
+        clientSpi.writeLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    info("Disconnected: " + evt);
+
+                    disconnectLatch.countDown();
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+        assertEquals(1002L, srvAtomicSeq.incrementAndGet());
+
+        log.info("Allow reconnect.");
+
+        clientSpi.writeLatch.countDown();
+
+        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+        assertEquals(2L, clientAtomicSeq.incrementAndGet());
+
+        assertEquals(1003L, srvAtomicSeq.incrementAndGet());
+
+        assertEquals(3L, clientAtomicSeq.incrementAndGet());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicSeqReconnectRemoved() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        final IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeqRmv", 0, true);
+
+        clientAtomicSeq.batchSize(1);
+
+        assertEquals(1L, clientAtomicSeq.incrementAndGet());
+
+        final IgniteAtomicSequence srvAtomicSeq = srv.atomicSequence("atomicSeqRmv", 0, false);
+
+        srvAtomicSeq.batchSize(1);
+
+        assertEquals(1001L, srvAtomicSeq.incrementAndGet());
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        final TestTcpDiscoverySpi clientSpi = spi(client);
+
+        log.info("Block reconnect.");
+
+        clientSpi.writeLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    info("Disconnected: " + evt);
+
+                    disconnectLatch.countDown();
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+        srvAtomicSeq.close();
+
+        assert srvAtomicSeq.removed();
+
+        log.info("Allow reconnect.");
+
+        clientSpi.writeLatch.countDown();
+
+        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+        U.sleep(1000);
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                for (int i = 0; i < 2000; i++)
+                    clientAtomicSeq.incrementAndGet();
+
+                return null;
+            }
+        }, IgniteException.class, null);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicSeqReconnectInProgress() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+        final IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeqInProg", 0, true);
+
+        clientAtomicSeq.batchSize(1);
+
+        final IgniteAtomicSequence srvAtomicSeq = srv.atomicSequence("atomicSeqInProg", 0, false);
+
+        srvAtomicSeq.batchSize(1);
+
+        commSpi.msgClass = GridNearLockResponse.class;
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                for (int i = 0; i < 3000; i++)
+                    clientAtomicSeq.incrementAndGet();
+
+                return null;
+            }
+        });
+
+        // Check that client waiting operation.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return fut.get(200);
+            }
+        }, IgniteFutureTimeoutCheckedException.class, null);
+
+        assertNotDone(fut);
+
+        commSpi.unblockMsg();
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        final TestTcpDiscoverySpi clientSpi = spi(client);
+
+        log.info("Block reconnect.");
+
+        clientSpi.writeLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    info("Disconnected: " + evt);
+
+                    disconnectLatch.countDown();
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+        // Check that future failed.
+        assertNotNull(fut.error());
+        assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
+
+        log.info("Allow reconnect.");
+
+        clientSpi.writeLatch.countDown();
+
+        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+        // Check that after reconnect working.
+        assert clientAtomicSeq.incrementAndGet() >= 0;
+        assert srvAtomicSeq.incrementAndGet() >= 0;
+
+        clientAtomicSeq.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testAtomicReferenceReconnect() throws Exception {
         Ignite client = grid(serverCount());