You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/09/07 22:46:47 UTC
[20/50] [abbrv] ignite git commit: # ignite-901 client reconnect
support
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
new file mode 100644
index 0000000..37773cd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
@@ -0,0 +1,846 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.testframework.*;
+
+import javax.cache.*;
+import javax.cache.processor.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnectAbstractTest {
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Registers one default-constructed cache configuration on top of the parent configuration.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        final IgniteConfiguration nodeCfg = super.getConfiguration(gridName);
+
+        final CacheConfiguration dfltCacheCfg = new CacheConfiguration();
+
+        nodeCfg.setCacheConfiguration(dfltCacheCfg);
+
+        return nodeCfg;
+    }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns 1; the scenarios in this class also use this value as the grid index of the client node they start.
+ */
+ @Override protected int serverCount() {
+ return 1;
+ }
+
+ /**
+ * Runs the three disconnect scenarios one after another: cache operations, data structure operations
+ * and general Ignite API operations.
+ *
+ * @throws Exception If failed.
+ */
+ public void testErrorOnDisconnect() throws Exception {
+ // Check cache operations.
+ cacheOperationsTest();
+
+ // Check data structure operations.
+ // Every scenario ends with stopAllGrids(), so beforeTestsStarted() is invoked again before the next one
+ // (presumably to bring the server nodes back up - confirm against the parent class).
+ beforeTestsStarted();
+ dataStructureOperationsTest();
+
+ // Check ignite operations.
+ beforeTestsStarted();
+ igniteOperationsTest();
+ }
+
+ /**
+ * Checks data structure API calls (atomic long, set, queue) issued from a client node while it is
+ * disconnected.
+ * <p>
+ * Every operation closure expects its first call to throw {@code IgniteClientDisconnectedException},
+ * hands the exception to {@code checkAndWait(e)} and then repeats the same call. The value returned by the
+ * repeated call is passed to the paired verification closure.
+ *
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ public void dataStructureOperationsTest() throws Exception {
+ clientMode = true;
+
+ final Ignite client = startGrid(serverCount());
+
+ doTestIgniteOperationOnDisconnect(client, Arrays.asList(
+ // Check atomic long.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ client.atomicLong("testAtomic", 41, true);
+ }
+ catch (IgniteClientDisconnectedException e) {
+ failed = true;
+
+ // Helper from the parent class; presumably waits until the client is reconnected - TODO confirm.
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ return client.atomicLong("testAtomic", 41, true);
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertNotNull(o);
+
+ IgniteAtomicLong atomicLong = (IgniteAtomicLong)o;
+
+ // The atomic long is requested with initial value 41, so one increment is expected to give 42.
+ assertEquals(42, atomicLong.incrementAndGet());
+
+ return true;
+ }
+ }
+ ),
+ // Check set.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ client.set("testSet", new CollectionConfiguration());
+ }
+ catch (IgniteClientDisconnectedException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ return client.set("testSet", new CollectionConfiguration());
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertNotNull(o);
+
+ IgniteSet set = (IgniteSet)o;
+
+ String val = "testVal";
+
+ set.add(val);
+
+ assertEquals(1, set.size());
+ assertTrue(set.contains(val));
+
+ return true;
+ }
+ }
+ ),
+ // Check ignite queue.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ client.queue("TestQueue", 10, new CollectionConfiguration());
+ }
+ catch (IgniteClientDisconnectedException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ return client.queue("TestQueue", 10, new CollectionConfiguration());
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertNotNull(o);
+
+ IgniteQueue queue = (IgniteQueue)o;
+
+ String val = "Test";
+
+ queue.add(val);
+
+ assertEquals(val, queue.poll());
+
+ return true;
+ }
+ }
+ )
+ ));
+
+ clientMode = false;
+ }
+
+ /**
+ * Checks cache related API calls issued from a client node while it is disconnected: getAndPut, put, get,
+ * invoke, asynchronous put, {@code transactions()}, {@code cache(null)}, {@code dataStreamer(null)} and
+ * {@code createCache(...)}.
+ * <p>
+ * Every operation closure expects its first call to fail ({@code CacheException} for operations on the
+ * cache instance, {@code IgniteClientDisconnectedException} for calls on the {@code Ignite} instance),
+ * hands the exception to {@code checkAndWait(e)} and then repeats the call. The value returned by the
+ * repeated call is passed to the paired verification closure.
+ *
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ public void cacheOperationsTest() throws Exception {
+ clientMode = true;
+
+ final Ignite client = startGrid(serverCount());
+
+ final IgniteCache<Object, Object> dfltCache = client.cache(null);
+
+ assertNotNull(dfltCache);
+
+ doTestIgniteOperationOnDisconnect(client, Arrays.asList(
+ // Check put and get operation.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ dfltCache.getAndPut(9999, 9999);
+ }
+ catch (CacheException e) {
+ failed = true;
+
+ // Helper from the parent class; presumably waits until the client is reconnected - TODO confirm.
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ return dfltCache.getAndPut(9999, 9999);
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ // The repeated getAndPut is expected to find no previous value for the key.
+ assertNull(o);
+
+ assertEquals(9999, dfltCache.get(9999));
+
+ return true;
+ }
+ }
+ ),
+ // Check put operation.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ dfltCache.put(10000, 10000);
+ }
+ catch (CacheException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ dfltCache.put(10000, 10000);
+
+ return true;
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertTrue((Boolean)o);
+
+ assertEquals(10000, dfltCache.get(10000));
+
+ return true;
+ }
+ }
+ ),
+ // Check get operation.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ dfltCache.get(10001);
+ }
+ catch (CacheException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ return dfltCache.get(10001);
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ // Key 10001 is never written in this method, so the repeated get is expected to return null.
+ assertNull(o);
+
+ return true;
+ }
+ }
+ ),
+ // Check invoke operation.
+ // NOTE(review): the entry processor asserts that key 10000 exists; the only write of that key visible
+ // in this method is the "put operation" closure above, which runs as a separate async task - confirm
+ // that its repeated put always completes before this invoke is repeated.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ dfltCache.invoke(10000, new CacheEntryProcessor<Object, Object, Object>() {
+ @Override public Object process(MutableEntry<Object, Object> entry,
+ Object... arguments) throws EntryProcessorException {
+ assertTrue(entry.exists());
+
+ return (int)entry.getValue() * 2;
+ }
+ });
+ }
+ catch (CacheException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ return dfltCache.invoke(10000, new CacheEntryProcessor<Object, Object, Object>() {
+ @Override public Object process(MutableEntry<Object, Object> entry,
+ Object... arguments) throws EntryProcessorException {
+ assertTrue(entry.exists());
+
+ return (int)entry.getValue() * 2;
+ }
+ });
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertNotNull(o);
+
+ // The processor returns the stored value multiplied by two (10000 * 2).
+ assertEquals(20000, (int)o);
+
+ return true;
+ }
+ }
+ ),
+ // Check put async operation.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ IgniteCache<Object, Object> async = dfltCache.withAsync();
+
+ boolean failed = false;
+
+ try {
+ async.put(10002, 10002);
+
+ async.future().get();
+ }
+ catch (CacheException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ async.put(10002, 10002);
+
+ return async.future().get();
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertNull(o);
+
+ assertEquals(10002, dfltCache.get(10002));
+
+ return true;
+ }
+ }
+ ),
+ // Check transaction.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ client.transactions();
+ }
+ catch (IgniteClientDisconnectedException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ return client.transactions();
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ IgniteTransactions txs = (IgniteTransactions)o;
+
+ assertNotNull(txs);
+
+ return true;
+ }
+ }
+ ),
+ // Check get cache.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ client.cache(null);
+ }
+ catch (IgniteClientDisconnectedException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ return client.cache(null);
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ IgniteCache<Object, Object> cache0 = (IgniteCache<Object, Object>)o;
+
+ assertNotNull(cache0);
+
+ cache0.put(1, 1);
+
+ assertEquals(1, cache0.get(1));
+
+ return true;
+ }
+ }
+ ),
+ // Check streamer.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ client.dataStreamer(null);
+ }
+ catch (IgniteClientDisconnectedException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ return client.dataStreamer(null);
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ IgniteDataStreamer<Object, Object> streamer = (IgniteDataStreamer<Object, Object>)o;
+
+ streamer.addData(2, 2);
+
+ // The streamer is closed before the value is read back from the cache.
+ streamer.close();
+
+ assertEquals(2, client.cache(null).get(2));
+
+ return true;
+ }
+ }
+ ),
+ // Check create cache.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ client.createCache("test_cache");
+ }
+ catch (IgniteClientDisconnectedException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ return client.createCache("test_cache");
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ IgniteCache<Object, Object> cache = (IgniteCache<Object, Object>)o;
+
+ assertNotNull(cache);
+
+ cache.put(1, 1);
+
+ assertEquals(1, cache.get(1));
+
+ return true;
+ }
+ }
+ )
+
+ ));
+
+ clientMode = false;
+ }
+
+    /**
+     * Checks general Ignite API calls issued from a client node while it is disconnected: compute, node ping,
+     * remote event listener registration, remote message listener registration and executor service.
+     * <p>
+     * Every operation closure expects its first call to throw {@code IgniteClientDisconnectedException},
+     * hands the exception to {@code checkAndWait(e)} and then repeats the same call. The value returned by the
+     * repeated call is passed to the paired verification closure.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public void igniteOperationsTest() throws Exception {
+        clientMode = true;
+
+        final Ignite client = startGrid(serverCount());
+
+        final IgniteCache<Object, Object> dfltCache = client.cache(null);
+
+        // Counted down by the remote message listener once the test message arrives.
+        final CountDownLatch recvLatch = new CountDownLatch(1);
+
+        assertNotNull(dfltCache);
+
+        doTestIgniteOperationOnDisconnect(client, Arrays.asList(
+            // Check compute.
+            new T2<Callable, C1<Object, Boolean>>(
+                new Callable() {
+                    @Override public Object call() throws Exception {
+                        boolean failed = false;
+
+                        try {
+                            client.compute();
+                        }
+                        catch (IgniteClientDisconnectedException e) {
+                            failed = true;
+
+                            // Helper from the parent class; presumably waits for the reconnect - TODO confirm.
+                            checkAndWait(e);
+                        }
+
+                        assertTrue(failed);
+
+                        return client.compute();
+                    }
+                },
+                new C1<Object, Boolean>() {
+                    @Override public Boolean apply(Object o) {
+                        IgniteCompute comp = (IgniteCompute)o;
+
+                        Collection<UUID> uuids = comp.broadcast(new IgniteCallable<UUID>() {
+                            @IgniteInstanceResource
+                            private Ignite ignite;
+
+                            @Override public UUID call() throws Exception {
+                                return ignite.cluster().localNode().id();
+                            }
+                        });
+
+                        assertFalse(uuids.isEmpty());
+
+                        for (UUID uuid : uuids)
+                            assertNotNull(uuid);
+
+                        return true;
+                    }
+                }
+            ),
+
+            // Check ping node.
+            new T2<Callable, C1<Object, Boolean>>(
+                new Callable() {
+                    @Override public Object call() throws Exception {
+                        boolean failed = false;
+
+                        try {
+                            client.cluster().pingNode(new UUID(0, 0));
+                        }
+                        catch (IgniteClientDisconnectedException e) {
+                            failed = true;
+
+                            checkAndWait(e);
+                        }
+
+                        assertTrue(failed);
+
+                        return client.cluster().pingNode(new UUID(0, 0));
+                    }
+                },
+                new C1<Object, Boolean>() {
+                    @Override public Boolean apply(Object o) {
+                        Boolean pingNode = (Boolean)o;
+
+                        // The pinged ID (0, 0) is not expected to belong to any node.
+                        assertFalse(pingNode);
+
+                        return true;
+                    }
+                }
+            ),
+            // Check register remote listener.
+            new T2<Callable, C1<Object, Boolean>>(
+                new Callable() {
+                    @Override public Object call() throws Exception {
+                        boolean failed = false;
+
+                        try {
+                            client.events().remoteListen(null, new IgnitePredicate<Event>() {
+                                @Override public boolean apply(Event event) {
+                                    return true;
+                                }
+                            });
+                        }
+                        catch (IgniteClientDisconnectedException e) {
+                            failed = true;
+
+                            checkAndWait(e);
+                        }
+
+                        assertTrue(failed);
+
+                        return client.events().remoteListen(null, new IgnitePredicate<Event>() {
+                            @Override public boolean apply(Event event) {
+                                return true;
+                            }
+                        });
+                    }
+                },
+                new C1<Object, Boolean>() {
+                    @Override public Boolean apply(Object o) {
+                        UUID remoteId = (UUID)o;
+
+                        assertNotNull(remoteId);
+
+                        client.events().stopRemoteListen(remoteId);
+
+                        return true;
+                    }
+                }
+            ),
+            // Check message operation.
+            new T2<Callable, C1<Object, Boolean>>(
+                new Callable() {
+                    @Override public Object call() throws Exception {
+                        boolean failed = false;
+
+                        try {
+                            client.message().remoteListen(null, new IgniteBiPredicate<UUID, Object>() {
+                                @Override public boolean apply(UUID uuid, Object o) {
+                                    if (o.equals("Test message."))
+                                        recvLatch.countDown();
+
+                                    return true;
+                                }
+                            });
+                        }
+                        catch (IgniteClientDisconnectedException e) {
+                            failed = true;
+
+                            checkAndWait(e);
+                        }
+
+                        assertTrue(failed);
+
+                        return client.message().remoteListen(null, new IgniteBiPredicate<UUID, Object>() {
+                            @Override public boolean apply(UUID uuid, Object o) {
+                                if (o.equals("Test message."))
+                                    recvLatch.countDown();
+
+                                return true;
+                            }
+                        });
+                    }
+                },
+                new C1<Object, Boolean>() {
+                    @Override public Boolean apply(Object o) {
+                        assertNotNull(o);
+
+                        IgniteMessaging msg = client.message();
+
+                        msg.send(null, "Test message.");
+
+                        try {
+                            // A timeout (await() returning false) is the "not received" case.
+                            assertTrue("Message wasn't received.", recvLatch.await(2, SECONDS));
+                        }
+                        catch (InterruptedException e) {
+                            // Keep the interrupt visible to the caller and report the real cause.
+                            Thread.currentThread().interrupt();
+
+                            fail("Interrupted while waiting for the message: " + e);
+                        }
+
+                        return true;
+                    }
+                }
+            ),
+            // Check executor.
+            new T2<Callable, C1<Object, Boolean>>(
+                new Callable() {
+                    @Override public Object call() throws Exception {
+                        boolean failed = false;
+
+                        try {
+                            client.executorService().submit(new Callable<Integer>() {
+                                @Override public Integer call() throws Exception {
+                                    return 42;
+                                }
+                            });
+                        }
+                        catch (IgniteClientDisconnectedException e) {
+                            failed = true;
+
+                            checkAndWait(e);
+                        }
+
+                        assertTrue(failed);
+
+                        return client.executorService().submit(new Callable<Integer>() {
+                            @Override public Integer call() throws Exception {
+                                return 42;
+                            }
+                        });
+                    }
+                },
+                new C1<Object, Boolean>() {
+                    @Override public Boolean apply(Object o) {
+                        assertNotNull(o);
+
+                        Future<Integer> fut = (Future<Integer>)o;
+
+                        try {
+                            assertEquals(42, (int)fut.get());
+                        }
+                        catch (Exception e) {
+                            // Include the cause so that the failure can be diagnosed from the report.
+                            fail("Failed submit task: " + e);
+                        }
+
+                        return true;
+                    }
+                }
+            )
+        ));
+
+        clientMode = false;
+    }
+
+    /**
+     * Drives one disconnect/reconnect cycle of the given client and runs the supplied operations against it.
+     * <p>
+     * Steps: the client's discovery SPI {@code writeLatch} is set so that the reconnect stays blocked, the
+     * client is failed from the server side, and on the disconnect event the first closure of every pair is
+     * started asynchronously. While the reconnect is blocked none of the resulting futures may complete.
+     * After the latch is released and the reconnect event has arrived, every future must finish within
+     * 2 seconds and its result is handed to the second closure of the pair, which must return {@code true}.
+     * All grids are stopped at the end, whether the check passed or not.
+     *
+     * @param client Client.
+     * @param ops Operations closures: first element is the operation, second one verifies its result.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private void doTestIgniteOperationOnDisconnect(Ignite client, final List<T2<Callable, C1<Object, Boolean>>> ops)
+        throws Exception {
+        assertNotNull(client.cache(null));
+
+        final TestTcpDiscoverySpi clientSpi = spi(client);
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        log.info("Block reconnect.");
+
+        clientSpi.writeLatch = new CountDownLatch(1);
+
+        // Filled by the event listener before disconnectLatch is counted down and read by this thread
+        // only after waiting on that latch.
+        final List<IgniteInternalFuture> futs = new ArrayList<>();
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    info("Disconnected: " + evt);
+
+                    assertEquals(1, reconnectLatch.getCount());
+
+                    for (T2<Callable, C1<Object, Boolean>> op : ops)
+                        futs.add(GridTestUtils.runAsync(op.get1()));
+
+                    disconnectLatch.countDown();
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        try {
+            log.info("Fail client.");
+
+            srvSpi.failNode(client.cluster().localNode().id(), null);
+
+            waitReconnectEvent(disconnectLatch);
+
+            assertEquals(ops.size(), futs.size());
+
+            for (IgniteInternalFuture<?> fut : futs)
+                assertNotDone(fut);
+
+            // Give the operations time to complete (they must not) while the reconnect is still blocked.
+            U.sleep(2000);
+
+            for (IgniteInternalFuture<?> fut : futs)
+                assertNotDone(fut);
+
+            log.info("Allow reconnect.");
+
+            clientSpi.writeLatch.countDown();
+
+            waitReconnectEvent(reconnectLatch);
+
+            // Check operation after reconnect working.
+            for (int i = 0; i < futs.size(); i++) {
+                final int i0 = i;
+
+                try {
+                    final Object futRes = futs.get(i0).get(2, SECONDS);
+
+                    assertTrue(GridTestUtils.runAsync(new Callable<Boolean>() {
+                        @Override public Boolean call() throws Exception {
+                            return ops.get(i0).get2().apply(futRes);
+                        }
+                    }).get(2, SECONDS));
+                }
+                catch (IgniteFutureTimeoutCheckedException e) {
+                    // Report through the test logger instead of printing to stderr.
+                    log.error("Operation timeout. Iteration: " + i + ".", e);
+
+                    fail("Operation timeout. Iteration: " + i + ".");
+                }
+            }
+        }
+        finally {
+            clientSpi.writeLatch.countDown();
+
+            try {
+                for (IgniteInternalFuture fut : futs)
+                    fut.cancel();
+            }
+            finally {
+                // Grids must be stopped even if cancelling one of the futures throws.
+                stopAllGrids();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
new file mode 100644
index 0000000..bb568ab
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -0,0 +1,672 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.concurrent.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstractTest {
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns 1; the tests in this class also use this value as the grid index of the client node.
+ */
+ @Override protected int serverCount() {
+ return 1;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns 1.
+ */
+ @Override protected int clientCount() {
+ return 1;
+ }
+
+    /**
+     * Verifies that a client-side atomic sequence handle keeps producing consecutive values after the client
+     * node has been reconnected, while a server-side handle of the same sequence is incremented in the
+     * closure passed to {@code reconnectClientNode()}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicSeqReconnect() throws Exception {
+        final String seqName = "atomicSeq";
+
+        Ignite cliNode = grid(serverCount());
+
+        assertTrue(cliNode.cluster().localNode().isClient());
+
+        Ignite routerNode = clientRouter(cliNode);
+
+        IgniteAtomicSequence cliSeq = cliNode.atomicSequence(seqName, 0, true);
+
+        assertEquals(1L, cliSeq.incrementAndGet());
+
+        final IgniteAtomicSequence srvSeq = routerNode.atomicSequence(seqName, 0, false);
+
+        // The server-side handle is expected to start from its own range of values.
+        assertEquals(1001L, srvSeq.incrementAndGet());
+
+        Runnable srvIncrement = new Runnable() {
+            @Override public void run() {
+                assertEquals(1002L, srvSeq.incrementAndGet());
+            }
+        };
+
+        reconnectClientNode(cliNode, routerNode, srvIncrement);
+
+        // Both handles must continue from where they stopped before the reconnect.
+        assertEquals(2L, cliSeq.incrementAndGet());
+
+        assertEquals(1003L, srvSeq.incrementAndGet());
+
+        assertEquals(3L, cliSeq.incrementAndGet());
+
+        cliSeq.close();
+    }
+
+    /**
+     * Verifies client behaviour when the atomic sequence is closed on the server side in the closure passed
+     * to {@code reconnectClientNode()}: the old client handle must fail with {@link IllegalStateException},
+     * and a sequence requested again under the same name must start from the initial value.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicSeqReconnectRemoved() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        final IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeqRmv", 0, true);
+
+        clientAtomicSeq.batchSize(1);
+
+        assertEquals(1L, clientAtomicSeq.incrementAndGet());
+
+        final IgniteAtomicSequence srvAtomicSeq = srv.atomicSequence("atomicSeqRmv", 0, false);
+
+        srvAtomicSeq.batchSize(1);
+
+        assertEquals(1001L, srvAtomicSeq.incrementAndGet());
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                srvAtomicSeq.close();
+
+                // JUnit assertion instead of the 'assert' keyword: the check must not depend on -ea.
+                assertTrue(srvAtomicSeq.removed());
+            }
+        });
+
+        // Increment repeatedly: the failure is expected at some point within these calls.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                for (int i = 0; i < 2000; i++)
+                    clientAtomicSeq.incrementAndGet();
+
+                return null;
+            }
+        }, IllegalStateException.class, null);
+
+        IgniteAtomicSequence newClientAtomicSeq = client.atomicSequence("atomicSeqRmv", 0, true);
+
+        assertEquals(0, newClientAtomicSeq.get());
+
+        assertEquals(1, newClientAtomicSeq.incrementAndGet());
+
+        newClientAtomicSeq.close();
+    }
+
+    /**
+     * Verifies an atomic sequence increment that is in progress on the client when the client gets
+     * reconnected.
+     * <p>
+     * {@code GridNearLockResponse} messages are blocked on the server's communication SPI, increments are
+     * started asynchronously on the client and are expected not to finish within 200 ms. After the messages
+     * are unblocked and the client is reconnected, the asynchronous task must have ended through
+     * {@code IgniteClientDisconnectedException}, and both handles must be usable again.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicSeqReconnectInProgress() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+        final IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeqInProg", 0, true);
+
+        clientAtomicSeq.batchSize(1);
+
+        final IgniteAtomicSequence srvAtomicSeq = srv.atomicSequence("atomicSeqInProg", 0, false);
+
+        srvAtomicSeq.batchSize(1);
+
+        commSpi.blockMessage(GridNearLockResponse.class);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                for (int i = 0; i < 3000; i++) {
+                    try {
+                        clientAtomicSeq.incrementAndGet();
+                    }
+                    catch (IgniteClientDisconnectedException e) {
+                        checkAndWait(e);
+
+                        // The expected outcome: the operation was interrupted by the disconnect.
+                        return true;
+                    }
+                }
+
+                return false;
+            }
+        });
+
+        // Check that client waiting operation.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return fut.get(200);
+            }
+        }, IgniteFutureTimeoutCheckedException.class, null);
+
+        assertNotDone(fut);
+
+        commSpi.unblockMessage();
+
+        reconnectClientNode(client, srv, null);
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+
+        // Check that after reconnect working.
+        // JUnit assertions instead of the 'assert' keyword: the increments under test must run
+        // even when the JVM is started without -ea.
+        assertTrue(clientAtomicSeq.incrementAndGet() >= 0);
+        assertTrue(srvAtomicSeq.incrementAndGet() >= 0);
+
+        clientAtomicSeq.close();
+    }
+
+    /**
+     * Verifies that client-side and server-side handles of one atomic reference observe each other's updates
+     * before the client reconnect, in the closure passed to {@code reconnectClientNode()}, and afterwards.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicReferenceReconnect() throws Exception {
+        final String refName = "atomicRef";
+
+        Ignite cliNode = grid(serverCount());
+
+        assertTrue(cliNode.cluster().localNode().isClient());
+
+        Ignite routerNode = clientRouter(cliNode);
+
+        IgniteAtomicReference<String> cliRef = cliNode.atomicReference(refName, "1st value", true);
+
+        assertEquals("1st value", cliRef.get());
+        assertTrue(cliRef.compareAndSet("1st value", "2st value"));
+        assertEquals("2st value", cliRef.get());
+
+        final IgniteAtomicReference<String> srvRef = routerNode.atomicReference(refName, "1st value", false);
+
+        // The server-side handle must see the value written through the client.
+        assertEquals("2st value", srvRef.get());
+        assertTrue(srvRef.compareAndSet("2st value", "3st value"));
+        assertEquals("3st value", srvRef.get());
+
+        Runnable srvUpdate = new Runnable() {
+            @Override public void run() {
+                assertEquals("3st value", srvRef.get());
+                assertTrue(srvRef.compareAndSet("3st value", "4st value"));
+                assertEquals("4st value", srvRef.get());
+            }
+        };
+
+        reconnectClientNode(cliNode, routerNode, srvUpdate);
+
+        // The client must pick up the value set by the server in the closure above.
+        assertEquals("4st value", cliRef.get());
+        assertTrue(cliRef.compareAndSet("4st value", "5st value"));
+        assertEquals("5st value", cliRef.get());
+
+        assertEquals("5st value", srvRef.get());
+        assertTrue(srvRef.compareAndSet("5st value", "6st value"));
+        assertEquals("6st value", srvRef.get());
+
+        srvRef.close();
+    }
+
+    /**
+     * Verifies client behaviour when the atomic reference is closed on the server side in the closure passed
+     * to {@code reconnectClientNode()}: the old client handle must fail with {@link IllegalStateException},
+     * and a reference requested again under the same name must start from the initial value.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicReferenceReconnectRemoved() throws Exception {
+        final String refName = "atomicRefRemoved";
+
+        Ignite cliNode = grid(serverCount());
+
+        assertTrue(cliNode.cluster().localNode().isClient());
+
+        Ignite routerNode = clientRouter(cliNode);
+
+        final IgniteAtomicReference<String> staleCliRef = cliNode.atomicReference(refName, "1st value", true);
+
+        assertEquals("1st value", staleCliRef.get());
+        assertTrue(staleCliRef.compareAndSet("1st value", "2st value"));
+        assertEquals("2st value", staleCliRef.get());
+
+        final IgniteAtomicReference<String> staleSrvRef = routerNode.atomicReference(refName, "1st value", false);
+
+        assertEquals("2st value", staleSrvRef.get());
+        assertTrue(staleSrvRef.compareAndSet("2st value", "3st value"));
+        assertEquals("3st value", staleSrvRef.get());
+
+        Runnable closeOnSrv = new Runnable() {
+            @Override public void run() {
+                staleSrvRef.close();
+            }
+        };
+
+        reconnectClientNode(cliNode, routerNode, closeOnSrv);
+
+        // The handle obtained before the reconnect refers to a closed reference.
+        Callable<Object> staleCas = new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                staleCliRef.compareAndSet("3st value", "4st value");
+
+                return null;
+            }
+        };
+
+        GridTestUtils.assertThrows(log, staleCas, IllegalStateException.class, null);
+
+        IgniteAtomicReference<String> freshCliRef = cliNode.atomicReference(refName, "1st value", true);
+
+        IgniteAtomicReference<String> freshSrvRef = routerNode.atomicReference(refName, "1st value", false);
+
+        // The re-created reference starts from the initial value again.
+        assertEquals("1st value", freshCliRef.get());
+        assertTrue(freshCliRef.compareAndSet("1st value", "2st value"));
+        assertEquals("2st value", freshCliRef.get());
+
+        assertEquals("2st value", freshSrvRef.get());
+        assertTrue(freshSrvRef.compareAndSet("2st value", "3st value"));
+        assertEquals("3st value", freshSrvRef.get());
+
+        freshCliRef.close();
+    }
+
+ /**
+ * Verifies an atomic reference {@code compareAndSet} that is in progress on the client when the client
+ * gets reconnected.
+ * <p>
+ * {@code GridNearLockResponse} messages are blocked on the server's communication SPI, the operation is
+ * started asynchronously on the client and is expected not to finish within 200 ms. After the messages are
+ * unblocked and the client is reconnected, the asynchronous task must have ended through
+ * {@code IgniteClientDisconnectedException}, and both handles must be usable again.
+ *
+ * @throws Exception If failed.
+ */
+ public void testAtomicReferenceReconnectInProgress() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ final IgniteAtomicReference<String> clientAtomicRef =
+ client.atomicReference("atomicRefInProg", "1st value", true);
+
+ assertEquals("1st value", clientAtomicRef.get());
+ assertTrue(clientAtomicRef.compareAndSet("1st value", "2st value"));
+ assertEquals("2st value", clientAtomicRef.get());
+
+ IgniteAtomicReference<String> srvAtomicRef = srv.atomicReference("atomicRefInProg", "1st value", false);
+
+ assertEquals("2st value", srvAtomicRef.get());
+ assertTrue(srvAtomicRef.compareAndSet("2st value", "3st value"));
+ assertEquals("3st value", srvAtomicRef.get());
+
+ BlockTpcCommunicationSpi servCommSpi = commSpi(srv);
+
+ // From here on the server holds back GridNearLockResponse messages until unblockMessage() is called.
+ servCommSpi.blockMessage(GridNearLockResponse.class);
+
+ final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try {
+ clientAtomicRef.compareAndSet("3st value", "4st value");
+ }
+ catch (IgniteClientDisconnectedException e) {
+ // Helper from the parent class; presumably waits until the client is reconnected - TODO confirm.
+ checkAndWait(e);
+
+ // The expected outcome: the operation was interrupted by the disconnect.
+ return true;
+ }
+
+ return false;
+ }
+ });
+
+ // Check that client waiting operation.
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return fut.get(200);
+ }
+ }, IgniteFutureTimeoutCheckedException.class, null);
+
+ assertNotDone(fut);
+
+ servCommSpi.unblockMessage();
+
+ reconnectClientNode(client, srv, null);
+
+ assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+
+ // Check that after reconnect working.
+ // The value is expected to still be "3st value": the compareAndSet above ended with an exception.
+ assertEquals("3st value", clientAtomicRef.get());
+ assertTrue(clientAtomicRef.compareAndSet("3st value", "4st value"));
+ assertEquals("4st value", clientAtomicRef.get());
+
+ assertEquals("4st value", srvAtomicRef.get());
+ assertTrue(srvAtomicRef.compareAndSet("4st value", "5st value"));
+ assertEquals("5st value", srvAtomicRef.get());
+
+ srvAtomicRef.close();
+ }
+
+    /**
+     * Verifies that client-side and server-side handles of one atomic stamped observe each other's updates
+     * before the client reconnect, in the closure passed to {@code reconnectClientNode()}, and afterwards.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicStampedReconnect() throws Exception {
+        final String stampedName = "atomicStamped";
+
+        Ignite cliNode = grid(serverCount());
+
+        assertTrue(cliNode.cluster().localNode().isClient());
+
+        Ignite routerNode = clientRouter(cliNode);
+
+        IgniteAtomicStamped cliStamped = cliNode.atomicStamped(stampedName, 0, 0, true);
+
+        assertEquals(true, cliStamped.compareAndSet(0, 1, 0, 1));
+        assertEquals(1, cliStamped.value());
+        assertEquals(1, cliStamped.stamp());
+
+        final IgniteAtomicStamped srvStamped = routerNode.atomicStamped(stampedName, 0, 0, false);
+
+        // The server-side handle continues from the value and stamp written through the client.
+        assertEquals(true, srvStamped.compareAndSet(1, 2, 1, 2));
+        assertEquals(2, srvStamped.value());
+        assertEquals(2, srvStamped.stamp());
+
+        Runnable srvUpdate = new Runnable() {
+            @Override public void run() {
+                assertEquals(true, srvStamped.compareAndSet(2, 3, 2, 3));
+                assertEquals(3, srvStamped.value());
+                assertEquals(3, srvStamped.stamp());
+            }
+        };
+
+        reconnectClientNode(cliNode, routerNode, srvUpdate);
+
+        // The client must pick up the value and stamp set by the server in the closure above.
+        assertEquals(true, cliStamped.compareAndSet(3, 4, 3, 4));
+        assertEquals(4, cliStamped.value());
+        assertEquals(4, cliStamped.stamp());
+
+        assertEquals(true, srvStamped.compareAndSet(4, 5, 4, 5));
+        assertEquals(5, srvStamped.value());
+        assertEquals(5, srvStamped.stamp());
+
+        srvStamped.close();
+    }
+
+    /**
+     * Verifies client behaviour when the atomic stamped is closed on the server side in the closure passed
+     * to {@code reconnectClientNode()}: the old client handle must fail with {@link IllegalStateException},
+     * and an atomic stamped requested again under the same name must start from the initial value and stamp.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicStampedReconnectRemoved() throws Exception {
+        final String stampedName = "atomicStampedRemoved";
+
+        Ignite cliNode = grid(serverCount());
+
+        assertTrue(cliNode.cluster().localNode().isClient());
+
+        Ignite routerNode = clientRouter(cliNode);
+
+        final IgniteAtomicStamped staleCliStamped = cliNode.atomicStamped(stampedName, 0, 0, true);
+
+        assertEquals(true, staleCliStamped.compareAndSet(0, 1, 0, 1));
+        assertEquals(1, staleCliStamped.value());
+        assertEquals(1, staleCliStamped.stamp());
+
+        final IgniteAtomicStamped staleSrvStamped = routerNode.atomicStamped(stampedName, 0, 0, false);
+
+        assertEquals(true, staleSrvStamped.compareAndSet(1, 2, 1, 2));
+        assertEquals(2, staleSrvStamped.value());
+        assertEquals(2, staleSrvStamped.stamp());
+
+        Runnable closeOnSrv = new Runnable() {
+            @Override public void run() {
+                staleSrvStamped.close();
+            }
+        };
+
+        reconnectClientNode(cliNode, routerNode, closeOnSrv);
+
+        // The handle obtained before the reconnect refers to a closed atomic stamped.
+        Callable<Object> staleCas = new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                staleCliStamped.compareAndSet(2, 3, 2, 3);
+
+                return null;
+            }
+        };
+
+        GridTestUtils.assertThrows(log, staleCas, IllegalStateException.class, null);
+
+        IgniteAtomicStamped freshCliStamped = cliNode.atomicStamped(stampedName, 0, 0, true);
+
+        // The re-created atomic stamped starts from the initial value and stamp again.
+        assertEquals(true, freshCliStamped.compareAndSet(0, 1, 0, 1));
+        assertEquals(1, freshCliStamped.value());
+        assertEquals(1, freshCliStamped.stamp());
+
+        IgniteAtomicStamped freshSrvStamped = routerNode.atomicStamped(stampedName, 0, 0, false);
+
+        assertEquals(true, freshSrvStamped.compareAndSet(1, 2, 1, 2));
+        assertEquals(2, freshSrvStamped.value());
+        assertEquals(2, freshSrvStamped.stamp());
+
+        freshCliStamped.close();
+    }
+
+    /**
+     * Checks an atomic stamped operation that is still in flight when the client reconnects.
+     * <p>
+     * {@link GridNearLockResponse} messages are blocked on the router's communication SPI, a client
+     * {@code compareAndSet} is started asynchronously and must still be pending after 200 ms.
+     * After the client is reconnected the pending call is expected to fail with
+     * {@link IgniteClientDisconnectedException}; afterwards both the client and the server
+     * handles must be usable again.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicStampedReconnectInProgress() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        final IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStampedInProgress", 0, 0, true);
+
+        assertEquals(true, clientAtomicStamped.compareAndSet(0, 1, 0, 1));
+        assertEquals(1, clientAtomicStamped.value());
+        assertEquals(1, clientAtomicStamped.stamp());
+
+        IgniteAtomicStamped srvAtomicStamped = srv.atomicStamped("atomicStampedInProgress", 0, 0, false);
+
+        assertEquals(true, srvAtomicStamped.compareAndSet(1, 2, 1, 2));
+        assertEquals(2, srvAtomicStamped.value());
+        assertEquals(2, srvAtomicStamped.stamp());
+
+        BlockTpcCommunicationSpi servCommSpi = commSpi(srv);
+
+        servCommSpi.blockMessage(GridNearLockResponse.class);
+
+        final IgniteInternalFuture<Object> fut;
+
+        try {
+            fut = GridTestUtils.runAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    try {
+                        clientAtomicStamped.compareAndSet(2, 3, 2, 3);
+                    }
+                    catch (IgniteClientDisconnectedException e) {
+                        checkAndWait(e);
+
+                        return true;
+                    }
+
+                    // The operation completed without a disconnect error: not the expected outcome.
+                    return false;
+                }
+            });
+
+            // Check that the client operation is still waiting.
+            GridTestUtils.assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    return fut.get(200);
+                }
+            }, IgniteFutureTimeoutCheckedException.class, null);
+
+            assertNotDone(fut);
+        }
+        finally {
+            // Always lift the block, even if a check above fails, so the SPI is not left
+            // blocking messages (presumably this would affect later tests sharing the grid - verify).
+            servCommSpi.unblockMessage();
+        }
+
+        reconnectClientNode(client, srv, null);
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+
+        // Check that the structure works after reconnect; the blocked update must not have been applied.
+        assertEquals(true, clientAtomicStamped.compareAndSet(2, 3, 2, 3));
+        assertEquals(3, clientAtomicStamped.value());
+        assertEquals(3, clientAtomicStamped.stamp());
+
+        assertEquals(true, srvAtomicStamped.compareAndSet(3, 4, 3, 4));
+        assertEquals(4, srvAtomicStamped.value());
+        assertEquals(4, srvAtomicStamped.stamp());
+
+        srvAtomicStamped.close();
+    }
+
+    /**
+     * Checks that an atomic long stays consistent across a client reconnect.
+     * <p>
+     * The client creates the atomic long, the server increments it both before the reconnect and
+     * inside the callback handed to {@code reconnectClientNode}; afterwards client and server
+     * take turns incrementing and each must observe the other's update.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicLongReconnect() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLong", 0, true);
+
+        assertEquals(0L, clientAtomicLong.getAndAdd(1));
+
+        final IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLong", 0, false);
+
+        assertEquals(1L, srvAtomicLong.getAndAdd(1));
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                assertEquals(2L, srvAtomicLong.getAndAdd(1));
+            }
+        });
+
+        assertEquals(3L, clientAtomicLong.getAndAdd(1));
+
+        assertEquals(4L, srvAtomicLong.getAndAdd(1));
+
+        assertEquals(5L, clientAtomicLong.getAndAdd(1));
+
+        // Remove the structure, as the sibling reconnect tests do, so it does not outlive this test.
+        clientAtomicLong.close();
+    }
+
+    /**
+     * Checks client behaviour when an atomic long is closed through the server handle inside the
+     * callback handed to {@code reconnectClientNode}.
+     * <p>
+     * The client handle obtained before the reconnect must fail with {@link IllegalStateException};
+     * an atomic long with the same name must then be creatable again and start from the initial value.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicLongReconnectRemoved() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        final IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLongRmv", 0, true);
+
+        assertEquals(0L, clientAtomicLong.getAndAdd(1));
+
+        final IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLongRmv", 0, false);
+
+        assertEquals(1L, srvAtomicLong.getAndAdd(1));
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                srvAtomicLong.close();
+            }
+        });
+
+        // The handle created before the reconnect refers to the removed structure.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                clientAtomicLong.getAndAdd(1);
+
+                return null;
+            }
+        }, IllegalStateException.class, null);
+
+        IgniteAtomicLong newClientAtomicLong = client.atomicLong("atomicLongRmv", 0, true);
+
+        assertEquals(0L, newClientAtomicLong.getAndAdd(1));
+
+        IgniteAtomicLong newSrvAtomicLong = srv.atomicLong("atomicLongRmv", 0, false);
+
+        assertEquals(1L, newSrvAtomicLong.getAndAdd(1));
+
+        // Remove the re-created structure, mirroring testAtomicStampedReconnectRemoved.
+        newClientAtomicLong.close();
+    }
+
+    /**
+     * Checks an atomic long operation that is still in flight when the client reconnects.
+     * <p>
+     * {@link GridNearLockResponse} messages are blocked on the router's communication SPI, a client
+     * {@code getAndAdd} is started asynchronously and must still be pending after 200 ms.
+     * After the client is reconnected the pending call is expected to fail with
+     * {@link IgniteClientDisconnectedException}; afterwards both handles must be usable and
+     * the counter must still be at its initial value.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicLongReconnectInProgress() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+        final IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLongInProggress", 0, true);
+
+        final IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLongInProggress", 0, false);
+
+        commSpi.blockMessage(GridNearLockResponse.class);
+
+        final IgniteInternalFuture<Object> fut;
+
+        try {
+            fut = GridTestUtils.runAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    try {
+                        clientAtomicLong.getAndAdd(1);
+                    }
+                    catch (IgniteClientDisconnectedException e) {
+                        checkAndWait(e);
+
+                        return true;
+                    }
+
+                    // The operation completed without a disconnect error: not the expected outcome.
+                    return false;
+                }
+            });
+
+            // Check that the client operation is still waiting.
+            GridTestUtils.assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    return fut.get(200);
+                }
+            }, IgniteFutureTimeoutCheckedException.class, null);
+
+            assertNotDone(fut);
+        }
+        finally {
+            // Always lift the block, even if a check above fails, so the SPI is not left
+            // blocking messages (presumably this would affect later tests sharing the grid - verify).
+            commSpi.unblockMessage();
+        }
+
+        reconnectClientNode(client, srv, null);
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+
+        // Check that the structure works after reconnect; the blocked increment must not have been applied.
+        assertEquals(1, clientAtomicLong.addAndGet(1));
+        assertEquals(2, srvAtomicLong.addAndGet(1));
+
+        clientAtomicLong.close();
+    }
+
+    /**
+     * Checks that a count down latch shared by the client and its router stays in sync
+     * across a client reconnect.
+     * <p>
+     * The latch starts at 3. One count down is performed through the server handle inside the
+     * callback handed to {@code reconnectClientNode}, one through the server handle afterwards and
+     * the last one through the client handle; after every step both handles must report the same
+     * count, and at zero both {@code await} calls must succeed.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLatchReconnect() throws Exception {
+        final String latchName = "latch1";
+
+        Ignite clientNode = grid(serverCount());
+
+        assertTrue(clientNode.cluster().localNode().isClient());
+
+        Ignite routerNode = clientRouter(clientNode);
+
+        // Client creates the latch.
+        IgniteCountDownLatch latchOnClient = clientNode.countDownLatch(latchName, 3, false, true);
+
+        assertEquals(3, latchOnClient.count());
+
+        // Server only looks it up.
+        final IgniteCountDownLatch latchOnSrv = routerNode.countDownLatch(latchName, 3, false, false);
+
+        assertEquals(3, latchOnSrv.count());
+
+        Runnable srvCountDown = new Runnable() {
+            @Override public void run() {
+                latchOnSrv.countDown();
+            }
+        };
+
+        reconnectClientNode(clientNode, routerNode, srvCountDown);
+
+        // 3 -> 2: the count down done in the reconnect callback is visible on both sides.
+        assertEquals(2, latchOnSrv.count());
+        assertEquals(2, latchOnClient.count());
+
+        // 2 -> 1: server counts down.
+        latchOnSrv.countDown();
+
+        assertEquals(1, latchOnSrv.count());
+        assertEquals(1, latchOnClient.count());
+
+        // 1 -> 0: client counts down.
+        latchOnClient.countDown();
+
+        assertEquals(0, latchOnSrv.count());
+        assertEquals(0, latchOnClient.count());
+
+        assertTrue(latchOnSrv.await(1000));
+        assertTrue(latchOnClient.await(1000));
+    }
+}