You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/07/03 12:28:58 UTC
incubator-ignite git commit: IGNITE-901 Added tests.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-901 807ceb380 -> 669ab1371
IGNITE-901 Added tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/669ab137
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/669ab137
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/669ab137
Branch: refs/heads/ignite-901
Commit: 669ab1371f5dc211cb72ff596b7dc59a49f3dd9d
Parents: 807ceb3
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Fri Jul 3 13:29:11 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Fri Jul 3 13:29:11 2015 +0300
----------------------------------------------------------------------
.../IgniteClientReconnectAbstractTest.java | 81 +++
.../IgniteClientReconnectApiBlockTest.java | 533 +++++++++++++++--
.../IgniteClientReconnectAtomicsTest.java | 579 ++++++++++++++++++-
.../IgniteClientReconnectFailoverSelfTest.java | 290 ++++++++++
.../IgniteClientReconnectTestSuite.java | 1 +
5 files changed, 1435 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/669ab137/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index 23b8a15..937104f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -18,19 +18,26 @@
package org.apache.ignite.internal;
import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.tcp.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.internal.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.spi.discovery.tcp.messages.*;
import org.apache.ignite.testframework.junits.common.*;
+import org.eclipse.jetty.util.*;
import java.io.*;
import java.net.*;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
/**
*
@@ -53,6 +60,10 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
cfg.setDiscoverySpi(disco);
+ BlockTpcCommunicationSpi commSpi = new BlockTpcCommunicationSpi(log);
+
+ cfg.setCommunicationSpi(commSpi);
+
if (clientMode)
cfg.setClientMode(true);
@@ -79,6 +90,14 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
return ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi());
}
+ /**
+ * @param ignite Node.
+ * @return Communication SPI.
+ */
+ protected BlockTpcCommunicationSpi commSpi(Ignite ignite) {
+ return ((BlockTpcCommunicationSpi)ignite.configuration().getCommunicationSpi());
+ }
+
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
@@ -157,4 +176,66 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
super.writeToSocket(sock, msg);
}
}
+
+ /**
+ *
+ */
+ protected static class BlockTpcCommunicationSpi extends TcpCommunicationSpi {
+ /** */
+ volatile Class msgClass;
+
+ AtomicBoolean collectStart = new AtomicBoolean(false);
+
+ ConcurrentHashSet<String> classes = new ConcurrentHashSet<>();
+
+ /** */
+ protected IgniteLogger log;
+
+ /**
+ * @param log Logger.
+ */
+ public BlockTpcCommunicationSpi(IgniteLogger log) {
+ this.log = log;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+ Class msgClass0 = msgClass;
+
+ if (collectStart.get() && msg instanceof GridIoMessage)
+ classes.add(((GridIoMessage)msg).message().getClass().getName());
+
+ if (msgClass0 != null && msg instanceof GridIoMessage
+ && ((GridIoMessage)msg).message().getClass().equals(msgClass)) {
+ log.info("Block message: " + msg);
+
+ return;
+ }
+
+ super.sendMessage(node, msg);
+ }
+
+ /**
+ * @param clazz Class of messages which will be block.
+ */
+ public void blockMsg(Class clazz) {
+ msgClass = clazz;
+ }
+
+ /**
+ * Unlock all message.
+ */
+ public void unblockMsg() {
+ msgClass = null;
+ }
+
+ public void start() {
+ collectStart.set(true);
+ }
+
+ public void print() {
+ for (String aClass : classes)
+ log.error(aClass);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/669ab137/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java
index 164f6c8..f9522a0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java
@@ -18,12 +18,16 @@
package org.apache.ignite.internal;
import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.events.*;
+import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
import org.apache.ignite.testframework.*;
+import javax.cache.processor.*;
import java.util.*;
import java.util.concurrent.*;
@@ -50,12 +54,453 @@ public class IgniteClientReconnectApiBlockTest extends IgniteClientReconnectAbst
/**
* @throws Exception If failed.
*/
- @SuppressWarnings("unchecked")
public void testIgniteBlockOnDisconnect() throws Exception {
+ // Check cache operations.
+ cacheOperationsTest();
+
+ // Check cache operations.
+ beforeTestsStarted();
+ dataStructureOperationsTest();
+
+ // Check ignite operations.
+ beforeTestsStarted();
+ igniteOperationsTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ public void dataStructureOperationsTest() throws Exception {
clientMode = true;
final Ignite client = startGrid(serverCount());
+ doTestIgniteOperationOnDisconnect(client, Arrays.asList(
+ // Check atomic long.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ return client.atomicLong("testAtomic", 41, true);
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertNotNull(o);
+
+ IgniteAtomicLong atomicLong = (IgniteAtomicLong)o;
+
+ assertEquals(42, atomicLong.incrementAndGet());
+
+ return true;
+ }
+ }
+ ),
+ // Check set.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ return client.set("testSet", new CollectionConfiguration());
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertNotNull(o);
+
+ IgniteSet set = (IgniteSet)o;
+
+ String val = "testVal";
+
+ set.add(val);
+
+ assertEquals(1, set.size());
+ assertTrue(set.contains(val));
+
+ return true;
+ }
+ }
+ ),
+ // Check ignite queue.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ return client.queue("TestQueue", 10, new CollectionConfiguration());
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertNotNull(o);
+
+ IgniteQueue queue = (IgniteQueue)o;
+
+ String val = "Test";
+
+ queue.add(val);
+
+ assertEquals(val, queue.poll());
+
+ return true;
+ }
+ }
+ )
+ ));
+
+ clientMode = false;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ public void cacheOperationsTest() throws Exception {
+ clientMode = true;
+
+ final Ignite client = startGrid(serverCount());
+
+ final IgniteCache<Object, Object> defaultCache = client.cache(null);
+
+ assertNotNull(defaultCache);
+
+ doTestIgniteOperationOnDisconnect(client, Arrays.asList(
+ // Check put and get operation.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ return defaultCache.getAndPut(9999, 9999);
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertNull(o);
+
+ assertEquals(9999, defaultCache.get(9999));
+
+ return true;
+ }
+ }
+ ),
+ // Check put operation.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ defaultCache.put(10000, 10000);
+
+ return true;
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertTrue((Boolean)o);
+
+ assertEquals(10000, defaultCache.get(10000));
+
+ return true;
+ }
+ }
+ ),
+ // Check get operation.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ return defaultCache.get(10001);
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertNull(o);
+
+ return true;
+ }
+ }
+ ),
+ // Check invoke operation.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ return defaultCache.invoke(10000, new CacheEntryProcessor<Object, Object, Object>() {
+ @Override public Object process(MutableEntry<Object, Object> entry,
+ Object... arguments) throws EntryProcessorException {
+ assertTrue(entry.exists());
+
+ return (int)entry.getValue() * 2;
+ }
+ });
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertNotNull(o);
+
+ assertEquals(20000, (int)o);
+
+ return true;
+ }
+ }
+ ),
+ // Check put async operation.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ IgniteCache<Object, Object> async = defaultCache.withAsync();
+
+ async.put(10002, 10002);
+
+ return async.future().get();
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertNull(o);
+
+ assertEquals(10002, defaultCache.get(10002));
+
+ return true;
+ }
+ }
+ ),
+ // Check transaction.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ return client.transactions();
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ IgniteTransactions txs = (IgniteTransactions)o;
+
+ assertNotNull(txs);
+
+ return true;
+ }
+ }
+ ),
+ // Check get cache.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ return client.cache(null);
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ IgniteCache<Object, Object> cache0 = (IgniteCache<Object, Object>)o;
+
+ assertNotNull(cache0);
+
+ cache0.put(1, 1);
+
+ assertEquals(1, cache0.get(1));
+
+ return true;
+ }
+ }
+ ),
+ // Check streamer.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ return client.dataStreamer(null);
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ IgniteDataStreamer<Object, Object> streamer = (IgniteDataStreamer<Object, Object>)o;
+
+ streamer.addData(2, 2);
+
+ streamer.close();
+
+ assertEquals(2, client.cache(null).get(2));
+
+ return true;
+ }
+ }
+ ),
+ // Check create cache.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ return client.createCache("test_cache");
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ IgniteCache<Object, Object> cache = (IgniteCache<Object, Object>)o;
+
+ assertNotNull(cache);
+
+ cache.put(1, 1);
+
+ assertEquals(1, cache.get(1));
+
+ return true;
+ }
+ }
+ )
+
+ ));
+
+ clientMode = false;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ public void igniteOperationsTest() throws Exception {
+ clientMode = true;
+
+ final Ignite client = startGrid(serverCount());
+
+ final IgniteCache<Object, Object> defaultCache = client.cache(null);
+
+ final CountDownLatch recvLatch = new CountDownLatch(1);
+
+ assertNotNull(defaultCache);
+
+ doTestIgniteOperationOnDisconnect(client, Arrays.asList(
+ // Check compute.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ return client.compute();
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ IgniteCompute comp = (IgniteCompute)o;
+
+ Collection<UUID> uuids = comp.broadcast(new IgniteCallable<UUID>() {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ @Override public UUID call() throws Exception {
+ return ignite.cluster().localNode().id();
+ }
+ });
+
+ for (UUID uuid : uuids)
+ assertNotNull(uuid);
+
+ return true;
+ }
+ }
+ ),
+
+ // Check ping node.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ return client.cluster().pingNode(new UUID(0, 0));
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ Boolean pingNode = (Boolean)o;
+
+ assertFalse(pingNode);
+
+ return true;
+ }
+ }
+ ),
+ // Check register remote listener.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ return client.events().remoteListen(null, new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event event) {
+ return true;
+ }
+ });
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ UUID remoteId = (UUID)o;
+
+ assertNotNull(remoteId);
+
+ client.events().stopRemoteListen(remoteId);
+
+ return true;
+ }
+ }
+ ),
+ // Check message operation.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ return client.message().remoteListen(null, new IgniteBiPredicate<UUID, Object>() {
+ @Override public boolean apply(UUID uuid, Object o) {
+ if (o.equals("Test message."))
+ recvLatch.countDown();
+
+ return true;
+ }
+ });
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertNotNull(o);
+
+ IgniteMessaging msg = client.message();
+
+ msg.send(null, "Test message.");
+
+ try {
+ assert recvLatch.await(2, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e) {
+ fail("Message wasn't received.");
+ }
+
+ return true;
+ }
+ }
+ ),
+ // Check executor.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ return client.executorService().submit(new Callable<Integer>() {
+ @Override public Integer call() throws Exception {
+ return 42;
+ }
+ });
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertNotNull(o);
+
+ Future<Integer> fut = (Future<Integer>)o;
+
+ try {
+ assertEquals(42, (int)fut.get());
+ }
+ catch (Exception e) {
+ fail("Failed submit task.");
+ }
+
+ return true;
+ }
+ }
+ )
+ ));
+
+ clientMode = false;
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ private void doTestIgniteOperationOnDisconnect(Ignite client, final List<T2<Callable, C1<Object, Boolean>>> ops)
+ throws Exception {
assertNotNull(client.cache(null));
final TestTcpDiscoverySpi clientSpi = spi(client);
@@ -74,8 +519,6 @@ public class IgniteClientReconnectApiBlockTest extends IgniteClientReconnectAbst
final List<IgniteInternalFuture> futs = new ArrayList<>();
- // TODO IGNITE-901 test block for others public API.
-
client.events().localListen(new IgnitePredicate<Event>() {
@Override public boolean apply(Event evt) {
if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
@@ -83,26 +526,12 @@ public class IgniteClientReconnectApiBlockTest extends IgniteClientReconnectAbst
assertEquals(1, reconnectLatch.getCount());
- futs.add(GridTestUtils.runAsync(new Callable() {
- @Override public Object call() throws Exception {
- return client.transactions();
- }
- }));
-
- futs.add(GridTestUtils.runAsync(new Callable() {
- @Override public Object call() throws Exception {
- return client.cache(null);
- }
- }));
-
- futs.add(GridTestUtils.runAsync(new Callable() {
- @Override public Object call() throws Exception {
- return client.dataStreamer(null);
- }
- }));
+ for (T2<Callable, C1<Object, Boolean>> op : ops)
+ futs.add(GridTestUtils.runAsync(op.get1()));
disconnectLatch.countDown();
- } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
info("Reconnected: " + evt);
reconnectLatch.countDown();
@@ -112,46 +541,56 @@ public class IgniteClientReconnectApiBlockTest extends IgniteClientReconnectAbst
}
}, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
- log.info("Fail client.");
-
- srvSpi.failNode(client.cluster().localNode().id(), null);
-
- assertTrue(disconnectLatch.await(5000, TimeUnit.MILLISECONDS));
+ try {
+ log.info("Fail client.");
- assertEquals(3, futs.size());
+ srvSpi.failNode(client.cluster().localNode().id(), null);
- for (IgniteInternalFuture<?> fut : futs)
- assertNotDone(fut);
+ assertTrue(disconnectLatch.await(5000, TimeUnit.MILLISECONDS));
- U.sleep(2000);
+ assertEquals(ops.size(), futs.size());
- for (IgniteInternalFuture<?> fut : futs)
- assertNotDone(fut);
+ for (IgniteInternalFuture<?> fut : futs)
+ assertNotDone(fut);
- log.info("Allow reconnect.");
+ U.sleep(2000);
- clientSpi.writeLatch.countDown();
+ for (IgniteInternalFuture<?> fut : futs)
+ assertNotDone(fut);
- assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS));
+ log.info("Allow reconnect.");
- IgniteTransactions txs = (IgniteTransactions)futs.get(0).get();
+ clientSpi.writeLatch.countDown();
- assertNotNull(txs);
+ assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS));
- IgniteCache<Object, Object> cache0 = (IgniteCache<Object, Object>)futs.get(1).get();
+ // Check operation after reconnect working.
+ for (int i = 0; i < futs.size(); i++) {
+ final int i0 = i;
- assertNotNull(cache0);
+ try {
+ final Object furRes = futs.get(i0).get(2, TimeUnit.SECONDS);
- cache0.put(1, 1);
-
- assertEquals(1, cache0.get(1));
-
- IgniteDataStreamer<Object, Object> streamer = (IgniteDataStreamer<Object, Object>)futs.get(2).get();
+ assertTrue(GridTestUtils.runAsync(new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ return ops.get(i0).get2().apply(furRes);
+ }
+ }).get(2, TimeUnit.SECONDS));
+ }
+ catch (IgniteFutureTimeoutCheckedException e) {
+ e.printStackTrace();
- streamer.addData(2, 2);
+ fail("Operation timeout. Iteration: " + i + ".");
+ }
+ }
+ }
+ finally {
+ clientSpi.writeLatch.countDown();
- streamer.close();
+ for (IgniteInternalFuture fut : futs)
+ fut.cancel();
- assertEquals(2, cache0.get(2));
+ stopAllGrids();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/669ab137/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
index bbb7eef..1a5b795 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal;
import org.apache.ignite.*;
import org.apache.ignite.events.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.testframework.*;
@@ -44,6 +46,494 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
/**
* @throws Exception If failed.
*/
+ public void testAtomicReferenceReconnect() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ IgniteAtomicReference<String> clientAtomicRef = client.atomicReference("atomicRef", "1st value", true);
+
+ assertEquals("1st value", clientAtomicRef.get());
+ assertTrue(clientAtomicRef.compareAndSet("1st value", "2st value"));
+ assertEquals("2st value", clientAtomicRef.get());
+
+ IgniteAtomicReference<String> srvAtomicRef = srv.atomicReference("atomicRef", "1st value", false);
+
+ assertEquals("2st value", srvAtomicRef.get());
+ assertTrue(srvAtomicRef.compareAndSet("2st value", "3st value"));
+ assertEquals("3st value", srvAtomicRef.get());
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+
+ log.info("Block reconnect.");
+
+ clientSpi.writeLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+ assertEquals("3st value", srvAtomicRef.get());
+ assertTrue(srvAtomicRef.compareAndSet("3st value", "4st value"));
+ assertEquals("4st value", srvAtomicRef.get());
+
+ log.info("Allow reconnect.");
+
+ clientSpi.writeLatch.countDown();
+
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+ assertEquals("4st value", clientAtomicRef.get());
+ assertTrue(clientAtomicRef.compareAndSet("4st value", "5st value"));
+ assertEquals("5st value", clientAtomicRef.get());
+
+ assertEquals("5st value", srvAtomicRef.get());
+ assertTrue(srvAtomicRef.compareAndSet("5st value", "6st value"));
+ assertEquals("6st value", srvAtomicRef.get());
+
+ srvAtomicRef.close();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicReferenceReconnectRemoved() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ final IgniteAtomicReference<String> clientAtomicRef =
+ client.atomicReference("atomicRefRemoved", "1st value", true);
+
+ assertEquals("1st value", clientAtomicRef.get());
+ assertTrue(clientAtomicRef.compareAndSet("1st value", "2st value"));
+ assertEquals("2st value", clientAtomicRef.get());
+
+ IgniteAtomicReference<String> srvAtomicRef = srv.atomicReference("atomicRefRemoved", "1st value", false);
+
+ assertEquals("2st value", srvAtomicRef.get());
+ assertTrue(srvAtomicRef.compareAndSet("2st value", "3st value"));
+ assertEquals("3st value", srvAtomicRef.get());
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+
+ log.info("Block reconnect.");
+
+ clientSpi.writeLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+ srvAtomicRef.close();
+
+ log.info("Allow reconnect.");
+
+ clientSpi.writeLatch.countDown();
+
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ clientAtomicRef.compareAndSet("3st value", "4st value");
+
+ return null;
+ }
+ }, IgniteException.class, null);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicReferenceReconnectInProgress() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ final IgniteAtomicReference<String> clientAtomicRef =
+ client.atomicReference("atomicRefInProg", "1st value", true);
+
+ assertEquals("1st value", clientAtomicRef.get());
+ assertTrue(clientAtomicRef.compareAndSet("1st value", "2st value"));
+ assertEquals("2st value", clientAtomicRef.get());
+
+ IgniteAtomicReference<String> srvAtomicRef = srv.atomicReference("atomicRefInProg", "1st value", false);
+
+ assertEquals("2st value", srvAtomicRef.get());
+ assertTrue(srvAtomicRef.compareAndSet("2st value", "3st value"));
+ assertEquals("3st value", srvAtomicRef.get());
+
+ BlockTpcCommunicationSpi servCommSpi = commSpi(srv);
+
+ servCommSpi.blockMsg(GridNearLockResponse.class);
+
+ final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return clientAtomicRef.compareAndSet("3st value", "4st value");
+ }
+ });
+
+ // Check that client waiting operation.
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return fut.get(200);
+ }
+ }, IgniteFutureTimeoutCheckedException.class, null);
+
+ assertNotDone(fut);
+
+ servCommSpi.unblockMsg();
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+
+ log.info("Block reconnect.");
+
+ clientSpi.writeLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+ // Check that future failed.
+ assertNotNull(fut.error());
+ assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
+
+ log.info("Allow reconnect.");
+
+ clientSpi.writeLatch.countDown();
+
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+ // Check that after reconnect working.
+ assertEquals("3st value", clientAtomicRef.get());
+ assertTrue(clientAtomicRef.compareAndSet("3st value", "4st value"));
+ assertEquals("4st value", clientAtomicRef.get());
+
+ assertEquals("4st value", srvAtomicRef.get());
+ assertTrue(srvAtomicRef.compareAndSet("4st value", "5st value"));
+ assertEquals("5st value", srvAtomicRef.get());
+
+ srvAtomicRef.close();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicStampedReconnect() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStamped", 0, 0, true);
+
+ assertEquals(true, clientAtomicStamped.compareAndSet(0, 1, 0, 1));
+ assertEquals(1, clientAtomicStamped.value());
+ assertEquals(1, clientAtomicStamped.stamp());
+
+ IgniteAtomicStamped srvAtomicStamped = srv.atomicStamped("atomicStamped", 0, 0, false);
+
+ assertEquals(true, srvAtomicStamped.compareAndSet(1, 2, 1, 2));
+ assertEquals(2, srvAtomicStamped.value());
+ assertEquals(2, srvAtomicStamped.stamp());
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+
+ log.info("Block reconnect.");
+
+ clientSpi.writeLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+ assertEquals(true, srvAtomicStamped.compareAndSet(2, 3, 2, 3));
+ assertEquals(3, srvAtomicStamped.value());
+ assertEquals(3, srvAtomicStamped.stamp());
+
+ log.info("Allow reconnect.");
+
+ clientSpi.writeLatch.countDown();
+
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+ assertEquals(true, clientAtomicStamped.compareAndSet(3, 4, 3, 4));
+ assertEquals(4, clientAtomicStamped.value());
+ assertEquals(4, clientAtomicStamped.stamp());
+
+ assertEquals(true, srvAtomicStamped.compareAndSet(4, 5, 4, 5));
+ assertEquals(5, srvAtomicStamped.value());
+ assertEquals(5, srvAtomicStamped.stamp());
+
+ srvAtomicStamped.close();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicStampedReconnectRemoved() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ final IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStampedRemoved", 0, 0, true);
+
+ assertEquals(true, clientAtomicStamped.compareAndSet(0, 1, 0, 1));
+ assertEquals(1, clientAtomicStamped.value());
+ assertEquals(1, clientAtomicStamped.stamp());
+
+ IgniteAtomicStamped srvAtomicStamped = srv.atomicStamped("atomicStampedRemoved", 0, 0, false);
+
+ assertEquals(true, srvAtomicStamped.compareAndSet(1, 2, 1, 2));
+ assertEquals(2, srvAtomicStamped.value());
+ assertEquals(2, srvAtomicStamped.stamp());
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+
+ log.info("Block reconnect.");
+
+ clientSpi.writeLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+ srvAtomicStamped.close();
+
+ log.info("Allow reconnect.");
+
+ clientSpi.writeLatch.countDown();
+
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ clientAtomicStamped.compareAndSet(2, 3, 2, 3);
+
+ return null;
+ }
+ }, IgniteException.class, null);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicStampedReconnectInProgress() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ final IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStampedInProgress", 0, 0, true);
+
+ assertEquals(true, clientAtomicStamped.compareAndSet(0, 1, 0, 1));
+ assertEquals(1, clientAtomicStamped.value());
+ assertEquals(1, clientAtomicStamped.stamp());
+
+ IgniteAtomicStamped srvAtomicStamped = srv.atomicStamped("atomicStampedInProgress", 0, 0, false);
+
+ assertEquals(true, srvAtomicStamped.compareAndSet(1, 2, 1, 2));
+ assertEquals(2, srvAtomicStamped.value());
+ assertEquals(2, srvAtomicStamped.stamp());
+
+ BlockTpcCommunicationSpi servCommSpi = commSpi(srv);
+
+ servCommSpi.blockMsg(GridNearLockResponse.class);
+
+ final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return clientAtomicStamped.compareAndSet(2, 3, 2, 3);
+ }
+ });
+
+ // Check that client waiting operation.
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return fut.get(200);
+ }
+ }, IgniteFutureTimeoutCheckedException.class, null);
+
+ assertNotDone(fut);
+
+ servCommSpi.unblockMsg();
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+
+ log.info("Block reconnect.");
+
+ clientSpi.writeLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+ // Check that future failed.
+ assertNotNull(fut.error());
+ assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
+
+ log.info("Allow reconnect.");
+
+ clientSpi.writeLatch.countDown();
+
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+ // Check that after reconnect working.
+ assertEquals(true, clientAtomicStamped.compareAndSet(2, 3, 2, 3));
+ assertEquals(3, clientAtomicStamped.value());
+ assertEquals(3, clientAtomicStamped.stamp());
+
+ assertEquals(true, srvAtomicStamped.compareAndSet(3, 4, 3, 4));
+ assertEquals(4, srvAtomicStamped.value());
+ assertEquals(4, srvAtomicStamped.stamp());
+
+ srvAtomicStamped.close();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testAtomicLongReconnect() throws Exception {
Ignite client = grid(serverCount());
@@ -141,7 +631,8 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
info("Disconnected: " + evt);
disconnectLatch.countDown();
- } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
info("Reconnected: " + evt);
reconnectLatch.countDown();
@@ -175,7 +666,91 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
/**
* @throws Exception If failed.
*/
- public void testLatchReconnect1() throws Exception {
+ public void testAtomicLongReconnectInProgress() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+ final IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLongInProggress", 0, true);
+
+ final IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLongInProggress", 0, false);
+
+ commSpi.msgClass = GridNearLockResponse.class;
+
+ final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return clientAtomicLong.getAndAdd(1);
+ }
+ });
+
+ // Check that client waiting operation.
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return fut.get(200);
+ }
+ }, IgniteFutureTimeoutCheckedException.class, null);
+
+ assertNotDone(fut);
+
+ commSpi.unblockMsg();
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+
+ log.info("Block reconnect.");
+
+ clientSpi.writeLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+ // Check that future failed.
+ assertNotNull(fut.error());
+ assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
+
+ log.info("Allow reconnect.");
+
+ clientSpi.writeLatch.countDown();
+
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+ // Check that after reconnect working.
+ assertEquals(1, clientAtomicLong.addAndGet(1));
+ assertEquals(2, srvAtomicLong.addAndGet(1));
+
+ clientAtomicLong.close();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLatchReconnect() throws Exception {
Ignite client = grid(serverCount());
assertTrue(client.cluster().localNode().isClient());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/669ab137/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverSelfTest.java
new file mode 100644
index 0000000..f938733
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverSelfTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.transactions.*;
+
+import javax.cache.*;
+import javax.cache.processor.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectFailoverSelfTest extends IgniteClientReconnectAbstractTest {
+ /** */
+ public final Integer THREADS = 8;
+
+ /** */
+ public final Integer RESTART_CNT = 30;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setCacheConfiguration(new CacheConfiguration());
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int serverCount() {
+ return 1;
+ }
+
+ /** */
+ private volatile CyclicBarrier barrier;
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCacheOperationReconnectApi() throws Exception {
+ clientMode = true;
+
+ final Ignite client = startGrid(serverCount());
+
+ assertNotNull(client.cache(null));
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ final AtomicBoolean stop = new AtomicBoolean(false);
+
+ final AtomicLong cntr = new AtomicLong();
+
+ final IgniteQueue<Object> queue = client.queue("test-queue", 1000, new CollectionConfiguration());
+
+ final IgniteAtomicLong atomicLong = client.atomicLong("counter", 0, true);
+
+ final IgniteAtomicSequence sequence = client.atomicSequence("sequence", 0, true);
+
+ final IgniteInternalFuture<Long> future = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ IgniteCache<Integer, Integer> cache = client.cache(null);
+
+ IgniteCompute compute = client.compute();
+
+ Set<Integer> keys = new TreeSet<>();
+ final Map<Integer, Integer> entries = new TreeMap<>();
+
+ for (int i = 0; i < 50; i++) {
+ keys.add(i);
+ entries.put(i, i);
+ }
+
+ while (!stop.get()) {
+ cntr.incrementAndGet();
+
+ try {
+ // Start cache operations.
+ for (int i = 0; i < 10; i++) {
+ cache.put(i, i);
+ cache.get(i);
+ cache.remove(i);
+
+ cache.putAll(entries);
+
+ cache.invokeAll(keys, new CacheEntryProcessor<Integer, Integer, Object>() {
+ @Override public Object process(MutableEntry<Integer, Integer> entry,
+ Object... arguments) throws EntryProcessorException {
+ if (ThreadLocalRandom.current().nextBoolean())
+ entry.setValue(entry.getValue() * 100);
+ else
+ entry.remove();
+
+ return entry;
+ }
+ });
+ }
+
+ try (Transaction tx = client.transactions().txStart()) {
+ for (int i = 0; i < 10; i++) {
+ cache.put(i, i);
+ cache.get(i);
+ }
+
+ tx.commit();
+ }
+
+ // Start async cache operations.
+ IgniteCache<Integer, Integer> asyncCache = cache.withAsync();
+
+ for (int i = 0; i < 10; i++) {
+ asyncCache.put(i, i);
+
+ asyncCache.future().get();
+
+ asyncCache.get(i);
+
+ asyncCache.future().get();
+ }
+
+ // Compute.
+// for (int i = 0; i < 10; i++) {
+// compute.broadcast(new IgniteCallable<Integer>() {
+// @IgniteInstanceResource
+// private Ignite ignite;
+//
+// @Override public Integer call() throws Exception {
+// return ignite.cache(null).localSize();
+// }
+// });
+//
+// compute.broadcast(new IgniteRunnable() {
+// @Override public void run() {
+// // No-op.
+// }
+// });
+//
+// compute.apply(new C1<String, String>() {
+// @Override public String apply(String o) {
+// return o.toUpperCase();
+// }
+// }, Arrays.asList("a", "b", "c"));
+// }
+
+ //Data structures.
+// for (int i = 0; i < 10; i++) {
+// assert atomicLong.incrementAndGet() >= 0;
+//
+// queue.offer("Test item");
+//
+// if (ThreadLocalRandom.current().nextBoolean())
+// for (int j = 0; j < 50; j++)
+// queue.poll();
+//
+// assert queue.size() <= 1000;
+//
+// assert sequence.addAndGet(i + 1) >= 0;
+// }
+ }
+ catch (CacheException | IgniteException e) {
+ log.info("Operation failed, ignore: " + e);
+ }
+
+ if (cntr.get() % 100 == 0)
+ log.info("Iteration: " + cntr);
+
+ if (barrier != null)
+ try {
+ barrier.await();
+ }
+ catch (BrokenBarrierException e) {
+ log.warning("Broken barrier.", e);
+
+ break;
+ }
+ }
+
+ return null;
+ }
+ }, THREADS, "test-operation-thread-" + client.name());
+
+ final AtomicBoolean disconnected = new AtomicBoolean(false);
+
+ final AtomicBoolean reconnected = new AtomicBoolean(false);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ if (!reconnected.get())
+ disconnected.set(true);
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ if (disconnected.get())
+ reconnected.set(true);
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ for (int i = 0; i < RESTART_CNT; i++) {
+ U.sleep(2000);
+
+ log.info("Block reconnect.");
+
+ reconnected.set(false);
+
+ disconnected.set(false);
+
+ log.info("Fail client.");
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return disconnected.get();
+ }
+ }, 5000L);
+
+ barrier = new CyclicBarrier(THREADS + 1, new Runnable() {
+ @Override public void run() {
+ barrier = null;
+ }
+ });
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return reconnected.get();
+ }
+ }, 5000L);
+
+ try {
+ barrier.await(10, TimeUnit.SECONDS);
+ }
+ catch (TimeoutException e) {
+ log.error("Failed. Operation hangs.");
+
+ for (Ignite ignite : G.allGrids())
+ dumpCacheDebugInfo(ignite);
+
+ U.dumpThreads(log);
+
+ CyclicBarrier barrier0 = barrier;
+
+ if (barrier0 != null)
+ barrier0.reset();
+
+ stop.set(true);
+
+ fail("Failed to wait for update.");
+ }
+ }
+
+ stop.set(true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/669ab137/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
index 88f0c5f..affbb54 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
@@ -33,6 +33,7 @@ public class IgniteClientReconnectTestSuite extends TestSuite {
suite.addTestSuite(IgniteClientReconnectStopTest.class);
suite.addTestSuite(IgniteClientReconnectApiBlockTest.class);
+ suite.addTestSuite(IgniteClientReconnectFailoverSelfTest.class);
suite.addTestSuite(IgniteClientReconnectDiscoveryStateTest.class);
suite.addTestSuite(IgniteClientReconnectCacheTest.class);
suite.addTestSuite(IgniteClientReconnectContinuousProcessorTest.class);