You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/07/03 16:24:49 UTC
incubator-ignite git commit: IGNITE-901 Added test for collections.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-901 b14b73e85 -> 4b90d91ab
IGNITE-901 Added test for collections.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4b90d91a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4b90d91a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4b90d91a
Branch: refs/heads/ignite-901
Commit: 4b90d91ab95099efa75dd43160c0038432af95c3
Parents: b14b73e
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Fri Jul 3 17:24:55 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Fri Jul 3 17:24:55 2015 +0300
----------------------------------------------------------------------
.../IgniteClientReconnectAtomicsTest.java | 6 +-
.../IgniteClientReconnectCollectionsTest.java | 350 +++++++++++++++++--
2 files changed, 319 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b90d91a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
index a827671..e629d0a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -24,7 +24,7 @@ import org.apache.ignite.testframework.*;
import java.util.concurrent.*;
/**
- * TODO IGNITE-901: test AtomicReference, AtomicStamped, usage after remove, test API block, fail current call on disconnect.
+ *
*/
public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstractTest {
/** {@inheritDoc} */
@@ -128,7 +128,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
srvAtomicSeq.batchSize(1);
- commSpi.msgClass = GridNearLockResponse.class;
+ commSpi.blockMsg(GridNearLockResponse.class);
final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -529,7 +529,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
final IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLongInProggress", 0, false);
- commSpi.msgClass = GridNearLockResponse.class;
+ commSpi.blockMsg(GridNearLockResponse.class);
final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b90d91a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
index fcb74cd..54e1329 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
@@ -19,18 +19,17 @@ package org.apache.ignite.internal;
import org.apache.ignite.*;
import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.lang.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.testframework.*;
import java.util.concurrent.*;
-import static java.util.concurrent.TimeUnit.*;
import static org.apache.ignite.cache.CacheAtomicityMode.*;
import static org.apache.ignite.cache.CacheMode.*;
-import static org.apache.ignite.events.EventType.*;
/**
- * TODO IGNITE-901: test for queue, check set/queue usage after remove, test API block, fail current call on disconnect.
+ *
*/
public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectAbstractTest {
/** {@inheritDoc} */
@@ -46,6 +45,63 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
/**
* @throws Exception If failed.
*/
+ public void testQueueReconnect() throws Exception {
+ CollectionConfiguration colCfg = new CollectionConfiguration();
+
+ colCfg.setCacheMode(PARTITIONED);
+ colCfg.setAtomicityMode(TRANSACTIONAL);
+
+ queueReconnect(colCfg);
+
+ colCfg = new CollectionConfiguration();
+
+ colCfg.setCacheMode(PARTITIONED);
+ colCfg.setAtomicityMode(ATOMIC);
+
+ queueReconnect(colCfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testQueueReconnectRemoved() throws Exception {
+ CollectionConfiguration colCfg = new CollectionConfiguration();
+
+ colCfg.setCacheMode(PARTITIONED);
+ colCfg.setAtomicityMode(TRANSACTIONAL);
+
+ queueReconnectRemoved(colCfg);
+
+ colCfg = new CollectionConfiguration();
+
+ colCfg.setCacheMode(PARTITIONED);
+ colCfg.setAtomicityMode(ATOMIC);
+
+ queueReconnectRemoved(colCfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testQueueReconnectInProg() throws Exception {
+ CollectionConfiguration colCfg = new CollectionConfiguration();
+
+ colCfg.setCacheMode(PARTITIONED);
+ colCfg.setAtomicityMode(TRANSACTIONAL);
+
+ queueReconnectInProg(colCfg);
+
+ colCfg = new CollectionConfiguration();
+
+ colCfg.setCacheMode(PARTITIONED);
+ colCfg.setAtomicityMode(ATOMIC);
+
+ queueReconnectInProg(colCfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testSetReconnect() throws Exception {
CollectionConfiguration colCfg = new CollectionConfiguration();
@@ -63,6 +119,44 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
}
/**
+ * @throws Exception If failed.
+ */
+ public void testSetReconnectRemove() throws Exception {
+ CollectionConfiguration colCfg = new CollectionConfiguration();
+
+ colCfg.setCacheMode(PARTITIONED);
+ colCfg.setAtomicityMode(ATOMIC);
+
+ setReconnectRemove(colCfg);
+
+ colCfg = new CollectionConfiguration();
+
+ colCfg.setCacheMode(PARTITIONED);
+ colCfg.setAtomicityMode(TRANSACTIONAL);
+
+ setReconnectRemove(colCfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSetReconnectInProg() throws Exception {
+ CollectionConfiguration colCfg = new CollectionConfiguration();
+
+ colCfg.setCacheMode(PARTITIONED);
+ colCfg.setAtomicityMode(ATOMIC);
+
+ setReconnectInProg(colCfg);
+
+ colCfg = new CollectionConfiguration();
+
+ colCfg.setCacheMode(PARTITIONED);
+ colCfg.setAtomicityMode(TRANSACTIONAL);
+
+ setReconnectInProg(colCfg);
+ }
+
+ /**
* @param colCfg Collection configuration.
* @throws Exception If failed.
*/
@@ -73,60 +167,248 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
Ignite srv = clientRouter(client);
- TestTcpDiscoverySpi srvSpi = spi(srv);
-
final String setName = "set-" + colCfg.getAtomicityMode();
IgniteSet<String> clientSet = client.set(setName, colCfg);
- IgniteSet<String> srvSet = srv.set(setName, null);
+ final IgniteSet<String> srvSet = srv.set(setName, null);
+
+ assertTrue(clientSet.add("1"));
+
+ assertFalse(srvSet.add("1"));
+
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ assertTrue(srvSet.add("2"));
+ }
+ });
+
+ assertFalse(clientSet.add("2"));
+
+ assertTrue(clientSet.remove("2"));
+
+ assertFalse(srvSet.contains("2"));
+ }
+
+ /**
+ * @param colCfg Collection configuration.
+ * @throws Exception If failed.
+ */
+ private void setReconnectRemove(CollectionConfiguration colCfg) throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ final Ignite srv = clientRouter(client);
+
+ final String setName = "set-rm-" + colCfg.getAtomicityMode();
+
+ final IgniteSet<String> clientSet = client.set(setName, colCfg);
+
+ final IgniteSet<String> srvSet = srv.set(setName, null);
assertTrue(clientSet.add("1"));
assertFalse(srvSet.add("1"));
- final CountDownLatch disconnectLatch = new CountDownLatch(1);
- final CountDownLatch reconnectLatch = new CountDownLatch(1);
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ srvSet.close();
+ }
+ });
- final TestTcpDiscoverySpi clientSpi = spi(client);
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ clientSet.add("fail");
- log.info("Block reconnect.");
+ return null;
+ }
+ }, IllegalStateException.class, null);
+ }
- clientSpi.writeLatch = new CountDownLatch(1);
+ /**
+ * @param colCfg Collection configuration.
+ * @throws Exception If failed.
+ */
+ private void setReconnectInProg(final CollectionConfiguration colCfg) throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
- client.events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
- info("Disconnected: " + evt);
+ final Ignite srv = clientRouter(client);
- disconnectLatch.countDown();
- }
- else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
- info("Reconnected: " + evt);
+ final String setName = "set-in-progress-" + colCfg.getAtomicityMode();
+
+ final IgniteSet<String> clientSet = client.set(setName, colCfg);
+
+ final IgniteSet<String> srvSet = srv.set(setName, null);
+
+ assertTrue(clientSet.add("1"));
+
+ assertFalse(srvSet.add("1"));
- reconnectLatch.countDown();
- }
+ BlockTpcCommunicationSpi commSpi = commSpi(srv);
- return true;
+ if (colCfg.getAtomicityMode() == ATOMIC)
+ commSpi.blockMsg(GridNearAtomicUpdateResponse.class);
+ else
+ commSpi.blockMsg(GridNearTxPrepareResponse.class);
+
+ final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return clientSet.add("2");
}
- }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+ });
- srvSpi.failNode(client.cluster().localNode().id(), null);
+ // Check that client waiting operation.
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return fut.get(200);
+ }
+ }, IgniteFutureTimeoutCheckedException.class, null);
- assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+ assertNotDone(fut);
- assertTrue(srvSet.add("2"));
+ commSpi.unblockMsg();
- log.info("Allow reconnect.");
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ // Check that future failed.
+ assertNotNull("Future was not failed. Atomic mode: " + colCfg.getAtomicityMode() + ".", fut.error());
+ assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
+ }
+ });
- clientSpi.writeLatch.countDown();
+ assertTrue(clientSet.add("3"));
- assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+ assertFalse(srvSet.add("3"));
- assertFalse(clientSet.add("2"));
+ srvSet.close();
+ }
- assertTrue(clientSet.remove("2"));
+ /**
+ * @param colCfg Collection configuration.
+ * @throws Exception If failed.
+ */
+ private void queueReconnect(CollectionConfiguration colCfg) throws Exception {
+ Ignite client = grid(serverCount());
- assertFalse(srvSet.contains("2"));
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ final String setName = "queue-" + colCfg.getAtomicityMode();
+
+ IgniteQueue<String> clientQueue = client.queue(setName, 10, colCfg);
+
+ final IgniteQueue<String> srvQueue = srv.queue(setName, 10, null);
+
+ assertTrue(clientQueue.offer("1"));
+
+ assertTrue(srvQueue.contains("1"));
+
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ assertTrue(srvQueue.add("2"));
+ }
+ });
+
+ assertTrue(clientQueue.contains("2"));
+
+ assertEquals("1", clientQueue.poll());
+ }
+
+ /**
+ * @param colCfg Collection configuration.
+ * @throws Exception If failed.
+ */
+ private void queueReconnectRemoved(CollectionConfiguration colCfg) throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ final String setName = "queue-rmv" + colCfg.getAtomicityMode();
+
+ final IgniteQueue<String> clientQueue = client.queue(setName, 10, colCfg);
+
+ final IgniteQueue<String> srvQueue = srv.queue(setName, 10, null);
+
+ assertTrue(clientQueue.add("1"));
+
+ assertTrue(srvQueue.add("2"));
+
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ srvQueue.close();
+ }
+ });
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ clientQueue.add("fail");
+
+ return null;
+ }
+ }, IllegalStateException.class, null);
+ }
+
+ /**
+ * @param colCfg Collection configuration.
+ * @throws Exception If failed.
+ */
+ private void queueReconnectInProg(final CollectionConfiguration colCfg) throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ final String setName = "queue-rmv" + colCfg.getAtomicityMode();
+
+ final IgniteQueue<String> clientQueue = client.queue(setName, 10, colCfg);
+
+ final IgniteQueue<String> srvQueue = srv.queue(setName, 10, null);
+
+ assertTrue(clientQueue.offer("1"));
+
+ assertTrue(srvQueue.contains("1"));
+
+ BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+ if (colCfg.getAtomicityMode() == ATOMIC)
+ commSpi.blockMsg(GridNearAtomicUpdateResponse.class);
+ else
+ commSpi.blockMsg(GridNearTxPrepareResponse.class);
+
+ final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return clientQueue.add("2");
+ }
+ });
+
+ // Check that client waiting operation.
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return fut.get(200);
+ }
+ }, IgniteFutureTimeoutCheckedException.class, null);
+
+ assertNotDone(fut);
+
+ commSpi.unblockMsg();
+
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ // Check that future failed.
+ assertNotNull("Future was not failed. Atomic mode: " + colCfg.getAtomicityMode() + ".", fut.error());
+ assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
+ }
+ });
+
+ assertTrue(clientQueue.add("3"));
+
+ assertEquals("1", clientQueue.poll());
}
}