You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/09/07 22:46:46 UTC
[19/50] [abbrv] ignite git commit: # ignite-901 client reconnect
support
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
new file mode 100644
index 0000000..be3234d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -0,0 +1,1202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import junit.framework.*;
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.transactions.*;
+
+import javax.cache.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstractTest {
+ /** Number of server nodes started before each test. */
+ private static final int SRV_CNT = 3;
+
+ /** Name of the cache that is statically configured on every node. */
+ private static final String STATIC_CACHE = "static-cache";
+
+ /** Node ID applied to the next configuration built by getConfiguration(); reset to null once used. */
+ private UUID nodeId;
+
+ /**
+  * Builds the node configuration used by this test: a {@link TestCommunicationSpi} with the
+  * shared memory port set to -1, peer class loading switched off, a discovery network timeout
+  * of 5000, an optional one-shot predefined node ID and the static cache.
+  *
+  * @param gridName Grid name.
+  * @return Node configuration.
+  * @throws Exception If the base configuration could not be created.
+  */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+     IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+     TestCommunicationSpi blockingSpi = new TestCommunicationSpi();
+     blockingSpi.setSharedMemoryPort(-1);
+
+     igniteCfg.setCommunicationSpi(blockingSpi);
+     igniteCfg.setPeerClassLoadingEnabled(false);
+
+     TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)igniteCfg.getDiscoverySpi();
+     discoSpi.setNetworkTimeout(5000);
+
+     // A predefined node ID is consumed by exactly one configuration.
+     UUID presetId = nodeId;
+
+     if (presetId != null) {
+         nodeId = null;
+
+         igniteCfg.setNodeId(presetId);
+     }
+
+     CacheConfiguration staticCacheCfg = new CacheConfiguration();
+     staticCacheCfg.setName(STATIC_CACHE);
+
+     igniteCfg.setCacheConfiguration(staticCacheCfg);
+
+     return igniteCfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int serverCount() {
+ // Reports zero servers; the SRV_CNT server nodes are started by beforeTest() of this class.
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ // Starts SRV_CNT nodes; clients are started by the individual tests.
+ startGrids(SRV_CNT);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ // Every test starts from a fresh set of nodes, so stop everything that was started.
+ stopAllGrids();
+ }
+
+ /**
+ * Checks client reconnect with a dynamically started cache, a near cache and the static cache:
+ * a put issued from the disconnect event handler must fail with an exception exposing a
+ * reconnect future and must not complete until reconnect is allowed; after reconnect all
+ * three caches are usable again, also after one more server node joins.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnect() throws Exception {
+ clientMode = true;
+
+ IgniteEx client = startGrid(SRV_CNT);
+
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ final IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>());
+
+ final IgniteCache<Object, Object> staticCache = client.cache(STATIC_CACHE);
+
+ staticCache.put(1, 1);
+
+ assertEquals(1, staticCache.get(1));
+
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setName("nearCache");
+
+ final IgniteCache<Object, Object> nearCache = client.getOrCreateCache(ccfg, new NearCacheConfiguration<>());
+
+ nearCache.put(1, 1);
+
+ assertEquals(1, nearCache.localPeek(1));
+
+ cache.put(1, 1);
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ log.info("Block reconnect.");
+
+ // This latch is released below under "Allow reconnect."; presumably the test discovery SPI
+ // holds the client's writes until then - confirm against TestTcpDiscoverySpi.
+ clientSpi.writeLatch = new CountDownLatch(1);
+
+ final AtomicReference<IgniteInternalFuture> blockPutRef = new AtomicReference<>();
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ assertEquals(1, reconnectLatch.getCount());
+
+ // Start a put in a separate thread as soon as the disconnect is observed.
+ blockPutRef.set(GridTestUtils.runAsync(new Callable() {
+ @Override public Object call() throws Exception {
+ log.info("Start put.");
+
+ try {
+ cache.put(2, 2);
+
+ fail();
+ }
+ catch (CacheException e) {
+ log.info("Expected exception: " + e);
+
+ IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException) e.getCause();
+
+ // Blocks until the reconnect future completes.
+ e0.reconnectFuture().get();
+ }
+
+ // Retry of the same put after the reconnect future has completed.
+ cache.put(2, 2);
+
+ log.info("Finish put.");
+
+ return null;
+ }
+ }));
+
+ disconnectLatch.countDown();
+ } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ assertEquals(0, disconnectLatch.getCount());
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ log.info("Fail client.");
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ waitReconnectEvent(disconnectLatch);
+
+ IgniteInternalFuture putFut = blockPutRef.get();
+
+ // The put must stay pending for as long as reconnect is blocked.
+ assertNotDone(putFut);
+
+ U.sleep(5000);
+
+ assertNotDone(putFut);
+
+ log.info("Allow reconnect.");
+
+ clientSpi.writeLatch.countDown();
+
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+ checkCacheDiscoveryData(srv, client, null, true, true, false);
+
+ checkCacheDiscoveryData(srv, client, "nearCache", true, true, true);
+
+ checkCacheDiscoveryData(srv, client, STATIC_CACHE, true, true, false);
+
+ assertEquals(1, cache.get(1));
+
+ putFut.get();
+
+ assertEquals(2, cache.get(2));
+
+ cache.put(3, 3);
+
+ assertEquals(3, cache.get(3));
+
+ // The near entry put before the disconnect is expected to be gone locally.
+ assertNull(nearCache.localPeek(1));
+
+ staticCache.put(10, 10);
+
+ assertEquals(10, staticCache.get(10));
+
+ nearCache.put(20, 20);
+
+ // An update made through the server must become visible in the client's near cache.
+ srv.cache(nearCache.getName()).put(20, 21);
+
+ assertEquals(21, nearCache.localPeek(20));
+
+ this.clientMode = false;
+
+ // Repeat the checks against a server node that joins after the reconnect.
+ IgniteEx srv2 = startGrid(SRV_CNT + 1);
+
+ Integer key = primaryKey(srv2.cache(null));
+
+ cache.put(key, 4);
+
+ assertEquals(4, cache.get(key));
+
+ checkCacheDiscoveryData(srv2, client, null, true, true, false);
+
+ checkCacheDiscoveryData(srv2, client, "nearCache", true, true, true);
+
+ checkCacheDiscoveryData(srv2, client, STATIC_CACHE, true, true, false);
+
+ staticCache.put(20, 20);
+
+ assertEquals(20, staticCache.get(20));
+
+ srv.cache(nearCache.getName()).put(20, 22);
+
+ assertEquals(22, nearCache.localPeek(20));
+ }
+
+ /**
+ * Checks that, from the closure passed to reconnectClientNode(), committing a transaction
+ * started earlier and starting a new transaction both fail with
+ * {@link IgniteClientDisconnectedException} carrying a reconnect future, and that optimistic
+ * and pessimistic transactions work once reconnectClientNode() has returned.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectTransactions() throws Exception {
+ clientMode = true;
+
+ IgniteEx client = startGrid(SRV_CNT);
+
+ Ignite srv = clientRouter(client);
+
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setBackups(1);
+
+ IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg);
+
+ final IgniteTransactions txs = client.transactions();
+
+ // This transaction is left open on purpose; its commit is attempted inside the closure below.
+ final Transaction tx = txs.txStart(OPTIMISTIC, REPEATABLE_READ);
+
+ cache.put(1, 1);
+
+ // NOTE(review): the closure presumably runs while the client is disconnected - confirm in the base class.
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ try {
+ tx.commit();
+
+ fail();
+ } catch (IgniteClientDisconnectedException e) {
+ log.info("Expected error: " + e);
+
+ assertNotNull(e.reconnectFuture());
+ }
+
+ try {
+ txs.txStart();
+
+ fail();
+ } catch (IgniteClientDisconnectedException e) {
+ log.info("Expected error: " + e);
+
+ assertNotNull(e.reconnectFuture());
+ }
+ }
+ });
+
+ // No transaction may remain attached to the current thread.
+ assertNull(txs.tx());
+
+ try (Transaction tx0 = txs.txStart(OPTIMISTIC, REPEATABLE_READ)) {
+ cache.put(1, 1);
+
+ assertEquals(1, cache.get(1));
+
+ tx0.commit();
+ }
+
+ try (Transaction tx0 = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(2, 2);
+
+ assertEquals(2, cache.get(2));
+
+ tx0.commit();
+ }
+ }
+
+ /**
+  * Runs the "transaction in progress while the client gets disconnected" scenario on a
+  * transactional partitioned cache with one backup and FULL_SYNC writes, first with
+  * optimistic and then with pessimistic concurrency.
+  *
+  * @throws Exception If failed.
+  */
+ public void testReconnectTransactionInProgress1() throws Exception {
+     clientMode = true;
+
+     IgniteEx client = startGrid(SRV_CNT);
+
+     CacheConfiguration<Object, Object> txCacheCfg = new CacheConfiguration<>();
+
+     txCacheCfg.setAtomicityMode(TRANSACTIONAL);
+     txCacheCfg.setCacheMode(PARTITIONED);
+     txCacheCfg.setBackups(1);
+     txCacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+
+     IgniteCache<Object, Object> txCache = client.getOrCreateCache(txCacheCfg);
+
+     // Same client and cache are reused for both concurrency modes, optimistic first.
+     for (TransactionConcurrency concurrency : new TransactionConcurrency[] {OPTIMISTIC, PESSIMISTIC})
+         reconnectTransactionInProgress1(client, concurrency, txCache);
+ }
+
+ /**
+ * Fails the client between two puts of one transaction. The second put is expected to throw
+ * a {@link CacheException} whose cause is an {@link IgniteClientDisconnectedException}; after
+ * its reconnect future completes, two further transactions are committed and verified, and
+ * the cache is cleared.
+ *
+ * @param client Client.
+ * @param txConcurrency Transaction concurrency mode.
+ * @param cache Cache.
+ * @throws Exception If failed.
+ */
+ private void reconnectTransactionInProgress1(IgniteEx client,
+ final TransactionConcurrency txConcurrency,
+ final IgniteCache<Object, Object> cache)
+ throws Exception
+ {
+ Ignite srv = clientRouter(client);
+
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+ final TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ log.info("Block reconnect.");
+
+ // Released further down, only after the second put has failed.
+ clientSpi.writeLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ final IgniteTransactions txs = client.transactions();
+
+ // Signalled by the worker once the first put of the transaction is done.
+ final CountDownLatch afterPut1 = new CountDownLatch(1);
+
+ // Signalled by the test thread once the client disconnect has been observed.
+ final CountDownLatch afterPut2 = new CountDownLatch(1);
+
+ // Signalled by the worker once the second put has failed.
+ final CountDownLatch putFailed = new CountDownLatch(1);
+
+ IgniteInternalFuture<Boolean> fut = GridTestUtils.runAsync(new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ try {
+ log.info("Start tx1: " + txConcurrency);
+
+ try (Transaction tx = txs.txStart(txConcurrency, REPEATABLE_READ)) {
+ cache.put(1, 1);
+
+ afterPut1.countDown();
+
+ afterPut2.await();
+
+ cache.put(2, 2);
+
+ fail();
+ }
+ catch (CacheException e) {
+ log.info("Expected exception: " + e);
+
+ putFailed.countDown();
+
+ IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException)e.getCause();
+
+ // Blocks until the reconnect future completes.
+ e0.reconnectFuture().get();
+ }
+
+ log.info("Start tx2: " + txConcurrency);
+
+ try (Transaction tx = txs.txStart(txConcurrency, REPEATABLE_READ)) {
+ cache.put(1, 1);
+
+ cache.put(2, 2);
+
+ tx.commit();
+ }
+
+ assertEquals(1, cache.get(1));
+ assertEquals(2, cache.get(2));
+
+ try (Transaction tx = txs.txStart(txConcurrency, REPEATABLE_READ)) {
+ cache.put(3, 3);
+
+ cache.put(4, 4);
+
+ tx.commit();
+ }
+
+ assertEquals(1, cache.get(1));
+ assertEquals(2, cache.get(2));
+ assertEquals(3, cache.get(3));
+ assertEquals(4, cache.get(4));
+
+ // Leave the cache empty for the next invocation, which reuses the same cache.
+ cache.removeAll();
+
+ return true;
+ }
+ catch (AssertionFailedError e) {
+ // Assertion failures are propagated unchanged.
+ throw e;
+ }
+ catch (Throwable e) {
+ log.error("Unexpected error", e);
+
+ fail("Unexpected error: " + e);
+
+ return false;
+ }
+ }
+ });
+
+ assertTrue(afterPut1.await(5000, MILLISECONDS));
+
+ assertNotDone(fut);
+
+ srvSpi.failNode(client.localNode().id(), null);
+
+ waitReconnectEvent(disconnectLatch);
+
+ // Let the worker attempt the second put now that the client is disconnected.
+ afterPut2.countDown();
+
+ assertTrue(putFailed.await(5000, MILLISECONDS));
+
+ // Allow reconnect.
+ clientSpi.writeLatch.countDown();
+
+ waitReconnectEvent(reconnectLatch);
+
+ assertTrue(fut.get());
+ }
+
+ /**
+  * Checks that a transaction fails when the client is failed while a prepare, finish or lock
+  * response addressed to it is being blocked. Every combination uses its own key.
+  *
+  * @throws Exception If failed.
+  */
+ public void testReconnectTransactionInProgress2() throws Exception {
+     clientMode = true;
+
+     final IgniteEx client = startGrid(SRV_CNT);
+
+     CacheConfiguration<Object, Object> txCacheCfg = new CacheConfiguration<>();
+
+     txCacheCfg.setAtomicityMode(TRANSACTIONAL);
+     txCacheCfg.setCacheMode(PARTITIONED);
+     txCacheCfg.setBackups(1);
+     txCacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+
+     // Prepare response blocked.
+     txInProgressFails(client, txCacheCfg, GridNearTxPrepareResponse.class, OPTIMISTIC, 1);
+     txInProgressFails(client, txCacheCfg, GridNearTxPrepareResponse.class, PESSIMISTIC, 2);
+
+     // Finish response blocked.
+     txInProgressFails(client, txCacheCfg, GridNearTxFinishResponse.class, OPTIMISTIC, 3);
+     txInProgressFails(client, txCacheCfg, GridNearTxFinishResponse.class, PESSIMISTIC, 4);
+
+     // Lock response blocked (pessimistic only).
+     txInProgressFails(client, txCacheCfg, GridNearLockResponse.class, PESSIMISTIC, 5);
+ }
+
+ /**
+  * Runs a two-key transaction through {@code checkOperationInProgressFails} while messages of
+  * the given class are blocked, then verifies that {@code key} maps to itself in the cache.
+  *
+  * @param client Client.
+  * @param ccfg Cache configuration.
+  * @param msgToBlock Message to block.
+  * @param txConcurrency Transaction concurrency mode.
+  * @param key Key; {@code key + 1} is written by the same transaction.
+  * @throws Exception If failed.
+  */
+ private void txInProgressFails(final IgniteEx client,
+     final CacheConfiguration<Object, Object> ccfg,
+     Class<?> msgToBlock,
+     final TransactionConcurrency txConcurrency,
+     final Integer key) throws Exception {
+     log.info("Test tx failure [msg=" + msgToBlock + ", txMode=" + txConcurrency + ", key=" + key + ']');
+
+     // Transactional operation: put two consecutive keys and commit.
+     IgniteInClosure<IgniteCache<Object, Object>> txOp = new CI1<IgniteCache<Object, Object>>() {
+         @Override public void apply(IgniteCache<Object, Object> txCache) {
+             try (Transaction tx = client.transactions().txStart(txConcurrency, REPEATABLE_READ)) {
+                 log.info("Put1: " + key);
+
+                 txCache.put(key, key);
+
+                 Integer nextKey = key + 1;
+
+                 log.info("Put2: " + nextKey);
+
+                 txCache.put(nextKey, nextKey);
+
+                 log.info("Commit [key1=" + key + ", key2=" + nextKey + ']');
+
+                 tx.commit();
+             }
+         }
+     };
+
+     checkOperationInProgressFails(client, ccfg, msgToBlock, txOp);
+
+     IgniteCache<Object, Object> clientCache = client.cache(ccfg.getName());
+
+     assertEquals(key, clientCache.get(key));
+ }
+
+ /**
+ * Checks that the client reconnects after being failed while GridDhtPartitionsFullMessage
+ * messages from node 0 to the client are blocked and another server node has been started,
+ * and that the client can start and use a new cache afterwards.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectExchangeInProgress() throws Exception {
+ clientMode = true;
+
+ IgniteEx client = startGrid(SRV_CNT);
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ TestCommunicationSpi coordCommSpi = (TestCommunicationSpi)grid(0).configuration().getCommunicationSpi();
+
+ // Hold back full partition map messages sent from node 0 to the client.
+ coordCommSpi.blockMessages(GridDhtPartitionsFullMessage.class, client.localNode().id());
+
+ clientMode = false;
+
+ // A new server node is started while those messages are blocked.
+ startGrid(SRV_CNT + 1);
+
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+ // Re-sending the blocked messages is expected to fail.
+ // NOTE(review): presumably because they are addressed to the client's pre-reconnect node - confirm.
+ try {
+ coordCommSpi.stopBlock(true);
+
+ fail();
+ }
+ catch (IgniteException e) {
+ log.info("Expected error: " + e);
+ }
+
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setName("newCache");
+
+ ccfg.setCacheMode(REPLICATED);
+
+ log.info("Start new cache.");
+
+ IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg);
+
+ cache.put(1, 1);
+
+ assertEquals(1, cache.get(1));
+ }
+
+ /**
+ * Checks the case when a client is failed while it is still starting: GridDhtPartitionsFullMessage
+ * messages from node 0 to the client are blocked, the start is expected to throw
+ * {@link IgniteClientDisconnectedException}, and waiting on its reconnect future is expected
+ * to fail with {@link IgniteException}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectInitialExchangeInProgress() throws Exception {
+ // The client ID is chosen up front so that messages to it can be blocked before it starts.
+ final UUID clientId = UUID.randomUUID();
+
+ Ignite srv = grid(0);
+
+ final CountDownLatch joinLatch = new CountDownLatch(1);
+
+ srv.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_NODE_JOINED && ((DiscoveryEvent)evt).eventNode().id().equals(clientId)) {
+ info("Client joined: " + evt);
+
+ joinLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_NODE_JOINED);
+
+ TestCommunicationSpi srvCommSpi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi();
+
+ srvCommSpi.blockMessages(GridDhtPartitionsFullMessage.class, clientId);
+
+ clientMode = true;
+
+ // Picked up (and reset) by getConfiguration() for the client started below.
+ nodeId = clientId;
+
+ IgniteInternalFuture<Boolean> fut = GridTestUtils.runAsync(new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ try {
+ Ignition.start(getConfiguration(getTestGridName(SRV_CNT)));
+
+ fail();
+
+ return false;
+ }
+ catch (IgniteClientDisconnectedException e) {
+ log.info("Expected start error: " + e);
+
+ try {
+ e.reconnectFuture().get();
+
+ fail();
+ }
+ catch (IgniteException e0) {
+ log.info("Expected future error: " + e0);
+ }
+
+ return true;
+ }
+ catch (Throwable e) {
+ log.error("Unexpected error: " + e, e);
+
+ throw e;
+ }
+ }
+ });
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ assertTrue(joinLatch.await(5000, MILLISECONDS));
+
+ U.sleep(1000);
+
+ // The client start must still be in progress while the message is blocked.
+ assertNotDone(fut);
+
+ srvSpi.failNode(clientId, null);
+
+ // Drop the blocked messages without sending them.
+ srvCommSpi.stopBlock(false);
+
+ assertTrue(fut.get());
+ }
+
+ /**
+ * For every combination of atomicity mode, atomic write order mode and write synchronization
+ * mode, checks that a cache put (skipped for FULL_ASYNC) and a cache get fail when the
+ * client is failed while the corresponding response is blocked.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectOperationInProgress() throws Exception {
+ clientMode = true;
+
+ IgniteEx client = startGrid(SRV_CNT);
+
+ // Listener used for logging only.
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED)
+ info("Client disconnected: " + evt);
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED)
+ info("Client reconnected: " + evt);
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ IgniteInClosure<IgniteCache<Object, Object>> putOp = new CI1<IgniteCache<Object, Object>>() {
+ @Override public void apply(IgniteCache<Object, Object> cache) {
+ cache.put(1, 1);
+ }
+ };
+
+ IgniteInClosure<IgniteCache<Object, Object>> getOp = new CI1<IgniteCache<Object, Object>>() {
+ @Override public void apply(IgniteCache<Object, Object> cache) {
+ cache.get(1);
+ }
+ };
+
+ // Counter used to give every tested configuration a unique cache name.
+ int cnt = 0;
+
+ for (CacheAtomicityMode atomicityMode : CacheAtomicityMode.values()) {
+ // All write order modes are iterated for ATOMIC caches only; other modes use CLOCK.
+ CacheAtomicWriteOrderMode[] writeOrders =
+ atomicityMode == ATOMIC ? CacheAtomicWriteOrderMode.values() :
+ new CacheAtomicWriteOrderMode[]{CacheAtomicWriteOrderMode.CLOCK};
+
+ for (CacheAtomicWriteOrderMode writeOrder : writeOrders) {
+ for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setAtomicityMode(atomicityMode);
+
+ ccfg.setAtomicWriteOrderMode(writeOrder);
+
+ ccfg.setName("cache-" + cnt++);
+
+ ccfg.setWriteSynchronizationMode(syncMode);
+
+ // The put check is not run for FULL_ASYNC caches.
+ if (syncMode != FULL_ASYNC) {
+ Class<?> cls = (ccfg.getAtomicityMode() == ATOMIC) ?
+ GridNearAtomicUpdateResponse.class : GridNearTxPrepareResponse.class;
+
+ log.info("Test cache put [atomicity=" + atomicityMode +
+ ", writeOrder=" + writeOrder +
+ ", syncMode=" + syncMode + ']');
+
+ checkOperationInProgressFails(client, ccfg, cls, putOp);
+
+ // The cache is destroyed here and created again by the get check below.
+ client.destroyCache(ccfg.getName());
+ }
+
+ log.info("Test cache get [atomicity=" + atomicityMode + ", syncMode=" + syncMode + ']');
+
+ checkOperationInProgressFails(client, ccfg, GridNearGetResponse.class, getOp);
+
+ client.destroyCache(ccfg.getName());
+ }
+ }
+ }
+ }
+
+ /**
+ * Checks that a client cache proxy obtained before the reconnect throws
+ * {@link IllegalStateException} when the server destroys the cache from the closure passed to
+ * reconnectClientNode(), and that the client can create and use the cache again.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectCacheDestroyed() throws Exception {
+ clientMode = true;
+
+ final IgniteEx client = startGrid(SRV_CNT);
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ final Ignite srv = clientRouter(client);
+
+ final IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
+
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ // Cache with the null name, i.e. the one created above with a default configuration.
+ srv.destroyCache(null);
+ }
+ });
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return clientCache.get(1);
+ }
+ }, IllegalStateException.class, null);
+
+ // The cache no longer exists and the client has no client cache.
+ checkCacheDiscoveryData(srv, client, null, false, false, false);
+
+ IgniteCache<Object, Object> clientCache0 = client.getOrCreateCache(new CacheConfiguration<>());
+
+ checkCacheDiscoveryData(srv, client, null, true, true, false);
+
+ clientCache0.put(1, 1);
+
+ assertEquals(1, clientCache0.get(1));
+ }
+
+ /**
+ * Checks the case when the server destroys the cache and creates it again in TRANSACTIONAL
+ * mode from the closure passed to reconnectClientNode(): the old client proxy must throw
+ * {@link IllegalStateException}, and a proxy obtained afterwards must see the new configuration.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectCacheDestroyedAndCreated() throws Exception {
+ clientMode = true;
+
+ final Ignite client = startGrid(SRV_CNT);
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ final Ignite srv = clientRouter(client);
+
+ final IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
+
+ // The cache created above is expected to be ATOMIC.
+ assertEquals(ATOMIC,
+ clientCache.getConfiguration(CacheConfiguration.class).getAtomicityMode());
+
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ srv.destroyCache(null);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+
+ srv.getOrCreateCache(ccfg);
+ }
+ });
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return clientCache.get(1);
+ }
+ }, IllegalStateException.class, null);
+
+ // The cache exists, but the client has not started a client cache for it yet.
+ checkCacheDiscoveryData(srv, client, null, true, false, false);
+
+ IgniteCache<Object, Object> clientCache0 = client.cache(null);
+
+ // After client.cache(null) the client is expected to be a client node of the cache.
+ checkCacheDiscoveryData(srv, client, null, true, true, false);
+
+ assertEquals(TRANSACTIONAL,
+ clientCache0.getConfiguration(CacheConfiguration.class).getAtomicityMode());
+
+ clientCache0.put(1, 1);
+
+ assertEquals(1, clientCache0.get(1));
+ }
+
+ /**
+ * Checks that values of five distinct classes, put through the client or the server before,
+ * during (from the closure passed to reconnectClientNode()) and after the reconnect, can be
+ * read from both nodes.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectMarshallerCache() throws Exception {
+ clientMode = true;
+
+ final Ignite client = startGrid(SRV_CNT);
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ final Ignite srv = clientRouter(client);
+
+ final IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
+ final IgniteCache<Object, Object> srvCache = srv.cache(null);
+
+ assertNotNull(srvCache);
+
+ // Before the reconnect: one class written through the client, one through the server.
+ clientCache.put(1, new TestClass1());
+ srvCache.put(2, new TestClass2());
+
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ assertNotNull(srvCache.get(1));
+ assertNotNull(srvCache.get(2));
+
+ srvCache.put(3, new TestClass3());
+ }
+ });
+
+ // After reconnectClientNode() has returned.
+ srvCache.put(4, new TestClass4());
+
+ assertNotNull(clientCache.get(1));
+ assertNotNull(clientCache.get(2));
+ assertNotNull(clientCache.get(3));
+ assertNotNull(clientCache.get(4));
+
+ clientCache.put(5, new TestClass5());
+
+ assertNotNull(srvCache.get(5));
+ assertNotNull(clientCache.get(5));
+ }
+
+ /**
+ * Checks that the client reconnects after all server nodes are stopped and one server is
+ * started again: the old cache proxy must throw {@link IllegalStateException}, and a cache
+ * created on the new server is accessible from the client.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectClusterRestart() throws Exception {
+ clientMode = true;
+
+ final Ignite client = startGrid(SRV_CNT);
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ final IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
+
+ clientCache.put(1, new TestClass1());
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ // Stop every server node; only the client stays up.
+ for (int i = 0; i < SRV_CNT; i++)
+ stopGrid(i);
+
+ assertTrue(disconnectLatch.await(30_000, MILLISECONDS));
+
+ clientMode = false;
+
+ // Start a single server again.
+ Ignite srv = startGrid(0);
+
+ assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return clientCache.get(1);
+ }
+ }, IllegalStateException.class, null);
+
+ IgniteCache<Object, Object> srvCache = srv.getOrCreateCache(new CacheConfiguration<>());
+
+ srvCache.put(1, new TestClass1());
+ srvCache.put(2, new TestClass2());
+
+ IgniteCache<Object, Object> clientCache2 = client.cache(null);
+
+ assertNotNull(clientCache2);
+
+ assertNotNull(clientCache2.get(1));
+ assertNotNull(clientCache2.get(2));
+ }
+
+ /**
+ * Empty serializable class used as a cache value in the marshaller and cluster restart tests.
+ */
+ static class TestClass1 implements Serializable {}
+
+ /**
+ * Empty serializable class used as a cache value in the marshaller and cluster restart tests.
+ */
+ static class TestClass2 implements Serializable {}
+
+ /**
+ * Empty serializable class used as a cache value in the marshaller test.
+ */
+ static class TestClass3 implements Serializable {}
+
+ /**
+ * Empty serializable class used as a cache value in the marshaller test.
+ */
+ static class TestClass4 implements Serializable {}
+
+ /**
+ * Empty serializable class used as a cache value in the marshaller test.
+ */
+ static class TestClass5 implements Serializable {}
+
+ /**
+  * Starts the given cache operation while messages of class {@code msgToBlock} addressed to the
+  * client are blocked on the first {@code SRV_CNT} nodes, fails the client node and checks that
+  * the operation fails with an {@link IgniteClientDisconnectedException} (thrown directly or as
+  * the cause of a {@link CacheException}). After the reconnect future completes the operation is
+  * executed once more, and finally a plain put/get on the cache is verified.
+  * <p>
+  * Message blocking is always switched off before this method returns, even when the check
+  * fails, so that a failure does not leave the server communication SPIs in blocking state.
+  *
+  * @param client Client.
+  * @param ccfg Cache configuration.
+  * @param msgToBlock Message to block.
+  * @param c Cache operation closure.
+  * @throws Exception If failed.
+  */
+ private void checkOperationInProgressFails(IgniteEx client,
+     final CacheConfiguration<Object, Object> ccfg,
+     Class<?> msgToBlock,
+     final IgniteInClosure<IgniteCache<Object, Object>> c)
+     throws Exception
+ {
+     Ignite srv = clientRouter(client);
+
+     TestTcpDiscoverySpi srvSpi = spi(srv);
+
+     final IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg);
+
+     for (int i = 0; i < SRV_CNT; i++) {
+         TestCommunicationSpi srvCommSpi = (TestCommunicationSpi)grid(i).configuration().getCommunicationSpi();
+
+         srvCommSpi.blockMessages(msgToBlock, client.localNode().id());
+     }
+
+     try {
+         IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
+             @Override public Object call() throws Exception {
+                 IgniteClientDisconnectedException e0 = null;
+
+                 try {
+                     c.apply(cache);
+
+                     fail();
+                 }
+                 catch (IgniteClientDisconnectedException e) {
+                     log.info("Expected exception: " + e);
+
+                     e0 = e;
+                 }
+                 catch (CacheException e) {
+                     log.info("Expected exception: " + e);
+
+                     assertTrue("Unexpected cause: " + e.getCause(),
+                         e.getCause() instanceof IgniteClientDisconnectedException);
+
+                     e0 = (IgniteClientDisconnectedException)e.getCause();
+                 }
+
+                 assertNotNull(e0);
+                 assertNotNull(e0.reconnectFuture());
+
+                 // Wait for the reconnect future, then repeat the operation.
+                 e0.reconnectFuture().get();
+
+                 c.apply(cache);
+
+                 return null;
+             }
+         });
+
+         Thread.sleep(1000);
+
+         // The operation must hang while the message is blocked.
+         assertNotDone(fut);
+
+         log.info("Fail client: " + client.localNode().id());
+
+         srvSpi.failNode(client.localNode().id(), null);
+
+         fut.get();
+     }
+     finally {
+         // Blocked messages are dropped, not re-sent.
+         for (int i = 0; i < SRV_CNT; i++)
+             ((TestCommunicationSpi)grid(i).configuration().getCommunicationSpi()).stopBlock(false);
+     }
+
+     cache.put(1, 1);
+
+     assertEquals(1, cache.get(1));
+ }
+
+ /**
+  * Verifies that the discovery managers of both the server and the client node agree on the
+  * role each node plays for the given cache, and that the client-nodes cluster group of the
+  * cache matches those expectations.
+  *
+  * @param srv Server node.
+  * @param client Client node.
+  * @param cacheName Cache name.
+  * @param cacheExists Cache exists flag.
+  * @param clientCache {@code True} if client node has client cache.
+  * @param clientNear {@code True} if client node has near-enabled client cache.
+  */
+ private void checkCacheDiscoveryData(Ignite srv,
+     Ignite client,
+     String cacheName,
+     boolean cacheExists,
+     boolean clientCache,
+     boolean clientNear)
+ {
+     IgniteKernal srvKernal = (IgniteKernal)srv;
+     IgniteKernal clientKernal = (IgniteKernal)client;
+
+     GridDiscoveryManager srvDisco = srvKernal.context().discovery();
+     GridDiscoveryManager clientDisco = clientKernal.context().discovery();
+
+     ClusterNode srvNode = srvKernal.localNode();
+     ClusterNode clientNode = clientKernal.localNode();
+
+     // The client is never an affinity node, from either node's point of view.
+     assertFalse(srvDisco.cacheAffinityNode(clientNode, cacheName));
+     assertFalse(clientDisco.cacheAffinityNode(clientNode, cacheName));
+
+     // Server's view.
+     assertEquals(cacheExists, srvDisco.cacheAffinityNode(srvNode, cacheName));
+
+     if (!clientNear)
+         assertEquals(clientCache, srvDisco.cacheClientNode(clientNode, cacheName));
+     else
+         assertTrue(srvDisco.cacheNearNode(clientNode, cacheName));
+
+     // Client's view.
+     assertEquals(cacheExists, clientDisco.cacheAffinityNode(srvNode, cacheName));
+
+     if (!clientNear)
+         assertEquals(clientCache, clientDisco.cacheClientNode(clientNode, cacheName));
+     else
+         assertTrue(clientDisco.cacheNearNode(clientNode, cacheName));
+
+     // Without the cache there can be no client nodes for it at all.
+     if (!cacheExists) {
+         assertTrue(client.cluster().forClientNodes(cacheName).nodes().isEmpty());
+         assertTrue(srv.cluster().forClientNodes(cacheName).nodes().isEmpty());
+
+         return;
+     }
+
+     boolean expClient = clientCache || clientNear;
+
+     assertEquals(expClient, client.cluster().forClientNodes(cacheName).nodes().contains(clientNode));
+     assertEquals(expClient, srv.cluster().forClientNodes(cacheName).nodes().contains(clientNode));
+ }
+
+ /**
+  * Communication SPI that can hold back messages of selected classes addressed to selected
+  * nodes. Held messages are either re-sent or dropped by {@link #stopBlock(boolean)}.
+  */
+ private static class TestCommunicationSpi extends TcpCommunicationSpi {
+     /** Logger. */
+     @LoggerResource
+     private IgniteLogger log;
+
+     /** Held messages together with their destination nodes. Accessed under {@code synchronized (this)}. */
+     private final List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>();
+
+     /** Blocked message class mapped to destination node IDs. Accessed under {@code synchronized (this)}. */
+     private final Map<Class<?>, Set<UUID>> blockCls = new HashMap<>();
+
+     /** {@inheritDoc} */
+     @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+         if (msg instanceof GridIoMessage) {
+             // Blocking is decided by the class of the wrapped message.
+             Object msg0 = ((GridIoMessage)msg).message();
+
+             synchronized (this) {
+                 Set<UUID> blockNodes = blockCls.get(msg0.getClass());
+
+                 if (F.contains(blockNodes, node.id())) {
+                     log.info("Block message [node=" + node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) +
+                         ", msg=" + msg0 + ']');
+
+                     blockedMsgs.add(new T2<>(node, (GridIoMessage)msg));
+
+                     return;
+                 }
+             }
+         }
+
+         super.sendMessage(node, msg);
+     }
+
+     /**
+      * Starts holding back messages of the given class addressed to the given node.
+      *
+      * @param cls Message class.
+      * @param nodeId Node ID.
+      */
+     void blockMessages(Class<?> cls, UUID nodeId) {
+         synchronized (this) {
+             Set<UUID> set = blockCls.get(cls);
+
+             if (set == null) {
+                 set = new HashSet<>();
+
+                 blockCls.put(cls, set);
+             }
+
+             set.add(nodeId);
+         }
+     }
+
+     /**
+      * Stops blocking for all message classes and nodes. Held messages are re-sent when
+      * {@code snd} is {@code true} and are discarded in any case, including the case when
+      * re-sending throws (the exception is still propagated to the caller).
+      *
+      * @param snd Send messages flag.
+      */
+     void stopBlock(boolean snd) {
+         synchronized (this) {
+             blockCls.clear();
+
+             try {
+                 if (snd) {
+                     for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) {
+                         ClusterNode node = msg.get1();
+
+                         log.info("Send blocked message: [node=" + node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) +
+                             ", msg=" + msg.get2().message() + ']');
+
+                         super.sendMessage(node, msg.get2());
+                     }
+                 }
+             }
+             finally {
+                 // Never keep stale messages around, even if a re-send failed.
+                 blockedMsgs.clear();
+             }
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
new file mode 100644
index 0000000..ed811d9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected int serverCount() {
+ // Tests in this class also use this value as the grid index of the client node: grid(serverCount()).
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int clientCount() {
+ // Every test in this class works with one client node, obtained as grid(serverCount()).
+ return 1;
+ }
+
+ /**
+ * Runs the queue reconnect scenario for a PARTITIONED queue, first in TRANSACTIONAL and then in ATOMIC mode.
+ *
+ * @throws Exception If failed.
+ */
+ public void testQueueReconnect() throws Exception {
+ CollectionConfiguration txCfg = new CollectionConfiguration();
+
+ txCfg.setCacheMode(PARTITIONED);
+ txCfg.setAtomicityMode(TRANSACTIONAL);
+
+ queueReconnect(txCfg);
+
+ CollectionConfiguration atomicCfg = new CollectionConfiguration();
+
+ atomicCfg.setCacheMode(PARTITIONED);
+ atomicCfg.setAtomicityMode(ATOMIC);
+
+ queueReconnect(atomicCfg);
+ }
+
+ /**
+ * Runs the "queue removed while the client is disconnected" scenario for a PARTITIONED queue, first in
+ * TRANSACTIONAL and then in ATOMIC mode.
+ *
+ * @throws Exception If failed.
+ */
+ public void testQueueReconnectRemoved() throws Exception {
+ CollectionConfiguration txCfg = new CollectionConfiguration();
+
+ txCfg.setCacheMode(PARTITIONED);
+ txCfg.setAtomicityMode(TRANSACTIONAL);
+
+ queueReconnectRemoved(txCfg);
+
+ CollectionConfiguration atomicCfg = new CollectionConfiguration();
+
+ atomicCfg.setCacheMode(PARTITIONED);
+ atomicCfg.setAtomicityMode(ATOMIC);
+
+ queueReconnectRemoved(atomicCfg);
+ }
+
+ /**
+ * Runs the "queue operation in progress during reconnect" scenario for a PARTITIONED queue, first in
+ * TRANSACTIONAL and then in ATOMIC mode.
+ *
+ * @throws Exception If failed.
+ */
+ public void testQueueReconnectInProgress() throws Exception {
+ CollectionConfiguration txCfg = new CollectionConfiguration();
+
+ txCfg.setCacheMode(PARTITIONED);
+ txCfg.setAtomicityMode(TRANSACTIONAL);
+
+ queueReconnectInProgress(txCfg);
+
+ CollectionConfiguration atomicCfg = new CollectionConfiguration();
+
+ atomicCfg.setCacheMode(PARTITIONED);
+ atomicCfg.setAtomicityMode(ATOMIC);
+
+ queueReconnectInProgress(atomicCfg);
+ }
+
+ /**
+ * Runs the set reconnect scenario for a PARTITIONED set, first in TRANSACTIONAL and then in ATOMIC mode.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSetReconnect() throws Exception {
+ CollectionConfiguration txCfg = new CollectionConfiguration();
+
+ txCfg.setCacheMode(PARTITIONED);
+ txCfg.setAtomicityMode(TRANSACTIONAL);
+
+ setReconnect(txCfg);
+
+ CollectionConfiguration atomicCfg = new CollectionConfiguration();
+
+ atomicCfg.setCacheMode(PARTITIONED);
+ atomicCfg.setAtomicityMode(ATOMIC);
+
+ setReconnect(atomicCfg);
+ }
+
+ /**
+ * Runs the "set removed while the client is disconnected" scenario for a PARTITIONED set, first in ATOMIC
+ * and then in TRANSACTIONAL mode.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSetReconnectRemoved() throws Exception {
+ CollectionConfiguration atomicCfg = new CollectionConfiguration();
+
+ atomicCfg.setCacheMode(PARTITIONED);
+ atomicCfg.setAtomicityMode(ATOMIC);
+
+ setReconnectRemove(atomicCfg);
+
+ CollectionConfiguration txCfg = new CollectionConfiguration();
+
+ txCfg.setCacheMode(PARTITIONED);
+ txCfg.setAtomicityMode(TRANSACTIONAL);
+
+ setReconnectRemove(txCfg);
+ }
+
+ /**
+ * Runs the "set operation in progress during reconnect" scenario for a PARTITIONED set, first in ATOMIC and
+ * then in TRANSACTIONAL mode.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSetReconnectInProgress() throws Exception {
+ CollectionConfiguration atomicCfg = new CollectionConfiguration();
+
+ atomicCfg.setCacheMode(PARTITIONED);
+ atomicCfg.setAtomicityMode(ATOMIC);
+
+ setReconnectInProgress(atomicCfg);
+
+ CollectionConfiguration txCfg = new CollectionConfiguration();
+
+ txCfg.setCacheMode(PARTITIONED);
+ txCfg.setAtomicityMode(TRANSACTIONAL);
+
+ setReconnectInProgress(txCfg);
+ }
+
+ /**
+ * Checks that a set handle obtained on the client before a reconnect still works afterwards and sees an
+ * element the server added while the client was disconnected.
+ *
+ * @param colCfg Collection configuration.
+ * @throws Exception If failed.
+ */
+ private void setReconnect(CollectionConfiguration colCfg) throws Exception {
+ Ignite clientNode = grid(serverCount());
+
+ assertTrue(clientNode.cluster().localNode().isClient());
+
+ Ignite router = clientRouter(clientNode);
+
+ String name = "set-" + colCfg.getAtomicityMode();
+
+ IgniteSet<String> clientSet = clientNode.set(name, colCfg);
+
+ final IgniteSet<String> srvSet = router.set(name, null);
+
+ assertTrue(clientSet.add("1"));
+
+ assertFalse(srvSet.add("1"));
+
+ // Passed to reconnectClientNode() as its callback: adds an element through the server-side handle.
+ Runnable addOnSrv = new Runnable() {
+ @Override public void run() {
+ assertTrue(srvSet.add("2"));
+ }
+ };
+
+ reconnectClientNode(clientNode, router, addOnSrv);
+
+ // "2" was already added by the server, so the client must see it as a duplicate.
+ assertFalse(clientSet.add("2"));
+
+ assertTrue(clientSet.remove("2"));
+
+ assertFalse(srvSet.contains("2"));
+ }
+
+ /**
+ * Checks that a client set handle fails with {@link IllegalStateException} after the server closed the set
+ * during the reconnect, and that a set with the same name can then be created again.
+ *
+ * @param colCfg Collection configuration.
+ * @throws Exception If failed.
+ */
+ private void setReconnectRemove(CollectionConfiguration colCfg) throws Exception {
+ Ignite clientNode = grid(serverCount());
+
+ assertTrue(clientNode.cluster().localNode().isClient());
+
+ final Ignite router = clientRouter(clientNode);
+
+ String name = "set-rm-" + colCfg.getAtomicityMode();
+
+ final IgniteSet<String> staleClientSet = clientNode.set(name, colCfg);
+
+ final IgniteSet<String> srvSet = router.set(name, null);
+
+ assertTrue(staleClientSet.add("1"));
+
+ assertFalse(srvSet.add("1"));
+
+ // Passed to reconnectClientNode() as its callback: closes the set through the server-side handle.
+ Runnable closeOnSrv = new Runnable() {
+ @Override public void run() {
+ srvSet.close();
+ }
+ };
+
+ reconnectClientNode(clientNode, router, closeOnSrv);
+
+ Callable<Object> addToClosedSet = new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ staleClientSet.add("fail");
+
+ return null;
+ }
+ };
+
+ GridTestUtils.assertThrows(log, addToClosedSet, IllegalStateException.class, null);
+
+ // The name must be usable again for a brand new (empty) set.
+ IgniteSet<String> freshClientSet = clientNode.set(name, colCfg);
+
+ IgniteSet<String> freshSrvSet = router.set(name, null);
+
+ assertTrue(freshClientSet.add("1"));
+
+ assertFalse(freshSrvSet.add("1"));
+
+ freshSrvSet.close();
+ }
+
+ /**
+ * Checks that a set operation started on the client and left waiting (the matching response class is blocked
+ * on the server's communication SPI) ends with {@link IgniteClientDisconnectedException} when the client is
+ * reconnected, and that the set is usable afterwards.
+ *
+ * @param colCfg Collection configuration.
+ * @throws Exception If failed.
+ */
+ private void setReconnectInProgress(final CollectionConfiguration colCfg) throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ final Ignite srv = clientRouter(client);
+
+ final String setName = "set-in-progress-" + colCfg.getAtomicityMode();
+
+ final IgniteSet<String> clientSet = client.set(setName, colCfg);
+
+ final IgniteSet<String> srvSet = srv.set(setName, null);
+
+ assertTrue(clientSet.add("1"));
+
+ assertFalse(srvSet.add("1"));
+
+ BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+ // Block the response class that corresponds to the atomicity mode, so the client-side add() below hangs.
+ if (colCfg.getAtomicityMode() == ATOMIC)
+ commSpi.blockMessage(GridNearAtomicUpdateResponse.class);
+ else
+ commSpi.blockMessage(GridNearTxPrepareResponse.class);
+
+ final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try {
+ for (int i = 0; i < 100; i++)
+ clientSet.add("2");
+ }
+ catch (IgniteClientDisconnectedException e) {
+ // Helper defined outside this method; presumably validates the exception and waits for reconnect.
+ checkAndWait(e);
+
+ return true;
+ }
+
+ // All adds completed without a disconnect: the test expects this NOT to happen.
+ return false;
+ }
+ });
+
+ // Check that client waiting operation.
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return fut.get(200);
+ }
+ }, IgniteFutureTimeoutCheckedException.class, null);
+
+ assertNotDone(fut);
+
+ commSpi.unblockMessage();
+
+ reconnectClientNode(client, srv, null);
+
+ assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+
+ // The same client handle must work again after the reconnect.
+ assertTrue(clientSet.add("3"));
+
+ assertFalse(srvSet.add("3"));
+
+ srvSet.close();
+ }
+
+ /**
+ * Checks that a queue handle obtained on the client before a reconnect still works afterwards and sees an
+ * element the server added while the client was disconnected.
+ *
+ * @param colCfg Collection configuration.
+ * @throws Exception If failed.
+ */
+ private void queueReconnect(CollectionConfiguration colCfg) throws Exception {
+ Ignite clientNode = grid(serverCount());
+
+ assertTrue(clientNode.cluster().localNode().isClient());
+
+ Ignite router = clientRouter(clientNode);
+
+ String queueName = "queue-" + colCfg.getAtomicityMode();
+
+ IgniteQueue<String> clientQueue = clientNode.queue(queueName, 10, colCfg);
+
+ final IgniteQueue<String> srvQueue = router.queue(queueName, 10, null);
+
+ assertTrue(clientQueue.offer("1"));
+
+ assertTrue(srvQueue.contains("1"));
+
+ // Passed to reconnectClientNode() as its callback: adds an element through the server-side handle.
+ Runnable addOnSrv = new Runnable() {
+ @Override public void run() {
+ assertTrue(srvQueue.add("2"));
+ }
+ };
+
+ reconnectClientNode(clientNode, router, addOnSrv);
+
+ assertTrue(clientQueue.contains("2"));
+
+ // The head is still the element offered before the reconnect.
+ assertEquals("1", clientQueue.poll());
+ }
+
+ /**
+ * Checks that a client queue handle fails with {@link IllegalStateException} after the server closed the
+ * queue during the reconnect, and that a queue with the same name can then be created again.
+ *
+ * @param colCfg Collection configuration.
+ * @throws Exception If failed.
+ */
+ private void queueReconnectRemoved(CollectionConfiguration colCfg) throws Exception {
+ Ignite clientNode = grid(serverCount());
+
+ assertTrue(clientNode.cluster().localNode().isClient());
+
+ Ignite router = clientRouter(clientNode);
+
+ String queueName = "queue-rmv" + colCfg.getAtomicityMode();
+
+ final IgniteQueue<String> staleClientQueue = clientNode.queue(queueName, 10, colCfg);
+
+ final IgniteQueue<String> srvQueue = router.queue(queueName, 10, null);
+
+ assertTrue(staleClientQueue.add("1"));
+
+ assertTrue(srvQueue.add("2"));
+
+ // Passed to reconnectClientNode() as its callback: closes the queue through the server-side handle.
+ Runnable closeOnSrv = new Runnable() {
+ @Override public void run() {
+ srvQueue.close();
+ }
+ };
+
+ reconnectClientNode(clientNode, router, closeOnSrv);
+
+ Callable<Object> addToClosedQueue = new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ staleClientQueue.add("fail");
+
+ return null;
+ }
+ };
+
+ GridTestUtils.assertThrows(log, addToClosedQueue, IllegalStateException.class, null);
+
+ // The name must be usable again for a brand new queue.
+ IgniteQueue<String> freshClientQueue = clientNode.queue(queueName, 10, colCfg);
+
+ IgniteQueue<String> freshSrvQueue = router.queue(queueName, 10, null);
+
+ assertTrue(freshClientQueue.add("1"));
+
+ assertTrue(freshSrvQueue.add("2"));
+ }
+
+ /**
+ * Checks that a queue operation started on the client and left waiting (the matching response class is
+ * blocked on the server's communication SPI) ends with {@link IgniteClientDisconnectedException} when the
+ * client is reconnected, and that the queue is usable afterwards.
+ *
+ * @param colCfg Collection configuration.
+ * @throws Exception If failed.
+ */
+ private void queueReconnectInProgress(final CollectionConfiguration colCfg) throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ // Dedicated name: the previous "queue-rmv" prefix was shared with queueReconnectRemoved(), which leaves
+ // its re-created queue open, so this test used to pick up that queue together with its elements.
+ final String queueName = "queue-in-progress-" + colCfg.getAtomicityMode();
+
+ final IgniteQueue<String> clientQueue = client.queue(queueName, 10, colCfg);
+
+ final IgniteQueue<String> srvQueue = srv.queue(queueName, 10, null);
+
+ assertTrue(clientQueue.offer("1"));
+
+ assertTrue(srvQueue.contains("1"));
+
+ BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+ // Block the response class that corresponds to the atomicity mode, so the client-side add() below hangs.
+ if (colCfg.getAtomicityMode() == ATOMIC)
+ commSpi.blockMessage(GridNearAtomicUpdateResponse.class);
+ else
+ commSpi.blockMessage(GridNearTxPrepareResponse.class);
+
+ final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try {
+ clientQueue.add("2");
+ }
+ catch (IgniteClientDisconnectedException e) {
+ // Helper defined outside this method; presumably validates the exception and waits for reconnect.
+ checkAndWait(e);
+
+ return true;
+ }
+
+ // The add completed without a disconnect: the test expects this NOT to happen.
+ return false;
+ }
+ });
+
+ // Check that client waiting operation.
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return fut.get(200);
+ }
+ }, IgniteFutureTimeoutCheckedException.class, null);
+
+ assertNotDone(fut);
+
+ commSpi.unblockMessage();
+
+ reconnectClientNode(client, srv, null);
+
+ assertTrue("Future was not failed. Atomic mode: " + colCfg.getAtomicityMode() + ".", (Boolean)fut.get());
+
+ // The same client handle must work again after the reconnect.
+ assertTrue(clientQueue.add("3"));
+
+ // The head is still the element offered before the reconnect.
+ assertEquals("1", clientQueue.poll());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
new file mode 100644
index 0000000..e9667a1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected int serverCount() {
+ // Tests in this class also use this value as the grid index of the client node: grid(serverCount()).
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int clientCount() {
+ // Every test in this class works with one client node, obtained as grid(serverCount()).
+ return 1;
+ }
+
+ /**
+ * Checks that an affinity call started on the client while {@code GridJobExecuteResponse} is blocked on the
+ * server ends with {@link IgniteClientDisconnectedException} when the client is reconnected.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectAffinityCallInProgress() throws Exception {
+ final Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite router = clientRouter(client);
+
+ IgniteCache<Integer, Integer> testCache = client.getOrCreateCache("test-cache");
+
+ for (int key = 0; key < 100; key++)
+ testCache.put(key, key);
+
+ BlockTpcCommunicationSpi srvCommSpi = commSpi(router);
+
+ srvCommSpi.blockMessage(GridJobExecuteResponse.class);
+
+ // Yields true only if the call was interrupted by a client disconnect.
+ Callable<Object> affCall = new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ boolean disconnected = false;
+
+ try {
+ client.compute().affinityCall("test-cache", 40, new IgniteCallable<Object>() {
+ @Override public Integer call() throws Exception {
+ return 42;
+ }
+ });
+ }
+ catch (IgniteClientDisconnectedException e) {
+ checkAndWait(e);
+
+ disconnected = true;
+ }
+
+ return disconnected;
+ }
+ };
+
+ final IgniteInternalFuture<Object> callFut = GridTestUtils.runAsync(affCall);
+
+ // Check that client waiting operation.
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return callFut.get(200);
+ }
+ }, IgniteFutureTimeoutCheckedException.class, null);
+
+ assertNotDone(callFut);
+
+ srvCommSpi.unblockMessage();
+
+ reconnectClientNode(client, router, null);
+
+ assertTrue((Boolean)callFut.get(2, TimeUnit.SECONDS));
+ }
+
+ /**
+ * Checks that a broadcast started on the client while {@code GridJobExecuteResponse} is blocked on the
+ * server ends with {@link IgniteClientDisconnectedException} when the client is reconnected.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectBroadcastInProgress() throws Exception {
+ final Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite router = clientRouter(client);
+
+ BlockTpcCommunicationSpi srvCommSpi = commSpi(router);
+
+ srvCommSpi.blockMessage(GridJobExecuteResponse.class);
+
+ // Yields true only if the broadcast was interrupted by a client disconnect.
+ Callable<Object> broadcastCall = new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ boolean disconnected = false;
+
+ try {
+ client.compute().broadcast(new IgniteCallable<Object>() {
+ @Override public Object call() throws Exception {
+ return 42;
+ }
+ });
+ }
+ catch (IgniteClientDisconnectedException e) {
+ checkAndWait(e);
+
+ disconnected = true;
+ }
+
+ return disconnected;
+ }
+ };
+
+ final IgniteInternalFuture<Object> callFut = GridTestUtils.runAsync(broadcastCall);
+
+ // Check that client waiting operation.
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return callFut.get(200);
+ }
+ }, IgniteFutureTimeoutCheckedException.class, null);
+
+ assertNotDone(callFut);
+
+ srvCommSpi.unblockMessage();
+
+ reconnectClientNode(client, router, null);
+
+ assertTrue((Boolean)callFut.get(2, TimeUnit.SECONDS));
+ }
+
+ /**
+ * Checks that {@code compute().apply(...)} started on the client while {@code GridJobExecuteResponse} is
+ * blocked on the server ends with {@link IgniteClientDisconnectedException} when the client is reconnected.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectApplyInProgress() throws Exception {
+ final Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite router = clientRouter(client);
+
+ BlockTpcCommunicationSpi srvCommSpi = commSpi(router);
+
+ srvCommSpi.blockMessage(GridJobExecuteResponse.class);
+
+ // Yields true only if the apply was interrupted by a client disconnect.
+ Callable<Object> applyCall = new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ boolean disconnected = false;
+
+ try {
+ client.compute().apply(new IgniteClosure<Integer, Integer>() {
+ @Override public Integer apply(Integer o) {
+ return o + 1;
+ }
+ }, Arrays.asList(1, 2, 3));
+ }
+ catch (IgniteClientDisconnectedException e) {
+ checkAndWait(e);
+
+ disconnected = true;
+ }
+
+ return disconnected;
+ }
+ };
+
+ final IgniteInternalFuture<Object> callFut = GridTestUtils.runAsync(applyCall);
+
+ // Check that client waiting operation.
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return callFut.get(200);
+ }
+ }, IgniteFutureTimeoutCheckedException.class, null);
+
+ assertNotDone(callFut);
+
+ srvCommSpi.unblockMessage();
+
+ reconnectClientNode(client, router, null);
+
+ assertTrue((Boolean)callFut.get(2, TimeUnit.SECONDS));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
new file mode 100644
index 0000000..2bfdc85b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+
+import javax.cache.event.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientReconnectAbstractTest {
+ /** */
+ private static volatile CountDownLatch latch;
+
+ /** {@inheritDoc} */
+ @Override protected int serverCount() {
+ // Tests in this class also use this value as the grid index of the client node: grid(serverCount()).
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int clientCount() {
+ // Every test in this class works with one client node, obtained as grid(serverCount()).
+ return 1;
+ }
+
+ /**
+ * Checks that a remote event listener registered from the client still delivers {@code EVT_JOB_STARTED}
+ * events to the client-side callback after the client was failed by its router and reconnected, for jobs
+ * started both from the client and from the server.
+ *
+ * @throws Exception If failed.
+ */
+ public void testEventListenerReconnect() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ EventListener lsnr = new EventListener();
+
+ UUID opId = client.events().remoteListen(lsnr, null, EventType.EVT_JOB_STARTED);
+
+ lsnr.latch = new CountDownLatch(1);
+
+ log.info("Created remote listener: " + opId);
+
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_RECONNECTED);
+
+ // Force the disconnect: the router's discovery SPI fails the client node.
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ waitReconnectEvent(reconnectLatch);
+
+ // Job started from the client: the latch armed before the disconnect must be released.
+ client.compute().run(new DummyJob());
+
+ assertTrue(lsnr.latch.await(5000, MILLISECONDS));
+
+ // Re-arm the latch before triggering the next event.
+ lsnr.latch = new CountDownLatch(1);
+
+ srv.compute().run(new DummyJob());
+
+ assertTrue(lsnr.latch.await(5000, MILLISECONDS));
+
+ lsnr.latch = new CountDownLatch(1);
+
+ log.info("Stop listen, should not get events anymore.");
+
+ client.events().stopRemoteListen(opId);
+
+ // NOTE(review): no job is run after stopRemoteListen(), so this only verifies that nothing still in
+ // flight arrives within 3 seconds - confirm whether a job should be submitted here.
+ assertFalse(lsnr.latch.await(3000, MILLISECONDS));
+ }
+
+ /**
+ * Checks that local and remote message listeners registered from the client keep receiving messages after
+ * the client was failed by its router and reconnected, and that remote listeners stop receiving messages
+ * once {@code stopRemoteListen} is called while the local listener keeps working.
+ *
+ * @throws Exception If failed.
+ */
+ public void testMessageListenerReconnect() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ final String topic = "testTopic";
+
+ MessageListener locLsnr = new MessageListener();
+
+ UUID opId = client.message().remoteListen(topic, new RemoteMessageListener());
+
+ client.message().localListen(topic, locLsnr);
+
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_RECONNECTED);
+
+ // Force the disconnect: the router's discovery SPI fails the client node.
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ waitReconnectEvent(reconnectLatch);
+
+ // Latches are always armed BEFORE the send, otherwise a fast delivery would hit the previous latch.
+ locLsnr.latch = new CountDownLatch(1);
+ latch = new CountDownLatch(2);
+
+ client.message().send(topic, "msg1");
+
+ assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
+ assertTrue(latch.await(5000, MILLISECONDS));
+
+ locLsnr.latch = new CountDownLatch(1);
+ latch = new CountDownLatch(2);
+
+ srv.message().send(topic, "msg2");
+
+ assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
+ assertTrue(latch.await(5000, MILLISECONDS));
+
+ log.info("Stop listen, should not get remote messages anymore.");
+
+ client.message().stopRemoteListen(opId);
+
+ // Arm the latches before sending "msg3". Previously they were replaced after the send, so the local
+ // listener could count down the old (already released) latch and the positive check below could time
+ // out, while a remote listener that wrongly still fired could go unnoticed.
+ locLsnr.latch = new CountDownLatch(1);
+ latch = new CountDownLatch(1);
+
+ srv.message().send(topic, "msg3");
+
+ assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
+ assertFalse(latch.await(3000, MILLISECONDS));
+ }
+
+ /**
+ * Checks that a continuous query started on the client keeps notifying its local listener across five
+ * consecutive client reconnects, and stops notifying once the query cursor is closed.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCacheContinuousQueryReconnect() throws Exception {
+ Ignite clientNode = grid(serverCount());
+
+ assertTrue(clientNode.cluster().localNode().isClient());
+
+ IgniteCache<Object, Object> cache = clientNode.getOrCreateCache(new CacheConfiguration<>());
+
+ CacheEventListener updLsnr = new CacheEventListener();
+
+ ContinuousQuery<Object, Object> contQry = new ContinuousQuery<>();
+
+ contQry.setAutoUnsubscribe(true);
+
+ contQry.setLocalListener(updLsnr);
+
+ QueryCursor<?> cursor = cache.query(contQry);
+
+ for (int iter = 0; iter < 5; iter++) {
+ log.info("Iteration: " + iter);
+
+ continuousQueryReconnect(clientNode, cache, updLsnr);
+ }
+
+ log.info("Close cursor, should not get cache events anymore.");
+
+ cursor.close();
+
+ // Arm the latch first, then update: no notification is expected within the timeout.
+ updLsnr.latch = new CountDownLatch(1);
+
+ cache.put(3, 3);
+
+ assertFalse(updLsnr.latch.await(3000, MILLISECONDS));
+ }
+
+ /**
+ * Fails the client from its router, waits for the reconnect and then checks that the continuous query
+ * listener is notified about one update made through the client cache and one made through the server.
+ *
+ * @param client Client.
+ * @param clientCache Client cache.
+ * @param lsnr Continuous query listener.
+ * @throws Exception If failed.
+ */
+ private void continuousQueryReconnect(Ignite client,
+ IgniteCache<Object, Object> clientCache,
+ CacheEventListener lsnr)
+ throws Exception
+ {
+ Ignite router = clientRouter(client);
+
+ TestTcpDiscoverySpi routerSpi = spi(router);
+
+ final CountDownLatch reconnected = new CountDownLatch(1);
+
+ IgnitePredicate<Event> reconnectLsnr = new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnected.countDown();
+ }
+
+ return true;
+ }
+ };
+
+ client.events().localListen(reconnectLsnr, EVT_CLIENT_NODE_RECONNECTED);
+
+ routerSpi.failNode(client.cluster().localNode().id(), null);
+
+ waitReconnectEvent(reconnected);
+
+ // This method runs once per iteration, so the event listener is removed again each time.
+ client.events().stopLocalListen(reconnectLsnr);
+
+ lsnr.latch = new CountDownLatch(1);
+
+ clientCache.put(1, 1);
+
+ assertTrue(lsnr.latch.await(5000, MILLISECONDS));
+
+ lsnr.latch = new CountDownLatch(1);
+
+ router.cache(null).put(2, 2);
+
+ assertTrue(lsnr.latch.await(5000, MILLISECONDS));
+ }
+
+ /**
+ * Callback passed to {@code client.events().remoteListen(...)} by {@code testEventListenerReconnect()};
+ * counts down the current latch for every event it receives.
+ */
+ private static class EventListener implements P2<UUID, Event> {
+ /** Latch released on each received event; the test replaces it before every expectation. */
+ private volatile CountDownLatch latch;
+
+ /** Ignite instance (field is annotated with {@link IgniteInstanceResource}). */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID uuid, Event evt) {
+ // The callback is expected to be invoked on a client node.
+ assertTrue(ignite.cluster().localNode().isClient());
+
+ ignite.log().info("Received event: " + evt);
+
+ if (latch != null)
+ latch.countDown();
+
+ return true;
+ }
+ }
+
+ /**
+ * Listener registered with {@code client.message().localListen(...)} by
+ * {@code testMessageListenerReconnect()}; counts down the current latch for every message it receives.
+ */
+ private static class MessageListener implements P2<UUID, Object> {
+ /** Latch released on each received message; the test replaces it before every expectation. */
+ private volatile CountDownLatch latch;
+
+ /** Ignite instance (field is annotated with {@link IgniteInstanceResource}). */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID uuid, Object msg) {
+ // The local listener is expected to be invoked on a client node.
+ assertTrue(ignite.cluster().localNode().isClient());
+
+ ignite.log().info("Local listener received message: " + msg);
+
+ if (latch != null)
+ latch.countDown();
+
+ return true;
+ }
+ }
+
+ /**
+ * Listener registered with {@code client.message().remoteListen(...)} by
+ * {@code testMessageListenerReconnect()}. It has no latch of its own: it counts down the static
+ * {@code latch} field of the enclosing test class.
+ */
+ private static class RemoteMessageListener implements P2<UUID, Object> {
+ /** Ignite instance (field is annotated with {@link IgniteInstanceResource}). */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID uuid, Object msg) {
+ ignite.log().info("Remote listener received message: " + msg);
+
+ // Static latch of the outer class; presumably static so that every deployed copy shares it - confirm.
+ if (latch != null)
+ latch.countDown();
+
+ return true;
+ }
+ }
+
+ /**
+ * Local listener of the continuous query used by {@code testCacheContinuousQueryReconnect()}; expects
+ * exactly one event per notification and counts down the current latch.
+ */
+ private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> {
+ /** Latch released on each notification; the test replaces it before every expectation. */
+ private volatile CountDownLatch latch;
+
+ /** Ignite instance (field is annotated with {@link IgniteInstanceResource}). */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ int cnt = 0;
+
+ for (CacheEntryEvent<?, ?> evt : evts) {
+ ignite.log().info("Received cache event: " + evt);
+
+ cnt++;
+ }
+
+ // The tests make one put per expectation, so each notification must carry a single event.
+ assertEquals(1, cnt);
+
+ if (latch != null)
+ latch.countDown();
+ }
+ }
+
+ /**
+ * Job that only logs a message; {@code testEventListenerReconnect()} runs it to trigger job events.
+ */
+ static class DummyJob implements IgniteRunnable {
+ /** Ignite instance (field is annotated with {@link IgniteInstanceResource}). */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ ignite.log().info("Job run.");
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
new file mode 100644
index 0000000..feeebe5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectDiscoveryStateTest extends IgniteClientReconnectAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected int serverCount() {
+ // testReconnect() depends on this value: it expects topology versions 1-3 to hold 1-3 nodes and the
+ // client (ignite(serverCount())) to have order 4.
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int clientCount() {
+ // testReconnect() works with one client node, obtained as ignite(serverCount()).
+ return 1;
+ }
+
+ /**
+ * Checks the discovery state seen by the client around a reconnect: node order, topology version, node
+ * counts of historical topology versions and the content of the node-local map.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnect() throws Exception {
+ final Ignite client = ignite(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ IgniteCluster cluster = client.cluster();
+
+ cluster.nodeLocalMap().put("locMapKey", 10);
+
+ // Topology version -> expected number of nodes. Before the reconnect version N has N nodes.
+ Map<Integer, Integer> expNodes = new HashMap<>();
+
+ for (int ver = 1; ver <= 4; ver++)
+ expNodes.put(ver, ver);
+
+ checkTopologyNodeCounts(cluster, expNodes);
+
+ long expTopVer = 4;
+
+ assertEquals(expTopVer, cluster.localNode().order());
+
+ TestTcpDiscoverySpi routerSpi = spi(clientRouter(client));
+
+ final CountDownLatch reconnected = new CountDownLatch(1);
+
+ IgnitePredicate<Event> stateLsnr = new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ // While disconnected a pending reconnect future must be available.
+ IgniteFuture<?> reconnectFut = client.cluster().clientReconnectFuture();
+
+ assertNotNull(reconnectFut);
+ assertFalse(reconnectFut.isDone());
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnected.countDown();
+ }
+
+ return true;
+ }
+ };
+
+ client.events().localListen(stateLsnr, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ routerSpi.failNode(client.cluster().localNode().id(), null);
+
+ waitReconnectEvent(reconnected);
+
+ expTopVer += 2; // Client failed and rejoined.
+
+ assertEquals(expTopVer, cluster.localNode().order());
+ assertEquals(expTopVer, cluster.topologyVersion());
+
+ // Version 5: client left (3 nodes); version 6: client joined again (4 nodes).
+ expNodes.put(5, 3);
+ expNodes.put(6, 4);
+
+ checkTopologyNodeCounts(cluster, expNodes);
+
+ assertEquals(10, cluster.nodeLocalMap().get("locMapKey"));
+ }
+
+ /**
+ * Asserts that the cluster reports the expected number of nodes for each given topology version.
+ *
+ * @param cluster Cluster to query.
+ * @param expNodes Map from topology version to the expected node count.
+ */
+ private void checkTopologyNodeCounts(IgniteCluster cluster, Map<Integer, Integer> expNodes) {
+ for (Map.Entry<Integer, Integer> e : expNodes.entrySet()) {
+ Collection<ClusterNode> nodes = cluster.topology(e.getKey());
+
+ assertNotNull("No nodes for topology: " + e.getKey(), nodes);
+ assertEquals((int)e.getValue(), nodes.size());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
new file mode 100644
index 0000000..1b6523a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.testframework.*;
+
+import javax.cache.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ *
+ */
+public abstract class IgniteClientReconnectFailoverAbstractTest extends IgniteClientReconnectAbstractTest {
+ /** */
+ private static final Integer THREADS = 1;
+
+ /** */
+ private volatile CyclicBarrier barrier;
+
+ /** */
+ protected static final long TEST_TIME = 90_000;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setPeerClassLoadingEnabled(false);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setJoinTimeout(30_000);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int serverCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int clientCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return TEST_TIME * 60_000;
+ }
+
+ /**
+ * @param c Test closure.
+ * @throws Exception If failed.
+ */
+ protected final void reconnectFailover(final Callable<Void> c) throws Exception {
+ final Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ final AtomicBoolean stop = new AtomicBoolean(false);
+
+ final IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try {
+ int iter = 0;
+
+ while (!stop.get()) {
+ try {
+ c.call();
+ }
+ catch (CacheException e) {
+ checkAndWait(e);
+ }
+ catch (IgniteClientDisconnectedException e) {
+ checkAndWait(e);
+ }
+
+ if (++iter % 100 == 0)
+ log.info("Iteration: " + iter);
+
+ if (barrier != null)
+ barrier.await();
+ }
+
+ return null;
+ } catch (Throwable e) {
+ log.error("Unexpected error in operation thread: " + e, e);
+
+ stop.set(true);
+
+ throw e;
+ }
+ }
+ }, THREADS, "test-operation-thread");
+
+ final AtomicReference<CountDownLatch> disconnected = new AtomicReference<>();
+ final AtomicReference<CountDownLatch> reconnected = new AtomicReference<>();
+
+ IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ CountDownLatch latch = reconnected.get();
+
+ assertNotNull(latch);
+ assertEquals(1, latch.getCount());
+
+ latch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ CountDownLatch latch = disconnected.get();
+
+ assertNotNull(latch);
+ assertEquals(1, latch.getCount());
+
+ latch.countDown();
+ }
+
+ return true;
+ }
+ };
+
+ client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ try {
+ long stopTime = System.currentTimeMillis() + TEST_TIME;
+
+ String err = null;
+
+ while (System.currentTimeMillis() < stopTime && !fut.isDone()) {
+ U.sleep(500);
+
+ CountDownLatch disconnectLatch = new CountDownLatch(1);
+ CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ disconnected.set(disconnectLatch);
+ reconnected.set(reconnectLatch);
+
+ UUID nodeId = client.cluster().localNode().id();
+
+ log.info("Fail client: " + nodeId);
+
+ srvSpi.failNode(nodeId, null);
+
+ if (!disconnectLatch.await(10_000, MILLISECONDS)) {
+ err = "Failed to wait for disconnect";
+
+ break;
+ }
+
+ if (!reconnectLatch.await(10_000, MILLISECONDS)) {
+ err = "Failed to wait for reconnect";
+
+ break;
+ }
+
+ barrier = new CyclicBarrier(THREADS + 1, new Runnable() {
+ @Override public void run() {
+ barrier = null;
+ }
+ });
+
+ try {
+ barrier.await(10, SECONDS);
+ }
+ catch (TimeoutException e) {
+ err = "Operations hang or fail with unexpected error.";
+
+ break;
+ }
+ }
+
+ if (err != null) {
+ log.error(err);
+
+ U.dumpThreads(log);
+
+ CyclicBarrier barrier0 = barrier;
+
+ if (barrier0 != null)
+ barrier0.reset();
+
+ stop.set(true);
+
+ fut.get();
+
+ fail(err);
+ }
+
+ stop.set(true);
+
+ fut.get();
+ }
+ finally {
+ client.events().stopLocalListen(p);
+
+ stop.set(true);
+ }
+ }
+}