You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/07/16 18:09:28 UTC
[21/35] incubator-ignite git commit: # ignite-901 client reconnect
support
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java
new file mode 100644
index 0000000..7cfc329
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
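+/**
+ * Runs cache, transaction, compute and data streamer operations from a client node inside reconnectFailover(...).
+ */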
+public class IgniteClientReconnectFailoverTest extends IgniteClientReconnectFailoverAbstractTest {
+ /** Name of the cache that getConfiguration(...) sets up with ATOMIC atomicity mode. */
+ protected static final String ATOMIC_CACHE = "ATOMIC_CACHE";
+
+ /** Name of the cache that getConfiguration(...) sets up with TRANSACTIONAL atomicity mode. */
+ protected static final String TX_CACHE = "TX_CACHE";
+
+ /** {@inheritDoc} Also registers the {@link #ATOMIC_CACHE} and {@link #TX_CACHE} configurations. */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheConfiguration ccfg1 = new CacheConfiguration(); // Atomic cache, one backup.
+
+ ccfg1.setName(ATOMIC_CACHE);
+ ccfg1.setBackups(1);
+ ccfg1.setAtomicityMode(ATOMIC);
+
+ CacheConfiguration ccfg2 = new CacheConfiguration(); // Transactional cache, one backup.
+
+ ccfg2.setName(TX_CACHE);
+ ccfg2.setBackups(1);
+ ccfg2.setAtomicityMode(TRANSACTIONAL);
+
+ cfg.setCacheConfiguration(ccfg1, ccfg2);
+
+ return cfg;
+ }
+
+ /** Runs single-key and bulk put/get checks on {@link #ATOMIC_CACHE} inside {@code reconnectFailover(...)}.
+ * @throws Exception If failed.
+ */
+ public void testReconnectAtomicCache() throws Exception {
+ final Ignite client = grid(serverCount()); // NOTE(review): presumably the first client node - confirm in the base class.
+
+ final IgniteCache<Integer, Integer> cache = client.cache(ATOMIC_CACHE);
+
+ assertNotNull(cache);
+
+ assertEquals(ATOMIC, cache.getConfiguration(CacheConfiguration.class).getAtomicityMode());
+
+ reconnectFailover(new Callable<Void>() { // NOTE(review): presumably run repeatedly while the client reconnects - confirm.
+ @Override public Void call() throws Exception {
+ TreeMap<Integer, Integer> map = new TreeMap<>(); // Collects every entry written below for the bulk check.
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ for (int i = 0; i < 10; i++) {
+ Integer key = rnd.nextInt(0, 100_000); // Random key in [0, 100_000); the value equals the key.
+
+ cache.put(key, key);
+
+ assertEquals(key, cache.get(key));
+
+ map.put(key, key);
+ }
+
+ cache.putAll(map);
+
+ Map<Integer, Integer> res = cache.getAll(map.keySet());
+
+ assertEquals(map, res); // getAll must return exactly the entries passed to putAll.
+
+ return null;
+ }
+ });
+ }
+
+ /** Runs put/get checks on {@link #TX_CACHE}, outside and inside explicit transactions, in {@code reconnectFailover(...)}.
+ * @throws Exception If failed.
+ */
+ public void testReconnectTxCache() throws Exception {
+ final Ignite client = grid(serverCount()); // NOTE(review): presumably the first client node - confirm in the base class.
+
+ final IgniteCache<Integer, Integer> cache = client.cache(TX_CACHE);
+
+ assertNotNull(cache);
+
+ assertEquals(TRANSACTIONAL, cache.getConfiguration(CacheConfiguration.class).getAtomicityMode());
+
+ final IgniteTransactions txs = client.transactions();
+
+ reconnectFailover(new Callable<Void>() { // NOTE(review): presumably run repeatedly while the client reconnects - confirm.
+ @Override public Void call() throws Exception {
+ TreeMap<Integer, Integer> map = new TreeMap<>(); // Entries reused by the transactional and bulk checks below.
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ for (int i = 0; i < 5; i++) {
+ Integer key = rnd.nextInt(0, 100_000); // Random key in [0, 100_000); the value equals the key.
+
+ cache.put(key, key);
+
+ assertEquals(key, cache.get(key));
+
+ map.put(key, key);
+ }
+
+ for (TransactionConcurrency txConcurrency : TransactionConcurrency.values()) { // Every concurrency mode.
+ try (Transaction tx = txs.txStart(txConcurrency, REPEATABLE_READ)) {
+ for (Map.Entry<Integer, Integer> e : map.entrySet()) {
+ cache.put(e.getKey(), e.getValue());
+
+ assertNotNull(cache.get(e.getKey())); // Read back inside the same transaction.
+ }
+
+ tx.commit();
+ }
+ }
+
+ cache.putAll(map);
+
+ Map<Integer, Integer> res = cache.getAll(map.keySet());
+
+ assertEquals(map, res); // getAll must return exactly the entries passed to putAll.
+
+ return null;
+ }
+ });
+ }
+
+ /** Invokes {@code call} and {@code broadcast} of the client's compute API inside {@code reconnectFailover(...)}.
+ * @throws Exception If failed.
+ */
+ public void testReconnectComputeApi() throws Exception {
+ final Ignite client = grid(serverCount()); // NOTE(review): presumably the first client node - confirm in the base class.
+
+ final IgniteCompute comp = client.compute();
+
+ reconnectFailover(new Callable<Void>() { // NOTE(review): presumably run repeatedly while the client reconnects - confirm.
+ @Override public Void call() throws Exception {
+ comp.call(new DummyClosure()); // Results are ignored; only successful completion matters here.
+
+ comp.broadcast(new DummyClosure());
+
+ return null;
+ }
+ });
+ }
+
+ /** Streams data into {@link #ATOMIC_CACHE} and {@link #TX_CACHE} from the client inside {@code reconnectFailover(...)}.
+ * @throws Exception If failed.
+ */
+ public void testReconnectStreamerApi() throws Exception {
+ final Ignite client = grid(serverCount()); // NOTE(review): presumably the first client node - confirm in the base class.
+
+ reconnectFailover(new Callable<Void>() { // NOTE(review): presumably run repeatedly while the client reconnects - confirm.
+ @Override public Void call() throws Exception {
+ stream(ATOMIC_CACHE);
+
+ stream(TX_CACHE);
+
+ return null;
+ }
+
+ private void stream(String cacheName) { // Adds 100 entries with random keys and value 0 to the given cache.
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ try (IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer(cacheName)) { // Closed on exit.
+ streamer.allowOverwrite(true);
+
+ streamer.perNodeBufferSize(10);
+
+ for (int i = 0; i < 100; i++)
+ streamer.addData(rnd.nextInt(100_000), 0);
+ }
+ }
+ });
+ }
+
+ /**
+ * Callable used by the compute test; it does no work and always returns 1.
+ */
+ public static class DummyClosure implements IgniteCallable<Object> {
+ /** {@inheritDoc} */
+ @Override public Object call() throws Exception {
+ return 1;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
new file mode 100644
index 0000000..31b4192
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.processors.service.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.services.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.concurrent.*;
+
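+/**
+ * Tests service deployment and service proxy calls made from a client node around a client reconnect.
+ */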
+public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbstractTest {
+ /** {@inheritDoc} This test returns 1. */
+ @Override protected int serverCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} This test returns 1. */
+ @Override protected int clientCount() {
+ return 1;
+ }
+
+ /** Checks that a service proxy obtained on the client is still usable after the client node is reconnected.
+ * @throws Exception If failed.
+ */
+ public void testReconnect() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ IgniteServices services = client.services();
+
+ services.deployClusterSingleton("testReconnect", new TestServiceImpl());
+
+ TestService srvc = services.serviceProxy("testReconnect", TestService.class, false);
+
+ assertNotNull(srvc);
+
+ long topVer = grid(0).cluster().topologyVersion();
+
+ assertEquals((Object)topVer, srvc.test()); // test() returns the topology version seen by the service's node.
+
+ Ignite srv = clientRouter(client);
+
+ reconnectClientNode(client, srv, null);
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ DummyService.exeLatch("testReconnect2", latch); // NOTE(review): presumably counted down when the service executes - confirm.
+
+ services.deployClusterSingleton("testReconnect2", new DummyService()); // Deploy a second service after the reconnect.
+
+ assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
+
+ assertEquals((Object)(topVer + 2), srvc.test()); // NOTE(review): +2 is presumably client leave + rejoin - confirm.
+ }
+
+ /** Cancels a service from the server during the client reconnect, then checks the old proxy fails and a redeploy works.
+ * @throws Exception If failed.
+ */
+ public void testServiceRemove() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ IgniteServices clnServices = client.services();
+
+ final IgniteServices srvServices = srv.services();
+
+ srvServices.deployClusterSingleton("testServiceRemove", new TestServiceImpl());
+
+ final TestService srvc = clnServices.serviceProxy("testServiceRemove", TestService.class, false);
+
+ assertNotNull(srvc);
+
+ assertNotNull(srvc.test());
+
+ reconnectClientNode(client, srv, new Runnable() { // NOTE(review): Runnable presumably runs while the client is disconnected - confirm.
+ @Override public void run() {
+ srvServices.cancel("testServiceRemove");
+ }
+ });
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() { // The proxy of the cancelled service must now fail.
+ @Override public Object call() throws Exception {
+ return srvc.test();
+ }
+ }, IgniteException.class, null);
+
+ clnServices.deployClusterSingleton("testServiceRemove", new TestServiceImpl()); // Redeploy under the same name from the client.
+
+ TestService newSrvc = clnServices.serviceProxy("testServiceRemove", TestService.class, false);
+
+ assertNotNull(newSrvc);
+ assertNotNull(newSrvc.test());
+ }
+
+ /** Checks a service deployment that is still pending on the client when the client node is reconnected.
+ * @throws Exception If failed.
+ */
+ public void testReconnectInDeploying() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ final IgniteServices services = client.services();
+
+ Ignite srv = clientRouter(client);
+
+ BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+ commSpi.blockMessage(GridNearTxPrepareResponse.class); // The deploy below is then expected not to complete (asserted further down).
+
+ final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try {
+ services.deployClusterSingleton("testReconnectInDeploying", new TestServiceImpl());
+ }
+ catch (IgniteClientDisconnectedException e) {
+ checkAndWait(e); // NOTE(review): presumably waits on the exception's reconnect future - confirm.
+
+ return true; // Expected path: the call failed with a client-disconnected exception.
+ }
+
+ return false; // Deploy finished without a disconnect exception; the final assertTrue rejects this.
+ }
+ });
+
+ // Check that the client operation is still waiting: getting the future must time out.
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return fut.get(200);
+ }
+ }, IgniteFutureTimeoutCheckedException.class, null);
+
+ assertNotDone(fut);
+
+ commSpi.unblockMessage();
+
+ reconnectClientNode(client, srv, null);
+
+ assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+ }
+
+ /** Checks a service proxy call that is still pending on the client when the client node is reconnected.
+ * @throws Exception If failed.
+ */
+ public void testReconnectInProgress() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ final IgniteServices services = client.services();
+
+ final Ignite srv = clientRouter(client);
+
+ services.deployClusterSingleton("testReconnectInProgress", new TestServiceImpl());
+
+ final TestService srvc = services.serviceProxy("testReconnectInProgress", TestService.class, false);
+
+ assertNotNull(srvc);
+
+ BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+ commSpi.blockMessage(GridJobExecuteResponse.class); // The proxy call below is then expected not to complete (asserted further down).
+
+ final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try {
+ srvc.test();
+ }
+ catch (IgniteClientDisconnectedException e) {
+ checkAndWait(e); // NOTE(review): presumably waits on the exception's reconnect future - confirm.
+
+ return true; // Expected path: the call failed with a client-disconnected exception.
+ }
+
+ return false; // Call finished without a disconnect exception; the final assertTrue rejects this.
+ }
+ });
+
+ // Check that the client operation is still waiting: getting the future must time out.
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return fut.get(200);
+ }
+ }, IgniteFutureTimeoutCheckedException.class, null);
+
+ assertNotDone(fut);
+
+ commSpi.unblockMessage();
+
+ reconnectClientNode(client, srv, null);
+
+ assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+ }
+
+ /**
+ * Service interface that the tests in this class call through a service proxy.
+ */
+ public static interface TestService {
+ /**
+ * @return Topology version.
+ */
+ public Long test();
+ }
+
+ /**
+ * Service whose lifecycle callbacks are no-ops; test() reports the topology version of the injected Ignite instance.
+ */
+ public static class TestServiceImpl implements Service, TestService {
+ /** Ignite instance, annotated for resource injection. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public void cancel(ServiceContext ctx) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void init(ServiceContext ctx) throws Exception {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void execute(ServiceContext ctx) throws Exception {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Long test() {
+ assertFalse(ignite.cluster().localNode().isClient()); // The service must not be running on a client node.
+
+ return ignite.cluster().topologyVersion();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStopTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStopTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStopTest.java
new file mode 100644
index 0000000..98c3d0f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStopTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Tests closing a client node while it is disconnected from the cluster.
+ */
+public class IgniteClientReconnectStopTest extends IgniteClientReconnectAbstractTest {
+ /** {@inheritDoc} This test returns 1. */
+ @Override protected int serverCount() {
+ return 1;
+ }
+
+ /** Closes a disconnected client and checks that the reconnect future it handed out fails with IgniteException.
+ * @throws Exception If failed.
+ */
+ public void testStopWhenDisconnected() throws Exception {
+ clientMode = true;
+
+ Ignite client = startGrid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1); // Counted down by the listener; never awaited in this method.
+
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+
+ log.info("Block reconnect.");
+
+ clientSpi.writeLatch = new CountDownLatch(1); // NOTE(review): presumably holds the client SPI's writes so it cannot rejoin - confirm.
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null); // Fail the client node from the server side.
+
+ waitReconnectEvent(disconnectLatch);
+
+ IgniteFuture<?> reconnectFut = null;
+
+ try {
+ client.getOrCreateCache(new CacheConfiguration<>()); // Must throw while the client is disconnected.
+
+ fail();
+ }
+ catch (IgniteClientDisconnectedException e) {
+ log.info("Expected operation exception: " + e);
+
+ reconnectFut = e.reconnectFuture();
+ }
+
+ assertNotNull(reconnectFut);
+
+ client.close();
+
+ try {
+ reconnectFut.get(); // After close() the reconnect future is expected to fail, not complete normally.
+
+ fail();
+ }
+ catch (IgniteException e) {
+ log.info("Expected reconnect exception: " + e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java
new file mode 100644
index 0000000..a4cf77f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.processors.datastreamer.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.testframework.*;
+
+import javax.cache.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Tests IgniteDataStreamer behavior on a client node around a client reconnect.
+ */
+public class IgniteClientReconnectStreamerTest extends IgniteClientReconnectAbstractTest {
+ /** Name of the cache that getConfiguration(...) registers and the tests stream into. */
+ public static final String CACHE_NAME = "streamer";
+
+ /** {@inheritDoc} This test returns 1. */
+ @Override protected int serverCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} This test returns 1. */
+ @Override protected int clientCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} Also registers the {@link #CACHE_NAME} cache (ATOMIC, PARTITIONED). */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<Integer, Integer>(CACHE_NAME)
+ .setAtomicityMode(ATOMIC)
+ .setCacheMode(PARTITIONED);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /** Streams 50 entries, reconnects the client, checks the old streamer is unusable and a new one streams 50 more.
+ * @throws Exception If failed.
+ */
+ public void testStreamerReconnect() throws Exception {
+ final Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ final IgniteCache<Object, Object> srvCache = srv.cache(CACHE_NAME);
+
+ IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer(CACHE_NAME);
+
+ for (int i = 0; i < 50; i++)
+ streamer.addData(i, i);
+
+ streamer.flush();
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() { // Result ignored; the assertEquals below is the actual check.
+ @Override public boolean apply() {
+ return srvCache.localSize() == 50;
+ }
+ }, 2000L);
+
+ assertEquals(50, srvCache.localSize());
+
+ reconnectClientNode(client, srv, new Runnable() { // NOTE(review): Runnable presumably runs while the client is disconnected - confirm.
+ @Override public void run() {
+ try {
+ client.dataStreamer(CACHE_NAME); // Creating a streamer at this point must throw.
+
+ fail();
+ }
+ catch (IgniteClientDisconnectedException e) {
+ assertNotNull(e.reconnectFuture());
+ }
+ }
+ });
+
+ checkStreamerClosed(streamer); // The streamer created before the reconnect must reject further use.
+
+ streamer = client.dataStreamer(CACHE_NAME); // A streamer created after the reconnect must work.
+
+ for (int i = 50; i < 100; i++)
+ streamer.addData(i, i);
+
+ streamer.flush();
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() { // Result ignored; the assertEquals below is the actual check.
+ @Override public boolean apply() {
+ return srvCache.localSize() == 100;
+ }
+ }, 2000L);
+
+ assertEquals(100, srvCache.localSize());
+
+ streamer.close();
+
+ streamer.future().get(2, TimeUnit.SECONDS);
+
+ srvCache.removeAll(); // Empty the cache at the end of the test.
+ }
+
+ /** Checks a streamer flush that is still pending when the client is reconnected, then streams with a new streamer.
+ * @throws Exception If failed.
+ */
+ public void testStreamerReconnectInProgress() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ final IgniteCache<Object, Object> srvCache = srv.cache(CACHE_NAME);
+
+ final IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer(CACHE_NAME);
+
+ BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+ commSpi.blockMessage(DataStreamerResponse.class); // The streaming below is then expected not to complete (asserted further down).
+
+ final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try {
+ for (int i = 0; i < 50; i++)
+ streamer.addData(i, i);
+
+ streamer.flush();
+ }
+ catch (CacheException e) {
+ checkAndWait(e); // NOTE(review): presumably verifies the cause and waits for the reconnect - confirm.
+
+ return true; // Expected path: streaming failed with a CacheException.
+ }
+ finally {
+ streamer.close();
+ }
+
+ return false; // Flush finished without an exception; the assertTrue on the future rejects this.
+ }
+ });
+
+ // Check that the client operation is still waiting: getting the future must time out.
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return fut.get(200);
+ }
+ }, IgniteFutureTimeoutCheckedException.class, null);
+
+ assertNotDone(fut);
+
+ commSpi.unblockMessage();
+
+ reconnectClientNode(client, srv, null);
+
+ assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+
+ checkStreamerClosed(streamer);
+
+ IgniteDataStreamer<Integer, Integer> streamer2 = client.dataStreamer(CACHE_NAME); // New streamer after the reconnect.
+
+ for (int i = 0; i < 50; i++)
+ streamer2.addData(i, i);
+
+ streamer2.close();
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() { // Result ignored; the assertEquals below is the actual check.
+ @Override public boolean apply() {
+ return srvCache.localSize() == 50;
+ }
+ }, 2000L);
+
+ assertEquals(50, srvCache.localSize());
+ }
+
+ /** Asserts that addData, flush and future().get() each throw CacheException, then calls tryFlush() and close().
+ * @param streamer Streamer.
+ */
+ private void checkStreamerClosed(IgniteDataStreamer<Integer, Integer> streamer) {
+ try {
+ streamer.addData(100, 100);
+
+ fail();
+ }
+ catch (CacheException e) {
+ checkAndWait(e); // NOTE(review): presumably verifies the exception's cause - confirm in the base class.
+ }
+
+ try {
+ streamer.flush();
+
+ fail();
+ }
+ catch (CacheException e) {
+ checkAndWait(e);
+ }
+
+ try {
+ streamer.future().get();
+
+ fail();
+ }
+ catch (CacheException e) {
+ checkAndWait(e);
+ }
+
+ streamer.tryFlush(); // Unlike the calls above, these two are not wrapped in try/fail, so they must not throw.
+
+ streamer.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
index 27c2a61..a392245 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
@@ -62,6 +62,7 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
IgniteConfiguration cfg = super.getConfiguration(gridName);
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setClientReconnectDisabled(true);
if (getTestGridName(nodeCount() - 1).equals(gridName) || getTestGridName(nodeCount() - 2).equals(gridName))
cfg.setClientMode(true);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
index 9780080..62f5d41 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.managers.deployment;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.resource.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.jdk.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.spi.deployment.*;
@@ -95,5 +96,11 @@ public class GridDeploymentManagerStopSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override public boolean unregister(String rsrcName) { return false; }
+
+ /** {@inheritDoc} */
+ @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) { /* No-op. */ }
+
+ /** {@inheritDoc} */
+ @Override public void onClientReconnected(boolean clusterRestarted) { /* No-op. */ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java
index 074f6ff..9c30f23 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java
@@ -263,7 +263,7 @@ public abstract class IgniteCacheAbstractStopBusySelfTest extends GridCommonAbst
e.printStackTrace(pw);
- assertTrue(sw.toString().contains("grid is stopping"));
+ assertTrue(sw.toString().contains("node is stopping"));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
index 071341e..8703d32 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -27,7 +26,7 @@ import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.testframework.*;
import org.apache.ignite.testframework.junits.common.*;
-import javax.cache.Cache;
+import javax.cache.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -89,7 +88,8 @@ public class IgniteCacheDynamicStopSelfTest extends GridCommonAbstractTest {
@Override public void apply(IgniteFuture<?> f) {
try {
f.get();
- } catch (IgniteException ignore) {
+ }
+ catch (CacheException ignore) {
// This may be debugged.
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
index af3ea9d..30bf5dc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.transactions.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.spi.indexing.*;
import org.apache.ignite.testframework.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
index d78add6..53404cc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
@@ -75,7 +75,7 @@ public class IgniteCacheSystemTransactionsSelfTest extends GridCacheAbstractSelf
jcache.get("1");
jcache.put("1", "11");
- GridCacheAdapter<Object, Object> utilityCache = ignite.context().cache().utilityCache();
+ IgniteInternalCache<Object, Object> utilityCache = ignite.context().cache().utilityCache();
utilityCache.getAndPutIfAbsent("2", "2");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java
index 19e40bf..7a2e8b3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java
@@ -220,7 +220,8 @@ public class GridCacheReplicatedInvalidateSelfTest extends GridCommonAbstractTes
Object msg0 = ((GridIoMessage)msg).message();
if (!(msg0 instanceof GridClockDeltaSnapshotMessage)) {
- info("Sending message [locNodeId=" + getLocalNodeId() + ", destNodeId= " + destNode.id()
+ info("Sending message [locNodeId=" + ignite.cluster().localNode().id() +
+ ", destNodeId= " + destNode.id()
+ ", msg=" + msg + ']');
synchronized (msgCntMap) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index e9d7a45..9a883b3 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -55,8 +55,8 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
new GridCacheDeploymentManager<K, V>(),
new GridCachePartitionExchangeManager<K, V>(),
new GridCacheIoManager(),
- null,
- new CacheNoopJtaManager()
+ new CacheNoopJtaManager(),
+ null
),
defaultCacheConfiguration(),
CacheType.USER,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index ec6a526..63db0c1 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -111,6 +111,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/** */
private IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite;
+ /** */
+ private boolean reconnectDisabled;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -159,6 +162,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
disco.setJoinTimeout(joinTimeout);
disco.setNetworkTimeout(netTimeout);
+ disco.setClientReconnectDisabled(reconnectDisabled);
+
disco.afterWrite(afterWrite);
cfg.setDiscoverySpi(disco);
@@ -524,7 +529,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
TestTcpDiscoverySpi spi = ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi());
- spi.pauseAll();
+ spi.pauseAll(false);
try {
spi.brakeConnection();
@@ -568,7 +573,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
TestTcpDiscoverySpi spi = ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi());
- spi.pauseAll();
+ spi.pauseAll(false);
try {
spi.brakeConnection();
@@ -606,7 +611,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
attachListeners(2, 2);
- ((TestTcpDiscoverySpi)G.ignite("client-1").configuration().getDiscoverySpi()).pauseAll();
+ ((TestTcpDiscoverySpi)G.ignite("client-1").configuration().getDiscoverySpi()).pauseAll(true);
stopGrid("server-2");
@@ -633,6 +638,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
public void testClientSegmentation() throws Exception {
clientsPerSrv = 1;
+ reconnectDisabled = true;
+
startServerNodes(3);
startClientNodes(3);
@@ -656,6 +663,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
final TcpDiscoverySpi disco = (TcpDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi();
try {
+ log.info("Fail server: " + 2);
+
failServer(2);
await(srvFailedLatch);
@@ -886,8 +895,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
try {
startClientNodes(1);
- assertEquals(G.ignite("server-0").cluster().localNode().id(), ((TcpDiscoveryNode)G.ignite("client-0")
- .cluster().localNode()).clientRouterNodeId());
+ assertEquals(G.ignite("server-0").cluster().localNode().id(),
+ ((TcpDiscoveryNode) G.ignite("client-0").cluster().localNode()).clientRouterNodeId());
checkNodes(2, 1);
@@ -1206,6 +1215,528 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
/**
+ * Runs the reconnect-after-fail scenario without changing topology after the client disconnects.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectAfterFail() throws Exception {
+ reconnectAfterFail(false);
+ }
+
+ /**
+ * Runs the reconnect-after-fail scenario with topology changed after the client disconnects.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectAfterFailTopologyChanged() throws Exception {
+ reconnectAfterFail(true);
+ }
+
+ /**
+ * Fails the client node through the server SPI and checks that the client receives disconnect and
+ * reconnect events, that the server sees the client fail and join again, and that the client ends up
+ * with the expected order and topology version.
+ *
+ * @param changeTop If {@code true} topology is changed after client disconnects.
+ * @throws Exception If failed.
+ */
+ private void reconnectAfterFail(final boolean changeTop) throws Exception {
+ startServerNodes(1);
+
+ startClientNodes(1);
+
+ Ignite srv = G.ignite("server-0");
+
+ TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi());
+
+ Ignite client = G.ignite("client-0");
+
+ final ClusterNode clientNode = client.cluster().localNode();
+
+ final UUID clientId = clientNode.id();
+
+ final TestTcpDiscoverySpi clientSpi = ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi());
+
+ assertEquals(2L, clientNode.order());
+
+ final CountDownLatch failLatch = new CountDownLatch(1);
+
+ final CountDownLatch joinLatch = new CountDownLatch(1);
+
+ srv.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Server event: " + evt);
+
+ DiscoveryEvent evt0 = (DiscoveryEvent)evt;
+
+ // Failure of the node that had the original client ID.
+ if (evt0.eventNode().id().equals(clientId) && evt.type() == EVT_NODE_FAILED)
+ failLatch.countDown();
+ else if (evt.type() == EVT_NODE_JOINED) {
+ TcpDiscoveryNode node = (TcpDiscoveryNode)evt0.eventNode();
+
+ // The rejoined client is recognized by its grid name attribute rather than by node ID.
+ if ("client-0".equals(node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME))) {
+ assertEquals(changeTop ? 5L : 4L, node.order());
+
+ joinLatch.countDown();
+ }
+ }
+
+ return true;
+ }
+ }, EVT_NODE_FAILED, EVT_NODE_JOINED);
+
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Client event: " + evt);
+
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ // Disconnect must be reported before reconnect.
+ assertEquals(1, reconnectLatch.getCount());
+
+ disconnectLatch.countDown();
+
+ // NOTE(review): presumably holds the client SPI until the extra server is started
+ // below and resumeAll() is called - confirm.
+ if (changeTop)
+ clientSpi.pauseAll(false);
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ assertEquals(0, disconnectLatch.getCount());
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ if (changeTop) {
+ Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
+
+ srvNodeIds.add(g.cluster().localNode().id());
+
+ clientSpi.resumeAll();
+ }
+
+ assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+ assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+ assertTrue(failLatch.await(5000, MILLISECONDS));
+ assertTrue(joinLatch.await(5000, MILLISECONDS));
+
+ // Expected order of the rejoined client, also used as the expected topology version.
+ long topVer = changeTop ? 5L : 4L;
+
+ assertEquals(topVer, client.cluster().localNode().order());
+
+ assertEquals(topVer, client.cluster().topologyVersion());
+
+ Collection<ClusterNode> clientTop = client.cluster().topology(topVer);
+
+ assertEquals(changeTop ? 3 : 2, clientTop.size());
+
+ // Track the client under its current local node ID instead of the one captured before the failure.
+ clientNodeIds.remove(clientId);
+
+ clientNodeIds.add(client.cluster().localNode().id());
+
+ checkNodes(changeTop ? 2 : 1, 1);
+
+ // One more server is started after reconnect and the node check is repeated.
+ Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
+
+ srvNodeIds.add(g.cluster().localNode().id());
+
+ checkNodes(changeTop ? 3 : 2, 1);
+ }
+
+ /**
+ * Fails the client node through the server SPI while 20 more clients are being started concurrently,
+ * then checks that the failed client gets disconnect and reconnect events and that all nodes are
+ * present in the end.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectAfterFailConcurrentJoin() throws Exception {
+ startServerNodes(1);
+
+ startClientNodes(1);
+
+ Ignite srv = G.ignite("server-0");
+
+ TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi());
+
+ Ignite client = G.ignite("client-0");
+
+ final ClusterNode clientNode = client.cluster().localNode();
+
+ assertEquals(2L, clientNode.order());
+
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Client event: " + evt);
+
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ // Disconnect must be reported before reconnect.
+ assertEquals(1, reconnectLatch.getCount());
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ assertEquals(0, disconnectLatch.getCount());
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ final int CLIENTS = 20;
+
+ // NOTE(review): presumably lets all clients (existing one plus CLIENTS new ones) use the single server - confirm.
+ clientsPerSrv = CLIENTS + 1;
+
+ // Start gate: client-starting threads block on it until the client node has been failed.
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ latch.await();
+
+ Ignite g = startGrid("client-" + clientIdx.getAndIncrement());
+
+ clientNodeIds.add(g.cluster().localNode().id());
+
+ return null;
+ }
+ }, CLIENTS, "start-client");
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ latch.countDown();
+
+ assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+ assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+
+ // NOTE(review): the client ID captured before the failure is not removed from clientNodeIds here,
+ // unlike in reconnectAfterFail - confirm that checkNodes tolerates a stale ID.
+ clientNodeIds.add(client.cluster().localNode().id());
+
+ fut.get();
+
+ checkNodes(1, CLIENTS + 1);
+ }
+
+ /**
+ * With client reconnect disabled, fails the client node through the server SPI and checks that the
+ * client receives a segmentation event and that only the server remains.
+ *
+ * @throws Exception If failed.
+ */
+ public void testClientFailReconnectDisabled() throws Exception {
+ // Applied to the discovery SPI via setClientReconnectDisabled() when node configuration is built.
+ reconnectDisabled = true;
+
+ startServerNodes(1);
+
+ startClientNodes(1);
+
+ Ignite srv = G.ignite("server-0");
+
+ TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi());
+
+ Ignite client = G.ignite("client-0");
+
+ final CountDownLatch segmentedLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_NODE_SEGMENTED)
+ segmentedLatch.countDown();
+
+ // NOTE(review): returning false presumably unregisters the listener after the first event - confirm.
+ return false;
+ }
+ }, EVT_NODE_SEGMENTED);
+
+ srvFailedLatch = new CountDownLatch(1);
+
+ // NOTE(review): presumably attaches listeners for 1 server and 0 clients that count down srvFailedLatch - confirm.
+ attachListeners(1, 0);
+
+ log.info("Fail client node.");
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(srvFailedLatch.await(5000, MILLISECONDS));
+ assertTrue(segmentedLatch.await(5000, MILLISECONDS));
+
+ checkNodes(1, 0);
+ }
+
+ /**
+ * Runs the segmented-after-join-timeout scenario in the mode where the server is failed.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectSegmentedAfterJoinTimeoutServerFailed() throws Exception {
+ reconnectSegmentedAfterJoinTimeout(true);
+ }
+
+ /**
+ * Runs the segmented-after-join-timeout scenario in the mode where the server stays alive but the
+ * client connection is broken.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectSegmentedAfterJoinTimeoutNetworkError() throws Exception {
+ reconnectSegmentedAfterJoinTimeout(false);
+ }
+
+ /**
+ * Checks that a client which cannot rejoin receives a disconnect event followed by a segmentation
+ * event and is stopped on segmentation. Any other event delivered to the client listener is
+ * recorded as an error.
+ *
+ * @param failSrv If {@code true} fails server, otherwise server does not send join message.
+ * @throws Exception If failed.
+ */
+ private void reconnectSegmentedAfterJoinTimeout(boolean failSrv) throws Exception {
+ netTimeout = 4000;
+ joinTimeout = 5000;
+
+ startServerNodes(1);
+
+ startClientNodes(1);
+
+ final Ignite srv = G.ignite("server-0");
+ Ignite client = G.ignite("client-0");
+
+ TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi());
+ TestTcpDiscoverySpi clientSpi = ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi());
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch segmentedLatch = new CountDownLatch(1);
+ final AtomicBoolean err = new AtomicBoolean(false);
+
+ // Server-side failure tracking is only set up when the server stays alive.
+ if (!failSrv) {
+ srvFailedLatch = new CountDownLatch(1);
+
+ attachListeners(1, 0);
+ }
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ log.info("Disconnected event.");
+
+ // Disconnect must come first and only once.
+ assertEquals(1, segmentedLatch.getCount());
+ assertEquals(1, disconnectLatch.getCount());
+ assertFalse(err.get());
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_NODE_SEGMENTED) {
+ log.info("Segmented event.");
+
+ // Segmentation must follow the disconnect and happen only once.
+ assertEquals(1, segmentedLatch.getCount());
+ assertEquals(0, disconnectLatch.getCount());
+ assertFalse(err.get());
+
+ segmentedLatch.countDown();
+ }
+ else {
+ // The only other subscribed type is EVT_CLIENT_NODE_RECONNECTED, which is not expected here.
+ log.error("Unexpected event: " + evt);
+
+ err.set(true);
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED, EVT_NODE_SEGMENTED);
+
+ if (failSrv) {
+ log.info("Fail server.");
+
+ failServer(0);
+ }
+ else {
+ log.info("Fail client connection.");
+
+ // NOTE(review): presumably makes the server reject client reconnect attempts - confirm.
+ srvSpi.failClientReconnect.set(1_000_000);
+ // Makes the server SPI skip node added and node add finished messages.
+ srvSpi.skipNodeAdded = true;
+
+ clientSpi.brakeConnection();
+ }
+
+ assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+
+ assertTrue(segmentedLatch.await(10_000, MILLISECONDS));
+
+ waitSegmented(client);
+
+ assertFalse(err.get());
+
+ if (!failSrv) {
+ await(srvFailedLatch);
+
+ // Result of the wait is not checked here; checkNodes() below performs the verification.
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return srv.cluster().nodes().size() == 1;
+ }
+ }, 10_000);
+
+ checkNodes(1, 0);
+ }
+ }
+
+ /**
+ * Stops the only server, waits for the client to be disconnected, starts a server again and checks
+ * that the client reconnects. Any event other than disconnect or reconnect delivered to the client
+ * listener is recorded as an error.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectClusterRestart() throws Exception {
+ netTimeout = 3000;
+ joinTimeout = 60_000;
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+ final AtomicBoolean err = new AtomicBoolean(false);
+
+ startServerNodes(1);
+
+ startClientNodes(1);
+
+ Ignite srv = G.ignite("server-0");
+ Ignite client = G.ignite("client-0");
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ log.info("Disconnected event.");
+
+ // Disconnect must come first and only once.
+ assertEquals(1, reconnectLatch.getCount());
+ assertEquals(1, disconnectLatch.getCount());
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ log.info("Reconnected event.");
+
+ // Reconnect must follow the disconnect and happen only once.
+ assertEquals(1, reconnectLatch.getCount());
+ assertEquals(0, disconnectLatch.getCount());
+ assertFalse(err.get());
+
+ reconnectLatch.countDown();
+ }
+ else {
+ // The only other subscribed type is EVT_NODE_SEGMENTED, which is not expected here.
+ log.error("Unexpected event: " + evt);
+
+ err.set(true);
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED, EVT_NODE_SEGMENTED);
+
+ log.info("Stop server.");
+
+ srv.close();
+
+ assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+
+ // Reset server bookkeeping so that the restarted server is tracked from index 0 again.
+ srvNodeIds.clear();
+ srvIdx.set(0);
+
+ // Keep the cluster down for a while before the restart (same value as netTimeout above).
+ Thread.sleep(3000);
+
+ log.info("Restart server.");
+
+ startServerNodes(1);
+
+ assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+
+ // Track the client under its current local node ID only.
+ clientNodeIds.clear();
+ clientNodeIds.add(client.cluster().localNode().id());
+
+ checkNodes(1, 1);
+
+ assertFalse(err.get());
+ }
+
+ /**
+ * Breaks the client connection while the server is set up to skip node added messages, waits for
+ * the client disconnect event, then restores the server behavior, breaks the connection again and
+ * checks that the client reconnects. Any event other than disconnect or reconnect delivered to the
+ * client listener is recorded as an error.
+ *
+ * @throws Exception If failed.
+ */
+ public void testDisconnectAfterNetworkTimeout() throws Exception {
+ netTimeout = 5000;
+ joinTimeout = 60_000;
+ maxMissedClientHbs = 2;
+
+ startServerNodes(1);
+
+ startClientNodes(1);
+
+ final Ignite srv = G.ignite("server-0");
+ Ignite client = G.ignite("client-0");
+
+ TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi());
+ TestTcpDiscoverySpi clientSpi = ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi());
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+ final AtomicBoolean err = new AtomicBoolean(false);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ log.info("Disconnected event.");
+
+ // Disconnect must come first and only once.
+ assertEquals(1, reconnectLatch.getCount());
+ assertEquals(1, disconnectLatch.getCount());
+ assertFalse(err.get());
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ log.info("Reconnected event.");
+
+ // Reconnect must follow the disconnect and happen only once.
+ assertEquals(1, reconnectLatch.getCount());
+ assertEquals(0, disconnectLatch.getCount());
+ assertFalse(err.get());
+
+ reconnectLatch.countDown();
+ }
+ else {
+ // The only other subscribed type is EVT_NODE_SEGMENTED, which is not expected here.
+ log.error("Unexpected event: " + evt);
+
+ err.set(true);
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED, EVT_NODE_SEGMENTED);
+
+ log.info("Fail client connection1.");
+
+ // NOTE(review): presumably makes the server reject client reconnect attempts - confirm.
+ srvSpi.failClientReconnect.set(1_000_000);
+ // Makes the server SPI skip node added and node add finished messages.
+ srvSpi.skipNodeAdded = true;
+
+ clientSpi.brakeConnection();
+
+ assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+
+ log.info("Fail client connection2.");
+
+ // Restore normal server behavior before breaking the connection again.
+ srvSpi.failClientReconnect.set(0);
+ srvSpi.skipNodeAdded = false;
+
+ clientSpi.brakeConnection();
+
+ assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+
+ // Track the client under its current local node ID only.
+ clientNodeIds.clear();
+
+ clientNodeIds.add(client.cluster().localNode().id());
+
+ // Result of the wait is not checked here; checkNodes() below performs the verification.
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return srv.cluster().nodes().size() == 2;
+ }
+ }, 10_000);
+
+ checkNodes(1, 1);
+
+ assertFalse(err.get());
+ }
+
+ /**
+ * Waits up to 5 seconds for the given node to reach the {@code STOPPED_ON_SEGMENTATION} state and
+ * then asserts that the state was reached.
+ *
+ * @param ignite Ignite.
+ * @throws Exception If failed.
+ */
+ private void waitSegmented(final Ignite ignite) throws Exception {
+ GridAbsPredicate stoppedOnSegmentation = new GridAbsPredicate() {
+ @Override public boolean apply() {
+ IgniteState state = Ignition.state(ignite.name());
+
+ return state == IgniteState.STOPPED_ON_SEGMENTATION;
+ }
+ };
+
+ GridTestUtils.waitForCondition(stoppedOnSegmentation, 5000);
+
+ IgniteState finalState = Ignition.state(ignite.name());
+
+ assertEquals(IgniteState.STOPPED_ON_SEGMENTATION, finalState);
+ }
+
+ /**
* @param clientIdx Client index.
* @param srvIdx Server index.
* @throws Exception In case of error.
@@ -1401,7 +1932,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
private void checkRemoteNodes(Ignite ignite, int expCnt) {
Collection<ClusterNode> nodes = ignite.cluster().forRemotes().nodes();
- assertEquals(expCnt, nodes.size());
+ assertEquals("Unexpected state for node: " + ignite.name(), expCnt, nodes.size());
for (ClusterNode node : nodes) {
UUID id = node.id();
@@ -1420,7 +1951,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
* @throws InterruptedException If interrupted.
*/
private void await(CountDownLatch latch) throws InterruptedException {
- assertTrue("Latch count: " + latch.getCount(), latch.await(10000, MILLISECONDS));
+ assertTrue("Latch count: " + latch.getCount(), latch.await(10_000, MILLISECONDS));
}
/**
@@ -1471,6 +2002,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/** */
private volatile String delayJoinAckFor;
+ /** */
+ private volatile boolean skipNodeAdded;
+
/**
* @param lock Lock.
*/
@@ -1543,6 +2077,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
boolean fail = false;
+ if (skipNodeAdded &&
+ (msg instanceof TcpDiscoveryNodeAddedMessage || msg instanceof TcpDiscoveryNodeAddFinishedMessage)) {
+ log.info("Skip message: " + msg);
+
+ return;
+ }
+
if (msg instanceof TcpDiscoveryNodeAddedMessage)
fail = failNodeAdded.getAndDecrement() > 0;
else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
@@ -1577,12 +2118,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
/**
- *
+ * @param suspend If {@code true} suspends worker threads.
*/
- public void pauseAll() {
+ public void pauseAll(boolean suspend) {
pauseResumeOperation(true, openSockLock, writeLock);
- impl.workerThread().suspend();
+ if (suspend)
+ impl.workerThread().suspend();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
index 159c451..dacbf55 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
@@ -317,4 +317,9 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx {
@Override public ClusterMetrics metrics() throws IgniteException {
throw new UnsupportedOperationException("Operation is not supported yet.");
}
+
+ /** {@inheritDoc} */
+ @Nullable @Override public IgniteFuture<?> clientReconnectFuture() {
+ // Always throws: this proxy does not provide a reconnect future.
+ throw new UnsupportedOperationException("Operation is not supported yet.");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
new file mode 100644
index 0000000..66c9835
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.*;
+import org.apache.ignite.internal.*;
+
+/**
+ * Test suite that groups the client reconnect tests.
+ */
+public class IgniteClientReconnectTestSuite extends TestSuite {
+ /**
+ * Builds the suite by registering every client reconnect test class.
+ *
+ * @return Test suite.
+ * @throws Exception In case of error.
+ */
+ public static TestSuite suite() throws Exception {
+ TestSuite suite = new TestSuite("Ignite Client Reconnect Test Suite");
+
+ suite.addTestSuite(IgniteClientReconnectStopTest.class);
+ suite.addTestSuite(IgniteClientReconnectApiExceptionTest.class);
+ suite.addTestSuite(IgniteClientReconnectDiscoveryStateTest.class);
+ suite.addTestSuite(IgniteClientReconnectCacheTest.class);
+ suite.addTestSuite(IgniteClientReconnectContinuousProcessorTest.class);
+ suite.addTestSuite(IgniteClientReconnectComputeTest.class);
+ suite.addTestSuite(IgniteClientReconnectAtomicsTest.class);
+ suite.addTestSuite(IgniteClientReconnectCollectionsTest.class);
+ suite.addTestSuite(IgniteClientReconnectServicesTest.class);
+ suite.addTestSuite(IgniteClientReconnectStreamerTest.class);
+ suite.addTestSuite(IgniteClientReconnectFailoverTest.class);
+
+ return suite;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 06c0961..c76dbe7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1439,6 +1439,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
fut.get();
}
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
+ // Pass the notification on to rdcQryExec (presumably the reduce query executor - confirm).
+ rdcQryExec.onDisconnected(reconnectFut);
+ }
+
/**
* Wrapper to store connection and flag is schema set or not.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index af29647..2b2996d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -26,6 +26,7 @@ import org.h2.table.*;
import org.jetbrains.annotations.*;
import org.jsr166.*;
+import javax.cache.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -40,7 +41,7 @@ public abstract class GridMergeIndex extends BaseIndex {
private static final int MAX_FETCH_SIZE = getInteger(IGNITE_SQL_MERGE_TABLE_MAX_SIZE, 10_000);
/** All rows number. */
- private final AtomicInteger expectedRowsCnt = new AtomicInteger(0);
+ private final AtomicInteger expRowsCnt = new AtomicInteger(0);
/** Remaining rows per source node ID. */
private final ConcurrentMap<UUID, Counter> remainingRows = new ConcurrentHashMap8<>();
@@ -75,8 +76,8 @@ public abstract class GridMergeIndex extends BaseIndex {
}
/** {@inheritDoc} */
- @Override public long getRowCount(Session session) {
- return expectedRowsCnt.get();
+ @Override public long getRowCount(Session ses) {
+ return expRowsCnt.get();
}
/** {@inheritDoc} */
@@ -93,6 +94,23 @@ public abstract class GridMergeIndex extends BaseIndex {
}
/**
+ * Adds a fail page for every node ID present in {@code remainingRows}. The page reports itself as
+ * failed and rethrows the given error when the next page is requested.
+ *
+ * @param e Error.
+ */
+ public void fail(final CacheException e) {
+ for (UUID srcNodeId : remainingRows.keySet()) {
+ GridResultPage failPage = new GridResultPage(null, srcNodeId, null) {
+ @Override public boolean isFail() {
+ return true;
+ }
+
+ @Override public void fetchNextPage() {
+ throw e;
+ }
+ };
+
+ addPage0(failPage);
+ }
+ }
+
+ /**
* @param nodeId Node ID.
*/
public void fail(UUID nodeId) {
@@ -120,7 +138,7 @@ public abstract class GridMergeIndex extends BaseIndex {
assert !cnt.initialized : "Counter is already initialized.";
cnt.addAndGet(allRows);
- expectedRowsCnt.addAndGet(allRows);
+ expRowsCnt.addAndGet(allRows);
// We need this separate flag to handle case when the first source contains only one page
// and it will signal that all remaining counters are zero and fetch is finished.
@@ -162,7 +180,7 @@ public abstract class GridMergeIndex extends BaseIndex {
}
/** {@inheritDoc} */
- @Override public Cursor find(Session session, SearchRow first, SearchRow last) {
+ @Override public Cursor find(Session ses, SearchRow first, SearchRow last) {
if (fetched == null)
throw new IgniteException("Fetched result set was too large.");
@@ -176,7 +194,7 @@ public abstract class GridMergeIndex extends BaseIndex {
* @return {@code true} If we have fetched all the remote rows.
*/
public boolean fetchedAll() {
- return fetchedCnt == expectedRowsCnt.get();
+ return fetchedCnt == expRowsCnt.get();
}
/**
@@ -200,32 +218,32 @@ public abstract class GridMergeIndex extends BaseIndex {
}
/** {@inheritDoc} */
- @Override public void close(Session session) {
+ @Override public void close(Session ses) {
// No-op.
}
/** {@inheritDoc} */
- @Override public void add(Session session, Row row) {
+ @Override public void add(Session ses, Row row) {
throw DbException.getUnsupportedException("add");
}
/** {@inheritDoc} */
- @Override public void remove(Session session, Row row) {
+ @Override public void remove(Session ses, Row row) {
throw DbException.getUnsupportedException("remove row");
}
/** {@inheritDoc} */
- @Override public double getCost(Session session, int[] masks, TableFilter filter, SortOrder sortOrder) {
+ @Override public double getCost(Session ses, int[] masks, TableFilter filter, SortOrder sortOrder) {
return getRowCountApproximation() + Constants.COST_ROW_OFFSET;
}
/** {@inheritDoc} */
- @Override public void remove(Session session) {
+ @Override public void remove(Session ses) {
throw DbException.getUnsupportedException("remove index");
}
/** {@inheritDoc} */
- @Override public void truncate(Session session) {
+ @Override public void truncate(Session ses) {
throw DbException.getUnsupportedException("truncate");
}
@@ -235,7 +253,7 @@ public abstract class GridMergeIndex extends BaseIndex {
}
/** {@inheritDoc} */
- @Override public Cursor findFirstOrLast(Session session, boolean first) {
+ @Override public Cursor findFirstOrLast(Session ses, boolean first) {
throw DbException.getUnsupportedException("findFirstOrLast");
}
@@ -299,6 +317,7 @@ public abstract class GridMergeIndex extends BaseIndex {
private Iterator<Row> stream;
/**
+ * @param stream Iterator.
*/
public FetchingCursor(Iterator<Row> stream) {
super(new FetchedIterator());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 32d1c95..cde3288 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.h2.command.*;
@@ -47,6 +48,7 @@ import org.h2.table.*;
import org.h2.tools.*;
import org.h2.util.*;
import org.h2.value.*;
+import org.jetbrains.annotations.*;
import org.jsr166.*;
import javax.cache.*;
@@ -234,10 +236,15 @@ public class GridReduceQueryExecutor {
Object errState = r.state.get();
if (errState != null) {
+ CacheException err0 = errState instanceof CacheException ? (CacheException)errState : null;
+
+ if (err0 != null && err0.getCause() instanceof IgniteClientDisconnectedException)
+ throw err0;
+
CacheException e = new CacheException("Failed to fetch data from node: " + node.id());
- if (errState instanceof CacheException)
- e.addSuppressed((Throwable)errState);
+ if (err0 != null)
+ e.addSuppressed(err0);
throw e;
}
@@ -301,6 +308,7 @@ public class GridReduceQueryExecutor {
}
/**
+ * @param cctx Cache context.
* @return {@code true} If cache context
*/
private boolean hasMovingPartitions(GridCacheContext<?,?> cctx) {
@@ -481,6 +489,12 @@ public class GridReduceQueryExecutor {
runs.put(qryReqId, r);
try {
+ if (ctx.clientDisconnected()) {
+ throw new CacheException("Query was cancelled, client node disconnected.",
+ new IgniteClientDisconnectedException(ctx.cluster().clientReconnectFuture(),
+ "Client node disconnected."));
+ }
+
Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries();
if (qry.explain()) {
@@ -506,8 +520,14 @@ public class GridReduceQueryExecutor {
Object state = r.state.get();
if (state != null) {
- if (state instanceof CacheException)
- throw new CacheException("Failed to run map query remotely.", (CacheException)state);
+ if (state instanceof CacheException) {
+ CacheException err = (CacheException)state;
+
+ if (err.getCause() instanceof IgniteClientDisconnectedException)
+ throw err;
+
+ throw new CacheException("Failed to run map query remotely.", err);
+ }
if (state instanceof AffinityTopologyVersion) {
retry = true;
@@ -550,7 +570,20 @@ public class GridReduceQueryExecutor {
catch (IgniteCheckedException | RuntimeException e) {
U.closeQuiet(r.conn);
- throw new CacheException("Failed to run reduce query locally.", e);
+ if (e instanceof CacheException)
+ throw (CacheException)e;
+
+ Throwable cause = e;
+
+ if (e instanceof IgniteCheckedException) {
+ Throwable disconnectedErr =
+ ((IgniteCheckedException)e).getCause(IgniteClientDisconnectedException.class);
+
+ if (disconnectedErr != null)
+ cause = disconnectedErr;
+ }
+
+ throw new CacheException("Failed to run reduce query locally.", cause);
}
finally {
if (!runs.remove(qryReqId, r))
@@ -1082,6 +1115,17 @@ public class GridReduceQueryExecutor {
}
/**
+ * @param reconnectFut Reconnect future.
+ */
+ public void onDisconnected(IgniteFuture<?> reconnectFut) {
+ IgniteClientDisconnectedException cause = new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.");
+ CacheException failure = new CacheException("Query was cancelled, client node disconnected.", cause);
+
+ for (QueryRun run : runs.values()) // The same error instance is handed to every registered run.
+ run.disconnected(failure);
+ }
+
+ /**
*
*/
private static class QueryRun {
@@ -1104,7 +1148,7 @@ public class GridReduceQueryExecutor {
* @param o Fail state object.
* @param nodeId Node ID.
*/
- void state(Object o, UUID nodeId) {
+ void state(Object o, @Nullable UUID nodeId) {
assert o != null;
assert o instanceof CacheException || o instanceof AffinityTopologyVersion : o.getClass();
@@ -1117,6 +1161,20 @@ public class GridReduceQueryExecutor {
for (GridMergeTable tbl : tbls) // Fail all merge indexes.
tbl.getScanIndex(null).fail(nodeId);
}
+
+ /**
+ * @param e Error.
+ */
+ void disconnected(CacheException e) {
+ boolean stateWasEmpty = state.compareAndSet(null, e); // Succeeds only if no state was recorded before.
+ if (stateWasEmpty) {
+ while (latch.getCount() > 0) // Drain the latch: there is no need to wait for all nodes to reply.
+ latch.countDown();
+
+ for (GridMergeTable mergeTbl : tbls) // Fail every merge index with the disconnect error.
+ mergeTbl.getScanIndex(null).fail(e);
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java
new file mode 100644
index 0000000..23320ae
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cache.query.annotations.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+
+import javax.cache.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Runs SQL, SQL fields and scan queries through the inherited {@code reconnectFailover} helper.
+ */
+public class IgniteClientReconnectCacheQueriesFailoverTest extends IgniteClientReconnectFailoverAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setBackups(1);
+ ccfg.setIndexedTypes(Integer.class, Person.class); // Key/value pair queried by the SQL tests below.
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ final IgniteCache<Integer, Person> cache = grid(serverCount()).cache(null);
+
+ assertNotNull(cache);
+
+ for (int i = 0; i <= 10_000; i++) // Inclusive bound: keys 0..10_000, i.e. 10_001 entries.
+ cache.put(i, new Person(i, "name-" + i));
+ }
+
+ /** Runs an SQL query and an SQL fields aggregate inside the failover callable and checks the results.
+ * @throws Exception If failed.
+ */
+ public void testReconnectCacheQueries() throws Exception {
+ final Ignite client = grid(serverCount());
+
+ final IgniteCache<Integer, Person> cache = client.cache(null);
+
+ assertNotNull(cache);
+
+ reconnectFailover(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ SqlQuery<Integer, Person> sqlQry = new SqlQuery<>(Person.class, "where id > 1");
+
+ try {
+ assertEquals(9999, cache.query(sqlQry).getAll().size()); // Ids 2..10_000 of the keys loaded in beforeTestsStarted.
+ }
+ catch (CacheException e) {
+ if (e.getCause() instanceof IgniteClientDisconnectedException)
+ throw e; // Disconnect errors propagate out of the callable; any other CacheException is only logged.
+ else
+ log.info("Ignore error: " + e);
+ }
+
+ try {
+ SqlFieldsQuery fieldsQry = new SqlFieldsQuery("select avg(p.id) from Person p");
+
+ List<List<?>> res = cache.query(fieldsQry).getAll();
+
+ assertEquals(1, res.size()); // A single aggregate row is expected.
+
+ Double avg = (Double)res.get(0).get(0);
+
+ assertEquals(5_000, avg.intValue()); // Mean of ids 0..10_000.
+ }
+ catch (CacheException e) {
+ if (e.getCause() instanceof IgniteClientDisconnectedException)
+ throw e; // Same policy as for the first query: only disconnect errors are rethrown.
+ else
+ log.info("Ignore error: " + e);
+ }
+
+ return null;
+ }
+ });
+ }
+
+ /** Runs a predicate scan query and a single-partition scan query inside the failover callable.
+ * @throws Exception If failed.
+ */
+ public void testReconnectScanQuery() throws Exception {
+ final Ignite client = grid(serverCount());
+
+ final IgniteCache<Integer, Person> cache = client.cache(null);
+
+ assertNotNull(cache);
+
+ final Affinity<Integer> aff = client.affinity(null);
+
+ final Map<Integer, Integer> partMap = new HashMap<>(); // Partition -> expected number of keys.
+
+ for (int i = 0; i < aff.partitions(); i++)
+ partMap.put(i, 0);
+
+ for (int i = 0; i <= 10_000; i++) { // Same key range as loaded in beforeTestsStarted.
+ Integer part = aff.partition(i);
+
+ Integer size = partMap.get(part);
+
+ partMap.put(part, size + 1);
+ }
+
+ reconnectFailover(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ ScanQuery<Integer, Person> qry = new ScanQuery<>(new IgniteBiPredicate<Integer, Person>() {
+ @Override public boolean apply(Integer key, Person val) {
+ return val.getId() % 2 == 1; // Odd ids only.
+ }
+ });
+
+ assertEquals(5000, cache.query(qry).getAll().size()); // Odd ids within 0..10_000.
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ Integer part = rnd.nextInt(0, aff.partitions());
+
+ qry = new ScanQuery<>(part);
+
+ assertEquals((int)partMap.get(part), cache.query(qry).getAll().size()); // Must equal the count precomputed for this partition.
+
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Cache value type; its {@code id} and {@code name} fields are annotated as SQL query fields.
+ */
+ public static class Person {
+ /** Identifier; the only field used by {@code equals} and {@code hashCode}. */
+ @QuerySqlField
+ public int id;
+
+ /** Name. */
+ @QuerySqlField
+ public String name;
+
+ /**
+ * @param id Id.
+ * @param name Name.
+ */
+ public Person(int id, String name) {
+ this.id = id;
+ this.name = name;
+ }
+
+ /**
+ * @return Id.
+ */
+ public int getId() {
+ return id;
+ }
+
+ /**
+ * @param id Set id.
+ */
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ /**
+ * @return Name.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * @param name Name.
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ return this == o || !(o == null || getClass() != o.getClass()) && id == ((Person)o).id; // Same instance, or same class and same id.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(Person.class, this);
+ }
+ }
+}