You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/07/03 16:24:49 UTC

incubator-ignite git commit: IGNITE-901 Added test for collections.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-901 b14b73e85 -> 4b90d91ab


IGNITE-901 Added test for collections.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4b90d91a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4b90d91a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4b90d91a

Branch: refs/heads/ignite-901
Commit: 4b90d91ab95099efa75dd43160c0038432af95c3
Parents: b14b73e
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Fri Jul 3 17:24:55 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Fri Jul 3 17:24:55 2015 +0300

----------------------------------------------------------------------
 .../IgniteClientReconnectAtomicsTest.java       |   6 +-
 .../IgniteClientReconnectCollectionsTest.java   | 350 +++++++++++++++++--
 2 files changed, 319 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b90d91a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
index a827671..e629d0a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -24,7 +24,7 @@ import org.apache.ignite.testframework.*;
 import java.util.concurrent.*;
 
 /**
- * TODO IGNITE-901: test AtomicReference, AtomicStamped, usage after remove, test API block, fail current call on disconnect.
+ *
  */
 public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstractTest {
     /** {@inheritDoc} */
@@ -128,7 +128,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         srvAtomicSeq.batchSize(1);
 
-        commSpi.msgClass = GridNearLockResponse.class;
+        commSpi.blockMsg(GridNearLockResponse.class);
 
         final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
             @Override public Object call() throws Exception {
@@ -529,7 +529,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         final IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLongInProggress", 0, false);
 
-        commSpi.msgClass = GridNearLockResponse.class;
+        commSpi.blockMsg(GridNearLockResponse.class);
 
         final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
             @Override public Object call() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b90d91a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
index fcb74cd..54e1329 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
@@ -19,18 +19,17 @@ package org.apache.ignite.internal;
 
 import org.apache.ignite.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.lang.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.testframework.*;
 
 import java.util.concurrent.*;
 
-import static java.util.concurrent.TimeUnit.*;
 import static org.apache.ignite.cache.CacheAtomicityMode.*;
 import static org.apache.ignite.cache.CacheMode.*;
-import static org.apache.ignite.events.EventType.*;
 
 /**
- * TODO IGNITE-901: test for queue, check set/queue usage after remove, test API block, fail current call on disconnect.
+ *
  */
 public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectAbstractTest {
     /** {@inheritDoc} */
@@ -46,6 +45,63 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
     /**
      * @throws Exception If failed.
      */
+    public void testQueueReconnect() throws Exception {
+        CollectionConfiguration colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(TRANSACTIONAL);
+
+        queueReconnect(colCfg);
+
+        colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(ATOMIC);
+
+        queueReconnect(colCfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueueReconnectRemoved() throws Exception {
+        CollectionConfiguration colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(TRANSACTIONAL);
+
+        queueReconnectRemoved(colCfg);
+
+        colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(ATOMIC);
+
+        queueReconnectRemoved(colCfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueueReconnectInProg() throws Exception {
+        CollectionConfiguration colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(TRANSACTIONAL);
+
+        queueReconnectInProg(colCfg);
+
+        colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(ATOMIC);
+
+        queueReconnectInProg(colCfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testSetReconnect() throws Exception {
         CollectionConfiguration colCfg = new CollectionConfiguration();
 
@@ -63,6 +119,44 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testSetReconnectRemove() throws Exception {
+        CollectionConfiguration colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(ATOMIC);
+
+        setReconnectRemove(colCfg);
+
+        colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(TRANSACTIONAL);
+
+        setReconnectRemove(colCfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSetReconnectInProg() throws Exception {
+        CollectionConfiguration colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(ATOMIC);
+
+        setReconnectInProg(colCfg);
+
+        colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(TRANSACTIONAL);
+
+        setReconnectInProg(colCfg);
+    }
+
+    /**
      * @param colCfg Collection configuration.
      * @throws Exception If failed.
      */
@@ -73,60 +167,248 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
 
         Ignite srv = clientRouter(client);
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
-
         final String setName = "set-" + colCfg.getAtomicityMode();
 
         IgniteSet<String> clientSet = client.set(setName, colCfg);
 
-        IgniteSet<String> srvSet = srv.set(setName, null);
+        final IgniteSet<String> srvSet = srv.set(setName, null);
+
+        assertTrue(clientSet.add("1"));
+
+        assertFalse(srvSet.add("1"));
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                assertTrue(srvSet.add("2"));
+            }
+        });
+
+        assertFalse(clientSet.add("2"));
+
+        assertTrue(clientSet.remove("2"));
+
+        assertFalse(srvSet.contains("2"));
+    }
+
+    /**
+     * @param colCfg Collection configuration.
+     * @throws Exception If failed.
+     */
+    private void setReconnectRemove(CollectionConfiguration colCfg) throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        final Ignite srv = clientRouter(client);
+
+        final String setName = "set-rm-" + colCfg.getAtomicityMode();
+
+        final IgniteSet<String> clientSet = client.set(setName, colCfg);
+
+        final IgniteSet<String> srvSet = srv.set(setName, null);
 
         assertTrue(clientSet.add("1"));
 
         assertFalse(srvSet.add("1"));
 
-        final CountDownLatch disconnectLatch = new CountDownLatch(1);
-        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                srvSet.close();
+            }
+        });
 
-        final TestTcpDiscoverySpi clientSpi = spi(client);
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                clientSet.add("fail");
 
-        log.info("Block reconnect.");
+                return null;
+            }
+        }, IllegalStateException.class, null);
+    }
 
-        clientSpi.writeLatch = new CountDownLatch(1);
+    /**
+     * @param colCfg Collection configuration.
+     * @throws Exception If failed.
+     */
+    private void setReconnectInProg(final CollectionConfiguration colCfg) throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
 
-        client.events().localListen(new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event evt) {
-                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
-                    info("Disconnected: " + evt);
+        final Ignite srv = clientRouter(client);
 
-                    disconnectLatch.countDown();
-                }
-                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
-                    info("Reconnected: " + evt);
+        final String setName = "set-in-progress-" + colCfg.getAtomicityMode();
+
+        final IgniteSet<String> clientSet = client.set(setName, colCfg);
+
+        final IgniteSet<String> srvSet = srv.set(setName, null);
+
+        assertTrue(clientSet.add("1"));
+
+        assertFalse(srvSet.add("1"));
 
-                    reconnectLatch.countDown();
-                }
+        BlockTpcCommunicationSpi commSpi = commSpi(srv);
 
-                return true;
+        if (colCfg.getAtomicityMode() == ATOMIC)
+            commSpi.blockMsg(GridNearAtomicUpdateResponse.class);
+        else
+            commSpi.blockMsg(GridNearTxPrepareResponse.class);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return clientSet.add("2");
             }
-        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+        });
 
-        srvSpi.failNode(client.cluster().localNode().id(), null);
+        // Check that client waiting operation.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return fut.get(200);
+            }
+        }, IgniteFutureTimeoutCheckedException.class, null);
 
-        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+        assertNotDone(fut);
 
-        assertTrue(srvSet.add("2"));
+        commSpi.unblockMsg();
 
-        log.info("Allow reconnect.");
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                // Check that future failed.
+                assertNotNull("Future was not failed. Atomic mode: " + colCfg.getAtomicityMode() + ".", fut.error());
+                assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
+            }
+        });
 
-        clientSpi.writeLatch.countDown();
+        assertTrue(clientSet.add("3"));
 
-        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+        assertFalse(srvSet.add("3"));
 
-        assertFalse(clientSet.add("2"));
+        srvSet.close();
+    }
 
-        assertTrue(clientSet.remove("2"));
+    /**
+     * @param colCfg Collection configuration.
+     * @throws Exception If failed.
+     */
+    private void queueReconnect(CollectionConfiguration colCfg) throws Exception {
+        Ignite client = grid(serverCount());
 
-        assertFalse(srvSet.contains("2"));
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        final String setName = "queue-" + colCfg.getAtomicityMode();
+
+        IgniteQueue<String> clientQueue = client.queue(setName, 10, colCfg);
+
+        final IgniteQueue<String> srvQueue = srv.queue(setName, 10, null);
+
+        assertTrue(clientQueue.offer("1"));
+
+        assertTrue(srvQueue.contains("1"));
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                assertTrue(srvQueue.add("2"));
+            }
+        });
+
+        assertTrue(clientQueue.contains("2"));
+
+        assertEquals("1", clientQueue.poll());
+    }
+
+    /**
+     * @param colCfg Collection configuration.
+     * @throws Exception If failed.
+     */
+    private void queueReconnectRemoved(CollectionConfiguration colCfg) throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        final String setName = "queue-rmv" + colCfg.getAtomicityMode();
+
+        final IgniteQueue<String> clientQueue = client.queue(setName, 10, colCfg);
+
+        final IgniteQueue<String> srvQueue = srv.queue(setName, 10, null);
+
+        assertTrue(clientQueue.add("1"));
+
+        assertTrue(srvQueue.add("2"));
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                srvQueue.close();
+            }
+        });
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                clientQueue.add("fail");
+
+                return null;
+            }
+        }, IllegalStateException.class, null);
+    }
+
+    /**
+     * @param colCfg Collection configuration.
+     * @throws Exception If failed.
+     */
+    private void queueReconnectInProg(final CollectionConfiguration colCfg) throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        final String setName = "queue-rmv" + colCfg.getAtomicityMode();
+
+        final IgniteQueue<String> clientQueue = client.queue(setName, 10, colCfg);
+
+        final IgniteQueue<String> srvQueue = srv.queue(setName, 10, null);
+
+        assertTrue(clientQueue.offer("1"));
+
+        assertTrue(srvQueue.contains("1"));
+
+        BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+        if (colCfg.getAtomicityMode() == ATOMIC)
+            commSpi.blockMsg(GridNearAtomicUpdateResponse.class);
+        else
+            commSpi.blockMsg(GridNearTxPrepareResponse.class);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return clientQueue.add("2");
+            }
+        });
+
+        // Check that client waiting operation.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return fut.get(200);
+            }
+        }, IgniteFutureTimeoutCheckedException.class, null);
+
+        assertNotDone(fut);
+
+        commSpi.unblockMsg();
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                // Check that future failed.
+                assertNotNull("Future was not failed. Atomic mode: " + colCfg.getAtomicityMode() + ".", fut.error());
+                assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
+            }
+        });
+
+        assertTrue(clientQueue.add("3"));
+
+        assertEquals("1", clientQueue.poll());
     }
 }