You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/07 16:47:39 UTC
[2/3] incubator-ignite git commit: # ignite-901
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
new file mode 100644
index 0000000..dae1182
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
@@ -0,0 +1,845 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.testframework.*;
+
+import javax.cache.*;
+import javax.cache.processor.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.EventType.*;
+
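+/**
+ * Checks that cache, data structure and other Ignite API calls issued while a client node is disconnected
+ * throw an exception, and that the same calls succeed when repeated after the client has reconnected.
+ */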
+public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnectAbstractTest {
+ /**
+ * {@inheritDoc}
+ * <p>
+ * On top of the parent configuration, registers one cache configuration created with the
+ * no-argument constructor (no name is set on it).
+ */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ final IgniteConfiguration nodeCfg = super.getConfiguration(gridName);
+
+ final CacheConfiguration dfltCacheCfg = new CacheConfiguration();
+
+ nodeCfg.setCacheConfiguration(dfltCacheCfg);
+
+ return nodeCfg;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return Always {@code 1}.
+ */
+ @Override protected int serverCount() {
+ return 1;
+ }
+
+ /**
+ * Runs the cache, data structure and Ignite API disconnect scenarios one after another.
+ * <p>
+ * Each scenario ends by stopping all grids (see {@code doTestIgniteOperationOnDisconnect}), so
+ * {@code beforeTestsStarted()} is invoked again before the second and third scenario
+ * (presumably to bring the server node back up; confirm against the parent class).
+ *
+ * @throws Exception If failed.
+ */
+ public void testErrorOnDisconnect() throws Exception {
+ // Check cache operations.
+ cacheOperationsTest();
+
+ // Check data structure operations.
+ beforeTestsStarted();
+ dataStructureOperationsTest();
+
+ // Check ignite operations.
+ beforeTestsStarted();
+ igniteOperationsTest();
+ }
+
+ /**
+ * Checks that obtaining an atomic long, a set and a queue while the client is disconnected throws
+ * {@link IgniteClientDisconnectedException}, and that the same calls return usable data structures
+ * when repeated afterwards.
+ * <p>
+ * Every operation is a pair: a {@link Callable} that performs the failing call followed by the retry,
+ * and a closure that validates the object returned by the retry. The pairs are executed by
+ * {@code doTestIgniteOperationOnDisconnect}.
+ * <p>
+ * The node with index {@code serverCount()} is started while {@code clientMode} is {@code true};
+ * the flag is reset to {@code false} when the scenario completes normally.
+ *
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ public void dataStructureOperationsTest() throws Exception {
+ clientMode = true;
+
+ final Ignite client = startGrid(serverCount());
+
+ doTestIgniteOperationOnDisconnect(client, Arrays.asList(
+ // Check atomic long.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ // First attempt: must be rejected with a disconnect exception.
+ try {
+ client.atomicLong("testAtomic", 41, true);
+ }
+ catch (IgniteClientDisconnectedException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ // Second attempt: its result is handed to the validation closure below.
+ return client.atomicLong("testAtomic", 41, true);
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertNotNull(o);
+
+ IgniteAtomicLong atomicLong = (IgniteAtomicLong)o;
+
+ // Requested initial value is 41, so a single increment must yield 42.
+ assertEquals(42, atomicLong.incrementAndGet());
+
+ return true;
+ }
+ }
+ ),
+ // Check set.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ client.set("testSet", new CollectionConfiguration());
+ }
+ catch (IgniteClientDisconnectedException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ return client.set("testSet", new CollectionConfiguration());
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertNotNull(o);
+
+ IgniteSet set = (IgniteSet)o;
+
+ String val = "testVal";
+
+ set.add(val);
+
+ assertEquals(1, set.size());
+ assertTrue(set.contains(val));
+
+ return true;
+ }
+ }
+ ),
+ // Check ignite queue.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ client.queue("TestQueue", 10, new CollectionConfiguration());
+ }
+ catch (IgniteClientDisconnectedException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ return client.queue("TestQueue", 10, new CollectionConfiguration());
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertNotNull(o);
+
+ IgniteQueue queue = (IgniteQueue)o;
+
+ String val = "Test";
+
+ queue.add(val);
+
+ // The element just added must be the one polled back.
+ assertEquals(val, queue.poll());
+
+ return true;
+ }
+ }
+ )
+ ));
+
+ clientMode = false;
+ }
+
+ /**
+ * Checks cache-related API calls made while the client is disconnected.
+ * <p>
+ * Operations on an already obtained cache proxy ({@code getAndPut}, {@code put}, {@code get},
+ * {@code invoke}, asynchronous {@code put}) are expected to throw {@link CacheException}; calls on the
+ * {@link Ignite} instance ({@code transactions()}, {@code cache(null)}, {@code dataStreamer(null)},
+ * {@code createCache(...)}) are expected to throw {@link IgniteClientDisconnectedException}.
+ * In both cases the call is repeated after {@code checkAndWait(e)} and its result is validated by
+ * the paired closure.
+ * <p>
+ * The node with index {@code serverCount()} is started while {@code clientMode} is {@code true};
+ * the flag is reset to {@code false} when the scenario completes normally.
+ *
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ public void cacheOperationsTest() throws Exception {
+ clientMode = true;
+
+ final Ignite client = startGrid(serverCount());
+
+ final IgniteCache<Object, Object> defaultCache = client.cache(null);
+
+ assertNotNull(defaultCache);
+
+ doTestIgniteOperationOnDisconnect(client, Arrays.asList(
+ // Check put and get operation.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ // First attempt: must be rejected while disconnected.
+ try {
+ defaultCache.getAndPut(9999, 9999);
+ }
+ catch (CacheException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ // Second attempt: its result is handed to the validation closure below.
+ return defaultCache.getAndPut(9999, 9999);
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ // No previous value is expected for the key, i.e. the failed attempt must not have stored anything.
+ assertNull(o);
+
+ assertEquals(9999, defaultCache.get(9999));
+
+ return true;
+ }
+ }
+ ),
+ // Check put operation.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ defaultCache.put(10000, 10000);
+ }
+ catch (CacheException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ defaultCache.put(10000, 10000);
+
+ // put() returns nothing, so a marker value is passed to the validation closure.
+ return true;
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertTrue((Boolean)o);
+
+ assertEquals(10000, defaultCache.get(10000));
+
+ return true;
+ }
+ }
+ ),
+ // Check get operation.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ defaultCache.get(10001);
+ }
+ catch (CacheException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ return defaultCache.get(10001);
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ // Key 10001 is never written by this test.
+ assertNull(o);
+
+ return true;
+ }
+ }
+ ),
+ // Check invoke operation.
+ // NOTE(review): the entry processor asserts that key 10000 exists, but that key is written by the
+ // "put operation" callable above, and all callables are started asynchronously by
+ // doTestIgniteOperationOnDisconnect. The ordering between the two retries looks unguarded - confirm.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ defaultCache.invoke(10000, new CacheEntryProcessor<Object, Object, Object>() {
+ @Override public Object process(MutableEntry<Object, Object> entry,
+ Object... arguments) throws EntryProcessorException {
+ assertTrue(entry.exists());
+
+ return (int)entry.getValue() * 2;
+ }
+ });
+ }
+ catch (CacheException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ return defaultCache.invoke(10000, new CacheEntryProcessor<Object, Object, Object>() {
+ @Override public Object process(MutableEntry<Object, Object> entry,
+ Object... arguments) throws EntryProcessorException {
+ assertTrue(entry.exists());
+
+ return (int)entry.getValue() * 2;
+ }
+ });
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertNotNull(o);
+
+ // The processor returns the stored value (10000) doubled.
+ assertEquals(20000, (int)o);
+
+ return true;
+ }
+ }
+ ),
+ // Check put async operation.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ IgniteCache<Object, Object> async = defaultCache.withAsync();
+
+ boolean failed = false;
+
+ try {
+ async.put(10002, 10002);
+
+ async.future().get();
+ }
+ catch (CacheException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ async.put(10002, 10002);
+
+ return async.future().get();
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ // The future of the asynchronous put is expected to complete with null.
+ assertNull(o);
+
+ assertEquals(10002, defaultCache.get(10002));
+
+ return true;
+ }
+ }
+ ),
+ // Check transaction.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ client.transactions();
+ }
+ catch (IgniteClientDisconnectedException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ return client.transactions();
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ IgniteTransactions txs = (IgniteTransactions)o;
+
+ assertNotNull(txs);
+
+ return true;
+ }
+ }
+ ),
+ // Check get cache.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ client.cache(null);
+ }
+ catch (IgniteClientDisconnectedException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ return client.cache(null);
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ IgniteCache<Object, Object> cache0 = (IgniteCache<Object, Object>)o;
+
+ assertNotNull(cache0);
+
+ cache0.put(1, 1);
+
+ assertEquals(1, cache0.get(1));
+
+ return true;
+ }
+ }
+ ),
+ // Check streamer.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ client.dataStreamer(null);
+ }
+ catch (IgniteClientDisconnectedException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ return client.dataStreamer(null);
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ IgniteDataStreamer<Object, Object> streamer = (IgniteDataStreamer<Object, Object>)o;
+
+ streamer.addData(2, 2);
+
+ // The value is read back from the cache only after the streamer has been closed.
+ streamer.close();
+
+ assertEquals(2, client.cache(null).get(2));
+
+ return true;
+ }
+ }
+ ),
+ // Check create cache.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ client.createCache("test_cache");
+ }
+ catch (IgniteClientDisconnectedException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ return client.createCache("test_cache");
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ IgniteCache<Object, Object> cache = (IgniteCache<Object, Object>)o;
+
+ assertNotNull(cache);
+
+ cache.put(1, 1);
+
+ assertEquals(1, cache.get(1));
+
+ return true;
+ }
+ }
+ )
+
+ ));
+
+ clientMode = false;
+ }
+
+ /**
+ * Checks general {@link Ignite} API calls made while the client is disconnected: {@code compute()},
+ * {@code cluster().pingNode(...)}, {@code events().remoteListen(...)},
+ * {@code message().remoteListen(...)} and {@code executorService().submit(...)}.
+ * <p>
+ * Each call is expected to throw {@link IgniteClientDisconnectedException} on the first attempt; it is
+ * repeated after {@code checkAndWait(e)} and the result of the retry is validated by the paired closure.
+ * <p>
+ * The node with index {@code serverCount()} is started while {@code clientMode} is {@code true};
+ * the flag is reset to {@code false} when the scenario completes normally.
+ *
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ public void igniteOperationsTest() throws Exception {
+ clientMode = true;
+
+ final Ignite client = startGrid(serverCount());
+
+ final IgniteCache<Object, Object> dfltCache = client.cache(null);
+
+ // Counted down by the message listener once the test message has been received.
+ final CountDownLatch recvLatch = new CountDownLatch(1);
+
+ assertNotNull(dfltCache);
+
+ doTestIgniteOperationOnDisconnect(client, Arrays.asList(
+ // Check compute.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ // First attempt: must be rejected with a disconnect exception.
+ try {
+ client.compute();
+ }
+ catch (IgniteClientDisconnectedException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ // Second attempt: its result is handed to the validation closure below.
+ return client.compute();
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ IgniteCompute comp = (IgniteCompute)o;
+
+ Collection<UUID> uuids = comp.broadcast(new IgniteCallable<UUID>() {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ @Override public UUID call() throws Exception {
+ return ignite.cluster().localNode().id();
+ }
+ });
+
+ assertFalse(uuids.isEmpty());
+
+ for (UUID uuid : uuids)
+ assertNotNull(uuid);
+
+ return true;
+ }
+ }
+ ),
+
+ // Check ping node.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ client.cluster().pingNode(new UUID(0, 0));
+ }
+ catch (IgniteClientDisconnectedException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ return client.cluster().pingNode(new UUID(0, 0));
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ Boolean pingNode = (Boolean)o;
+
+ // The pinged ID is an all-zero UUID, so the ping is expected to report failure.
+ assertFalse(pingNode);
+
+ return true;
+ }
+ }
+ ),
+ // Check register remote listener.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ client.events().remoteListen(null, new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event event) {
+ return true;
+ }
+ });
+ }
+ catch (IgniteClientDisconnectedException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ return client.events().remoteListen(null, new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event event) {
+ return true;
+ }
+ });
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ UUID remoteId = (UUID)o;
+
+ assertNotNull(remoteId);
+
+ client.events().stopRemoteListen(remoteId);
+
+ return true;
+ }
+ }
+ ),
+ // Check message operation.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ client.message().remoteListen(null, new IgniteBiPredicate<UUID, Object>() {
+ @Override public boolean apply(UUID uuid, Object o) {
+ if (o.equals("Test message."))
+ recvLatch.countDown();
+
+ return true;
+ }
+ });
+ }
+ catch (IgniteClientDisconnectedException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ return client.message().remoteListen(null, new IgniteBiPredicate<UUID, Object>() {
+ @Override public boolean apply(UUID uuid, Object o) {
+ if (o.equals("Test message."))
+ recvLatch.countDown();
+
+ return true;
+ }
+ });
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertNotNull(o);
+
+ IgniteMessaging msg = client.message();
+
+ msg.send(null, "Test message.");
+
+ try {
+ assertTrue(recvLatch.await(2, TimeUnit.SECONDS));
+ }
+ catch (InterruptedException e) {
+ // Keep the interrupt visible to the calling thread and record the real reason
+ // before failing the check.
+ Thread.currentThread().interrupt();
+
+ log.error("Interrupted while waiting for the test message.", e);
+
+ fail("Message wasn't received.");
+ }
+
+ return true;
+ }
+ }
+ ),
+ // Check executor.
+ new T2<Callable, C1<Object, Boolean>>(
+ new Callable() {
+ @Override public Object call() throws Exception {
+ boolean failed = false;
+
+ try {
+ client.executorService().submit(new Callable<Integer>() {
+ @Override public Integer call() throws Exception {
+ return 42;
+ }
+ });
+ }
+ catch (IgniteClientDisconnectedException e) {
+ failed = true;
+
+ checkAndWait(e);
+ }
+
+ assertTrue(failed);
+
+ return client.executorService().submit(new Callable<Integer>() {
+ @Override public Integer call() throws Exception {
+ return 42;
+ }
+ });
+ }
+ },
+ new C1<Object, Boolean>() {
+ @Override public Boolean apply(Object o) {
+ assertNotNull(o);
+
+ Future<Integer> fut = (Future<Integer>)o;
+
+ try {
+ assertEquals(42, (int)fut.get());
+ }
+ catch (Exception e) {
+ // fail(String) cannot carry a cause, so log the exception before failing.
+ log.error("Failed submit task.", e);
+
+ fail("Failed submit task.");
+ }
+
+ return true;
+ }
+ }
+ )
+ ));
+
+ clientMode = false;
+ }
+
+ /**
+ * Drives one disconnect/reconnect cycle for the given client and runs the supplied operations against it.
+ * <p>
+ * Steps:
+ * <ol>
+ * <li>Installs a fresh {@code writeLatch} on the client discovery SPI (logged as "Block reconnect.").</li>
+ * <li>Registers a local listener that, on {@code EVT_CLIENT_NODE_DISCONNECTED}, starts the first element
+ * of every pair asynchronously, and on {@code EVT_CLIENT_NODE_RECONNECTED} releases the reconnect latch.</li>
+ * <li>Fails the client node through the server discovery SPI and waits for the disconnect event.</li>
+ * <li>Asserts that none of the started operations has finished, immediately and again after two seconds.</li>
+ * <li>Counts down {@code writeLatch} (logged as "Allow reconnect.") and waits for the reconnect event.</li>
+ * <li>Fetches every operation result (2 second timeout) and passes it to the second element of the pair,
+ * which is itself run asynchronously with a 2 second timeout and must return {@code true}.</li>
+ * </ol>
+ * Regardless of the outcome, the latch is released, the operation futures are cancelled and all grids
+ * are stopped.
+ *
+ * @param client Client.
+ * @param ops Pairs of (operation to run while disconnected, closure validating the operation result).
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ private void doTestIgniteOperationOnDisconnect(Ignite client, final List<T2<Callable, C1<Object, Boolean>>> ops)
+ throws Exception {
+ assertNotNull(client.cache(null));
+
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ log.info("Block reconnect.");
+
+ clientSpi.writeLatch = new CountDownLatch(1);
+
+ // Filled by the event listener before disconnectLatch is released, read by this thread only after
+ // disconnectLatch.await() has returned.
+ final List<IgniteInternalFuture> futs = new ArrayList<>();
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ // Reconnect must not have happened yet.
+ assertEquals(1, reconnectLatch.getCount());
+
+ for (T2<Callable, C1<Object, Boolean>> op : ops)
+ futs.add(GridTestUtils.runAsync(op.get1()));
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ try {
+ log.info("Fail client.");
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ assertTrue(disconnectLatch.await(5000, TimeUnit.MILLISECONDS));
+
+ assertEquals(ops.size(), futs.size());
+
+ for (IgniteInternalFuture<?> fut : futs)
+ assertNotDone(fut);
+
+ U.sleep(2000);
+
+ // Operations must still be pending after the pause.
+ for (IgniteInternalFuture<?> fut : futs)
+ assertNotDone(fut);
+
+ log.info("Allow reconnect.");
+
+ clientSpi.writeLatch.countDown();
+
+ assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS));
+
+ // Check operation after reconnect working.
+ for (int i = 0; i < futs.size(); i++) {
+ final int i0 = i;
+
+ try {
+ final Object furRes = futs.get(i0).get(2, TimeUnit.SECONDS);
+
+ assertTrue(GridTestUtils.runAsync(new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ return ops.get(i0).get2().apply(furRes);
+ }
+ }).get(2, TimeUnit.SECONDS));
+ }
+ catch (IgniteFutureTimeoutCheckedException e) {
+ // Report through the test logger instead of printing the stack trace to stderr.
+ log.error("Operation timeout. Iteration: " + i + ".", e);
+
+ fail("Operation timeout. Iteration: " + i + ".");
+ }
+ }
+ }
+ finally {
+ clientSpi.writeLatch.countDown();
+
+ try {
+ for (IgniteInternalFuture<?> fut : futs)
+ fut.cancel();
+ }
+ finally {
+ // Must run even if cancelling a future throws, otherwise started nodes are left behind.
+ stopAllGrids();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
index d9afe33..bb568ab 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -138,7 +138,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
srvAtomicSeq.batchSize(1);
- commSpi.blockMsg(GridNearLockResponse.class);
+ commSpi.blockMessage(GridNearLockResponse.class);
final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -166,7 +166,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertNotDone(fut);
- commSpi.unblockMsg();
+ commSpi.unblockMessage();
reconnectClientNode(client, srv, null);
@@ -298,7 +298,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
BlockTpcCommunicationSpi servCommSpi = commSpi(srv);
- servCommSpi.blockMsg(GridNearLockResponse.class);
+ servCommSpi.blockMessage(GridNearLockResponse.class);
final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -324,7 +324,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertNotDone(fut);
- servCommSpi.unblockMsg();
+ servCommSpi.unblockMessage();
reconnectClientNode(client, srv, null);
@@ -458,7 +458,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
BlockTpcCommunicationSpi servCommSpi = commSpi(srv);
- servCommSpi.blockMsg(GridNearLockResponse.class);
+ servCommSpi.blockMessage(GridNearLockResponse.class);
final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -484,7 +484,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertNotDone(fut);
- servCommSpi.unblockMsg();
+ servCommSpi.unblockMessage();
reconnectClientNode(client, srv, null);
@@ -590,7 +590,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
final IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLongInProggress", 0, false);
- commSpi.blockMsg(GridNearLockResponse.class);
+ commSpi.blockMessage(GridNearLockResponse.class);
final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -616,7 +616,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertNotDone(fut);
- commSpi.unblockMsg();
+ commSpi.unblockMessage();
reconnectClientNode(client, srv, null);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index fdce8cb..802277c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -135,6 +135,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
ccfg.setName("nearCache");
final IgniteCache<Object, Object> nearCache = client.getOrCreateCache(ccfg, new NearCacheConfiguration<>());
@@ -242,6 +243,12 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
assertEquals(10, staticCache.get(10));
+ nearCache.put(20, 20);
+
+ srv.cache(nearCache.getName()).put(20, 21);
+
+ assertEquals(21, nearCache.localPeek(20));
+
this.clientMode = false;
IgniteEx srv2 = startGrid(SRV_CNT + 1);
@@ -261,6 +268,10 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
staticCache.put(20, 20);
assertEquals(20, staticCache.get(20));
+
+ srv.cache(nearCache.getName()).put(20, 22);
+
+ assertEquals(22, nearCache.localPeek(20));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
index bba9655..ed811d9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
@@ -260,9 +260,9 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
BlockTpcCommunicationSpi commSpi = commSpi(srv);
if (colCfg.getAtomicityMode() == ATOMIC)
- commSpi.blockMsg(GridNearAtomicUpdateResponse.class);
+ commSpi.blockMessage(GridNearAtomicUpdateResponse.class);
else
- commSpi.blockMsg(GridNearTxPrepareResponse.class);
+ commSpi.blockMessage(GridNearTxPrepareResponse.class);
final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -289,7 +289,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
assertNotDone(fut);
- commSpi.unblockMsg();
+ commSpi.unblockMessage();
reconnectClientNode(client, srv, null);
@@ -402,9 +402,9 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
BlockTpcCommunicationSpi commSpi = commSpi(srv);
if (colCfg.getAtomicityMode() == ATOMIC)
- commSpi.blockMsg(GridNearAtomicUpdateResponse.class);
+ commSpi.blockMessage(GridNearAtomicUpdateResponse.class);
else
- commSpi.blockMsg(GridNearTxPrepareResponse.class);
+ commSpi.blockMessage(GridNearTxPrepareResponse.class);
final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -430,7 +430,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
assertNotDone(fut);
- commSpi.unblockMsg();
+ commSpi.unblockMessage();
reconnectClientNode(client, srv, null);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
index cff3b5c..e9667a1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
@@ -41,7 +41,7 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr
/**
* @throws Exception If failed.
*/
- public void testReconnectAffCallInProgress() throws Exception {
+ public void testReconnectAffinityCallInProgress() throws Exception {
final Ignite client = grid(serverCount());
assertTrue(client.cluster().localNode().isClient());
@@ -55,7 +55,7 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr
BlockTpcCommunicationSpi commSpi = commSpi(srv);
- commSpi.blockMsg(GridJobExecuteResponse.class);
+ commSpi.blockMessage(GridJobExecuteResponse.class);
final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -85,7 +85,7 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr
assertNotDone(fut);
- commSpi.unblockMsg();
+ commSpi.unblockMessage();
reconnectClientNode(client, srv, null);
@@ -104,7 +104,7 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr
BlockTpcCommunicationSpi commSpi = commSpi(srv);
- commSpi.blockMsg(GridJobExecuteResponse.class);
+ commSpi.blockMessage(GridJobExecuteResponse.class);
final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -134,7 +134,7 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr
assertNotDone(fut);
- commSpi.unblockMsg();
+ commSpi.unblockMessage();
reconnectClientNode(client, srv, null);
@@ -153,7 +153,7 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr
BlockTpcCommunicationSpi commSpi = commSpi(srv);
- commSpi.blockMsg(GridJobExecuteResponse.class);
+ commSpi.blockMessage(GridJobExecuteResponse.class);
final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -183,7 +183,7 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr
assertNotDone(fut);
- commSpi.unblockMsg();
+ commSpi.unblockMessage();
reconnectClientNode(client, srv, null);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
index bf0130b..4a7e3c6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
@@ -42,7 +42,7 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
/** {@inheritDoc} */
@Override protected int serverCount() {
- return 1;
+ return 3;
}
/** {@inheritDoc} */
@@ -182,23 +182,53 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
-
- TestTcpDiscoverySpi srvSpi = spi(srv);
-
IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
CacheEventListener lsnr = new CacheEventListener();
ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+ qry.setAutoUnsubscribe(true);
+
qry.setLocalListener(lsnr);
QueryCursor<?> cur = clientCache.query(qry);
+ for (int i = 0; i < 5; i++) {
+ log.info("Iteration: " + i);
+
+ continuousQueryReconnect(client, clientCache, lsnr);
+ }
+
+ log.info("Close cursor, should not get cache events anymore.");
+
+ cur.close();
+
+ lsnr.latch = new CountDownLatch(1);
+
+ clientCache.put(3, 3);
+
+ assertFalse(lsnr.latch.await(3000, MILLISECONDS));
+ }
+
+ /**
+ * @param client Client.
+ * @param clientCache Client cache.
+ * @param lsnr Continuous query listener.
+ * @throws Exception If failed.
+ */
+ private void continuousQueryReconnect(Ignite client,
+ IgniteCache<Object, Object> clientCache,
+ CacheEventListener lsnr)
+ throws Exception
+ {
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
final CountDownLatch reconnectLatch = new CountDownLatch(1);
- client.events().localListen(new IgnitePredicate<Event>() {
+ IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
@Override public boolean apply(Event evt) {
if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
info("Reconnected: " + evt);
@@ -208,11 +238,15 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
return true;
}
- }, EVT_CLIENT_NODE_RECONNECTED);
+ };
+
+ client.events().localListen(p, EVT_CLIENT_NODE_RECONNECTED);
srvSpi.failNode(client.cluster().localNode().id(), null);
- assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+ assertTrue(reconnectLatch.await(50000000, MILLISECONDS));
+
+ client.events().stopLocalListen(p);
lsnr.latch = new CountDownLatch(1);
@@ -225,16 +259,6 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
srv.cache(null).put(2, 2);
assertTrue(lsnr.latch.await(5000, MILLISECONDS));
-
- log.info("Close cursor, should not get cache events anymore.");
-
- cur.close();
-
- lsnr.latch = new CountDownLatch(1);
-
- clientCache.put(3, 3);
-
- assertFalse(lsnr.latch.await(3000, MILLISECONDS));
}
// TODO IGNITE-901 test operations in progress are cancelled.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java
index e51d68a..35f86f5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java
@@ -18,23 +18,22 @@
package org.apache.ignite.internal;
import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.events.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.testframework.*;
import org.apache.ignite.transactions.*;
import javax.cache.*;
-import javax.cache.processor.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
/**
*
@@ -44,233 +43,274 @@ public class IgniteClientReconnectFailoverTest extends IgniteClientReconnectAbst
public final Integer THREADS = 8;
/** */
- public final Integer RESTART_CNT = 30;
+ private volatile CyclicBarrier barrier;
+
+ /** */
+ private static final String ATOMIC_CACHE = "ATOMIC_CACHE";
+
+ /** */
+ private static final String TX_CACHE = "TX_CACHE";
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- cfg.setCacheConfiguration(new CacheConfiguration());
+ CacheConfiguration ccfg1 = new CacheConfiguration(); // Cache looked up by testReconnectAtomicCache.
+
+ ccfg1.setName(ATOMIC_CACHE);
+ ccfg1.setBackups(1);
+ ccfg1.setAtomicityMode(ATOMIC);
+
+ CacheConfiguration ccfg2 = new CacheConfiguration(); // Cache looked up by testReconnectTxCache.
+
+ ccfg2.setName(TX_CACHE);
+ ccfg2.setBackups(1);
+ ccfg2.setAtomicityMode(TRANSACTIONAL);
+
+ cfg.setCacheConfiguration(ccfg1, ccfg2); // Register both named caches.
return cfg;
}
/** {@inheritDoc} */
@Override protected int serverCount() {
+ return 3; // The tests in this class look up the client node as grid(serverCount()).
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int clientCount() {
return 1; // Only one client is needed: reconnectFailover fails the node at grid(serverCount()).
}
- /** */
- private volatile CyclicBarrier barrier;
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 2 * 60_000; // Presumably milliseconds (2 minutes) - confirm; reconnectFailover alone loops for up to 60_000 ms.
+ }
/**
+ * Runs put/get and putAll/getAll checks against {@code ATOMIC_CACHE} from the client node while
+ * {@code reconnectFailover} repeatedly fails that client on the server side.
+ *
* @throws Exception If failed.
*/
- public void testCacheOperationReconnectApi() throws Exception {
- clientMode = true;
+ public void testReconnectAtomicCache() throws Exception {
+ final Ignite client = grid(serverCount());
- final Ignite client = startGrid(serverCount());
+ final IgniteCache<Integer, Integer> cache = client.cache(ATOMIC_CACHE);
- assertNotNull(client.cache(null));
+ assertNotNull(cache);
- Ignite srv = clientRouter(client);
+ assertEquals(ATOMIC, cache.getConfiguration(CacheConfiguration.class).getAtomicityMode());
- TestTcpDiscoverySpi srvSpi = spi(srv);
+ reconnectFailover(new Callable<Void>() { // reconnectFailover calls this closure in a loop from its worker threads.
+ @Override public Void call() throws Exception {
+ TreeMap<Integer, Integer> map = new TreeMap<>();
- final AtomicBoolean stop = new AtomicBoolean(false);
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
- final AtomicLong cntr = new AtomicLong();
+ for (int i = 0; i < 10; i++) {
+ Integer key = rnd.nextInt(0, 100_000);
- final IgniteQueue<Object> queue = client.queue("test-queue", 1000, new CollectionConfiguration());
+ cache.put(key, key);
- final IgniteAtomicLong atomicLong = client.atomicLong("counter", 0, true);
+ assertEquals(key, cache.get(key));
- final IgniteAtomicSequence sequence = client.atomicSequence("sequence", 0, true);
+ map.put(key, key); // Remember the pair for the bulk putAll/getAll check below.
+ }
- final IgniteInternalFuture<Long> future = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- IgniteCache<Integer, Integer> cache = client.cache(null);
+ cache.putAll(map);
- IgniteCompute compute = client.compute();
+ Map<Integer, Integer> res = cache.getAll(map.keySet());
- Set<Integer> keys = new TreeSet<>();
- final Map<Integer, Integer> entries = new TreeMap<>();
+ assertEquals(map, res);
- for (int i = 0; i < 50; i++) {
- keys.add(i);
- entries.put(i, i);
- }
+ return null;
+ }
+ });
+ }
- while (!stop.get()) {
- cntr.incrementAndGet();
-
- try {
- // Start cache operations.
- for (int i = 0; i < 10; i++) {
- cache.put(i, i);
- cache.get(i);
- cache.remove(i);
-
- cache.putAll(entries);
-
- cache.invokeAll(keys, new CacheEntryProcessor<Integer, Integer, Object>() {
- @Override public Object process(MutableEntry<Integer, Integer> entry,
- Object... arguments) throws EntryProcessorException {
- if (ThreadLocalRandom.current().nextBoolean())
- entry.setValue(entry.getValue() * 100);
- else
- entry.remove();
-
- return entry;
- }
- });
- }
+ /**
+ * Runs implicit put/get, explicit transactions (one per {@code TransactionConcurrency} value, all with
+ * {@code REPEATABLE_READ}) and putAll/getAll checks against {@code TX_CACHE} from the client node while
+ * {@code reconnectFailover} repeatedly fails that client on the server side.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectTxCache() throws Exception {
+ final Ignite client = grid(serverCount());
- try (Transaction tx = client.transactions().txStart()) {
- for (int i = 0; i < 10; i++) {
- cache.put(i, i);
- cache.get(i);
- }
+ final IgniteCache<Integer, Integer> cache = client.cache(TX_CACHE);
- tx.commit();
- }
+ assertNotNull(cache);
+
+ assertEquals(TRANSACTIONAL, cache.getConfiguration(CacheConfiguration.class).getAtomicityMode());
+
+ final IgniteTransactions txs = client.transactions();
- // Start async cache operations.
- IgniteCache<Integer, Integer> asyncCache = cache.withAsync();
+ reconnectFailover(new Callable<Void>() { // reconnectFailover calls this closure in a loop from its worker threads.
+ @Override public Void call() throws Exception {
+ TreeMap<Integer, Integer> map = new TreeMap<>();
- for (int i = 0; i < 10; i++) {
- asyncCache.put(i, i);
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
- asyncCache.future().get();
+ for (int i = 0; i < 5; i++) {
+ Integer key = rnd.nextInt(0, 100_000);
- asyncCache.get(i);
+ cache.put(key, key);
- asyncCache.future().get();
+ assertEquals(key, cache.get(key));
+
+ map.put(key, key);
+ }
+
+ for (TransactionConcurrency txConcurrency : TransactionConcurrency.values()) { // Same put/get sequence for every concurrency mode.
+ try (Transaction tx = txs.txStart(txConcurrency, REPEATABLE_READ)) {
+ for (Map.Entry<Integer, Integer> e : map.entrySet()) {
+ cache.put(e.getKey(), e.getValue());
+
+ assertNotNull(cache.get(e.getKey()));
}
- // Compute.
-// for (int i = 0; i < 10; i++) {
-// compute.broadcast(new IgniteCallable<Integer>() {
-// @IgniteInstanceResource
-// private Ignite ignite;
-//
-// @Override public Integer call() throws Exception {
-// return ignite.cache(null).localSize();
-// }
-// });
-//
-// compute.broadcast(new IgniteRunnable() {
-// @Override public void run() {
-// // No-op.
-// }
-// });
-//
-// compute.apply(new C1<String, String>() {
-// @Override public String apply(String o) {
-// return o.toUpperCase();
-// }
-// }, Arrays.asList("a", "b", "c"));
-// }
-
- //Data structures.
-// for (int i = 0; i < 10; i++) {
-// assert atomicLong.incrementAndGet() >= 0;
-//
-// queue.offer("Test item");
-//
-// if (ThreadLocalRandom.current().nextBoolean())
-// for (int j = 0; j < 50; j++)
-// queue.poll();
-//
-// assert queue.size() <= 1000;
-//
-// assert sequence.addAndGet(i + 1) >= 0;
-// }
- }
- catch (CacheException | IgniteException e) {
- log.info("Operation failed, ignore: " + e);
+ tx.commit();
}
+ }
+
+ cache.putAll(map);
+
+ Map<Integer, Integer> res = cache.getAll(map.keySet());
+
+ assertEquals(map, res);
+
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Issues {@code call} and {@code broadcast} of a {@code DummyClosure} through the client's compute API while
+ * {@code reconnectFailover} repeatedly fails that client on the server side.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectComputeApi() throws Exception {
+ final Ignite client = grid(serverCount());
+
+ final IgniteCompute comp = client.compute();
- if (cntr.get() % 100 == 0)
- log.info("Iteration: " + cntr);
+ reconnectFailover(new Callable<Void>() { // reconnectFailover calls this closure in a loop from its worker threads.
+ @Override public Void call() throws Exception {
+ comp.call(new DummyClosure());
- if (barrier != null)
+ comp.broadcast(new DummyClosure());
+
+ return null;
+ }
+ });
+ }
+
+ /**
+ * @param c Test closure.
+ * @throws Exception If failed.
+ */
+ public void reconnectFailover(final Callable<Void> c) throws Exception {
+ final Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ Ignite srv = clientRouter(client);
+
+ TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ final AtomicBoolean stop = new AtomicBoolean(false);
+
+ final IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try {
+ int iter = 0;
+
+ while (!stop.get()) {
try {
- barrier.await();
+ c.call();
}
- catch (BrokenBarrierException e) {
- log.warning("Broken barrier.", e);
-
- break;
+ catch (CacheException e) {
+ checkAndWait(e);
+ }
+ catch (IgniteClientDisconnectedException e) {
+ checkAndWait(e);
}
+
+ if (++iter % 100 == 0)
+ log.info("Iteration: " + iter);
+
+ if (barrier != null)
+ barrier.await();
+ }
+
+ return null;
}
+ catch (Throwable e) {
+ stop.set(true);
- return null;
- }
- }, THREADS, "test-operation-thread-" + client.name());
+ log.error("Unexpected error: " + e, e);
- final AtomicBoolean disconnected = new AtomicBoolean(false);
+ throw e;
+ }
+ }
+ }, THREADS, "test-operation-thread");
- final AtomicBoolean reconnected = new AtomicBoolean(false);
+ final AtomicReference<CountDownLatch> reconnected = new AtomicReference<>();
client.events().localListen(new IgnitePredicate<Event>() {
@Override public boolean apply(Event evt) {
- if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
- info("Disconnected: " + evt);
-
- if (!reconnected.get())
- disconnected.set(true);
- }
- else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
info("Reconnected: " + evt);
- if (disconnected.get())
- reconnected.set(true);
+ CountDownLatch latch = reconnected.get();
+
+ assertNotNull(latch);
+ assertEquals(1, latch.getCount());
+
+ latch.countDown();
}
+ else if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED)
+ info("Disconnected: " + evt);
return true;
}
}, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
- for (int i = 0; i < RESTART_CNT; i++) {
- U.sleep(2000);
+ try {
+ long stopTime = System.currentTimeMillis() + 60_000;
- log.info("Block reconnect.");
+ String err = null;
- reconnected.set(false);
+ while (System.currentTimeMillis() < stopTime && !fut.isDone()) {
+ U.sleep(100);
- disconnected.set(false);
+ CountDownLatch latch = new CountDownLatch(1);
- log.info("Fail client.");
+ reconnected.set(latch);
- srvSpi.failNode(client.cluster().localNode().id(), null);
+ UUID nodeId = client.cluster().localNode().id();
- GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- return disconnected.get();
- }
- }, 5000L);
+ log.info("Fail client: " + nodeId);
- barrier = new CyclicBarrier(THREADS + 1, new Runnable() {
- @Override public void run() {
- barrier = null;
+ srvSpi.failNode(nodeId, null);
+
+ if (!latch.await(5000, MILLISECONDS)) {
+ err = "Failed to wait for reconnect";
+
+ break;
}
- });
- GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- return reconnected.get();
+ barrier = new CyclicBarrier(THREADS + 1, new Runnable() {
+ @Override public void run() {
+ barrier = null;
+ }
+ });
+
+ try {
+ barrier.await(10, SECONDS);
}
- }, 5000L);
+ catch (TimeoutException e) {
+ err = "Operation hangs.";
- try {
- barrier.await(10, TimeUnit.SECONDS);
+ break;
+ }
}
- catch (TimeoutException e) {
- log.error("Failed. Operation hangs.");
- for (Ignite ignite : G.allGrids())
- dumpCacheDebugInfo(ignite);
+ if (err != null) {
+ log.error(err);
U.dumpThreads(log);
@@ -281,10 +321,25 @@ public class IgniteClientReconnectFailoverTest extends IgniteClientReconnectAbst
stop.set(true);
- fail("Failed to wait for update.");
+ fail(err);
}
+
+ stop.set(true);
+
+ fut.get();
+ }
+ finally {
+ stop.set(true);
}
+ }
- stop.set(true);
+ /**
+ * Trivial callable that always returns 1; {@code testReconnectComputeApi} passes it to the compute
+ * {@code call} and {@code broadcast} methods.
+ */
+ public static class DummyClosure implements IgniteCallable<Object> {
+ /** {@inheritDoc} */
+ @Override public Object call() throws Exception {
+ return 1;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
index 6ccbbe0..31b4192 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
@@ -56,7 +56,9 @@ public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbst
assertNotNull(srvc);
- assertEquals((Object) 2L, srvc.test());
+ long topVer = grid(0).cluster().topologyVersion();
+
+ assertEquals((Object)topVer, srvc.test());
Ignite srv = clientRouter(client);
@@ -70,7 +72,7 @@ public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbst
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
- assertEquals((Object) 4L, srvc.test());
+ assertEquals((Object)(topVer + 2), srvc.test());
}
/**
@@ -129,7 +131,7 @@ public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbst
BlockTpcCommunicationSpi commSpi = commSpi(srv);
- commSpi.blockMsg(GridNearTxPrepareResponse.class);
+ commSpi.blockMessage(GridNearTxPrepareResponse.class);
final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -155,7 +157,7 @@ public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbst
assertNotDone(fut);
- commSpi.unblockMsg();
+ commSpi.unblockMessage();
reconnectClientNode(client, srv, null);
@@ -182,7 +184,7 @@ public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbst
BlockTpcCommunicationSpi commSpi = commSpi(srv);
- commSpi.blockMsg(GridJobExecuteResponse.class);
+ commSpi.blockMessage(GridJobExecuteResponse.class);
final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -208,7 +210,7 @@ public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbst
assertNotDone(fut);
- commSpi.unblockMsg();
+ commSpi.unblockMessage();
reconnectClientNode(client, srv, null);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java
index 50feb86..a4cf77f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal;
import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.processors.datastreamer.*;
import org.apache.ignite.internal.util.lang.*;
@@ -27,6 +26,9 @@ import org.apache.ignite.testframework.*;
import javax.cache.*;
import java.util.concurrent.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
/**
*
*/
@@ -49,8 +51,8 @@ public class IgniteClientReconnectStreamerTest extends IgniteClientReconnectAbst
IgniteConfiguration cfg = super.getConfiguration(gridName);
CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<Integer, Integer>(CACHE_NAME)
- .setAtomicityMode(CacheAtomicityMode.ATOMIC)
- .setCacheMode(CacheMode.PARTITIONED);
+ .setAtomicityMode(ATOMIC)
+ .setCacheMode(PARTITIONED);
cfg.setCacheConfiguration(ccfg);
@@ -61,7 +63,7 @@ public class IgniteClientReconnectStreamerTest extends IgniteClientReconnectAbst
* @throws Exception If failed.
*/
public void testStreamerReconnect() throws Exception {
- Ignite client = grid(serverCount());
+ final Ignite client = grid(serverCount());
assertTrue(client.cluster().localNode().isClient());
@@ -69,7 +71,7 @@ public class IgniteClientReconnectStreamerTest extends IgniteClientReconnectAbst
final IgniteCache<Object, Object> srvCache = srv.cache(CACHE_NAME);
- IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer("streamer");
+ IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer(CACHE_NAME);
for (int i = 0; i < 50; i++)
streamer.addData(i, i);
@@ -82,9 +84,26 @@ public class IgniteClientReconnectStreamerTest extends IgniteClientReconnectAbst
}
}, 2000L);
- reconnectClientNode(client, srv, null);
+ assertEquals(50, srvCache.localSize());
- for (int i = 0; i < 50; i++)
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ try {
+ client.dataStreamer(CACHE_NAME);
+
+ fail();
+ }
+ catch (IgniteClientDisconnectedException e) {
+ assertNotNull(e.reconnectFuture());
+ }
+ }
+ });
+
+ checkStreamerClosed(streamer);
+
+ streamer = client.dataStreamer(CACHE_NAME);
+
+ for (int i = 50; i < 100; i++)
streamer.addData(i, i);
streamer.flush();
@@ -95,6 +114,8 @@ public class IgniteClientReconnectStreamerTest extends IgniteClientReconnectAbst
}
}, 2000L);
+ assertEquals(100, srvCache.localSize());
+
streamer.close();
streamer.future().get(2, TimeUnit.SECONDS);
@@ -114,11 +135,11 @@ public class IgniteClientReconnectStreamerTest extends IgniteClientReconnectAbst
final IgniteCache<Object, Object> srvCache = srv.cache(CACHE_NAME);
- final IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer("streamer");
+ final IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer(CACHE_NAME);
BlockTpcCommunicationSpi commSpi = commSpi(srv);
- commSpi.blockMsg(DataStreamerResponse.class);
+ commSpi.blockMessage(DataStreamerResponse.class);
final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -133,6 +154,9 @@ public class IgniteClientReconnectStreamerTest extends IgniteClientReconnectAbst
return true;
}
+ finally {
+ streamer.close();
+ }
return false;
}
@@ -147,19 +171,63 @@ public class IgniteClientReconnectStreamerTest extends IgniteClientReconnectAbst
assertNotDone(fut);
- commSpi.unblockMsg();
+ commSpi.unblockMessage();
reconnectClientNode(client, srv, null);
assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+ checkStreamerClosed(streamer);
+
+ IgniteDataStreamer<Integer, Integer> streamer2 = client.dataStreamer(CACHE_NAME);
+
for (int i = 0; i < 50; i++)
- streamer.addData(i, i);
+ streamer2.addData(i, i);
- streamer.flush();
+ streamer2.close();
- assertTrue(srv.cache(CACHE_NAME).localSize() >= 0);
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return srvCache.localSize() == 50;
+ }
+ }, 2000L);
- srvCache.removeAll();
+ assertEquals(50, srvCache.localSize());
+ }
+
+ /**
+ * Checks the behaviour expected from a streamer after the client node was reconnected (the tests in this class
+ * call it right after {@code reconnectClientNode}): {@code addData}, {@code flush} and {@code future().get()}
+ * must each throw a {@link CacheException}, which is handed to {@code checkAndWait}, while {@code tryFlush}
+ * and {@code close} must return without throwing.
+ * NOTE(review): {@code checkAndWait} is defined outside this view; presumably it validates the exception cause
+ * and waits for the reconnect - confirm.
+ *
+ * @param streamer Streamer.
+ */
+ private void checkStreamerClosed(IgniteDataStreamer<Integer, Integer> streamer) {
+ try {
+ streamer.addData(100, 100);
+
+ fail(); // Reached only if addData did not throw.
+ }
+ catch (CacheException e) {
+ checkAndWait(e);
+ }
+
+ try {
+ streamer.flush();
+
+ fail(); // Reached only if flush did not throw.
+ }
+ catch (CacheException e) {
+ checkAndWait(e);
+ }
+
+ try {
+ streamer.future().get();
+
+ fail(); // Reached only if the streamer future completed without an error.
+ }
+ catch (CacheException e) {
+ checkAndWait(e);
+ }
+
+ streamer.tryFlush(); // No exception is expected here; any throw fails the test.
+
+ streamer.close(); // Likewise expected to return without throwing.
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
index 93137bf..66c9835 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
@@ -32,7 +32,7 @@ public class IgniteClientReconnectTestSuite extends TestSuite {
TestSuite suite = new TestSuite("Ignite Client Reconnect Test Suite");
suite.addTestSuite(IgniteClientReconnectStopTest.class);
- suite.addTestSuite(IgniteClientReconnectApiBlockTest.class);
+ suite.addTestSuite(IgniteClientReconnectApiExceptionTest.class);
suite.addTestSuite(IgniteClientReconnectDiscoveryStateTest.class);
suite.addTestSuite(IgniteClientReconnectCacheTest.class);
suite.addTestSuite(IgniteClientReconnectContinuousProcessorTest.class);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 06c0961..c76dbe7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1439,6 +1439,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
fut.get();
}
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
+ rdcQryExec.onDisconnected(reconnectFut); // Hand the reconnect future to rdcQryExec (presumably the GridReduceQueryExecutor - confirm).
+ }
+
/**
* Wrapper to store connection and flag is schema set or not.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index af29647..244ae46 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -40,7 +40,7 @@ public abstract class GridMergeIndex extends BaseIndex {
private static final int MAX_FETCH_SIZE = getInteger(IGNITE_SQL_MERGE_TABLE_MAX_SIZE, 10_000);
/** All rows number. */
- private final AtomicInteger expectedRowsCnt = new AtomicInteger(0);
+ private final AtomicInteger expRowsCnt = new AtomicInteger(0);
/** Remaining rows per source node ID. */
private final ConcurrentMap<UUID, Counter> remainingRows = new ConcurrentHashMap8<>();
@@ -75,8 +75,8 @@ public abstract class GridMergeIndex extends BaseIndex {
}
/** {@inheritDoc} */
- @Override public long getRowCount(Session session) {
- return expectedRowsCnt.get();
+ @Override public long getRowCount(Session ses) {
+ return expRowsCnt.get(); // Current value of the expected-rows counter; ses is not used.
}
/** {@inheritDoc} */
@@ -95,12 +95,23 @@ public abstract class GridMergeIndex extends BaseIndex {
/**
+ * Adds a "fail" page (a {@code GridResultPage} whose {@code isFail()} returns {@code true}) through
+ * {@code addPage0}: one page for {@code nodeId} when it is non-null, otherwise one page for every node ID
+ * currently present in {@code remainingRows}.
+ *
* @param nodeId Node ID.
*/
- public void fail(UUID nodeId) {
- addPage0(new GridResultPage(null, nodeId, null) {
- @Override public boolean isFail() {
- return true;
+ public void fail(@Nullable UUID nodeId) {
+ if (nodeId != null) {
+ addPage0(new GridResultPage(null, nodeId, null) {
+ @Override public boolean isFail() {
+ return true;
+ }
+ });
+ }
+ else {
+ for (UUID nodeId0 : remainingRows.keySet()) { // Null node ID: add a fail page for each known source node.
+ addPage0(new GridResultPage(null, nodeId0, null) {
+ @Override public boolean isFail() {
+ return true;
+ }
+ });
}
- });
+ }
}
/**
@@ -120,7 +131,7 @@ public abstract class GridMergeIndex extends BaseIndex {
assert !cnt.initialized : "Counter is already initialized.";
cnt.addAndGet(allRows);
- expectedRowsCnt.addAndGet(allRows);
+ expRowsCnt.addAndGet(allRows);
// We need this separate flag to handle case when the first source contains only one page
// and it will signal that all remaining counters are zero and fetch is finished.
@@ -162,7 +173,7 @@ public abstract class GridMergeIndex extends BaseIndex {
}
/** {@inheritDoc} */
- @Override public Cursor find(Session session, SearchRow first, SearchRow last) {
+ @Override public Cursor find(Session ses, SearchRow first, SearchRow last) {
if (fetched == null)
throw new IgniteException("Fetched result set was too large.");
@@ -176,7 +187,7 @@ public abstract class GridMergeIndex extends BaseIndex {
* @return {@code true} If we have fetched all the remote rows.
*/
public boolean fetchedAll() {
- return fetchedCnt == expectedRowsCnt.get();
+ return fetchedCnt == expRowsCnt.get();
}
/**
@@ -200,32 +211,32 @@ public abstract class GridMergeIndex extends BaseIndex {
}
/** {@inheritDoc} */
- @Override public void close(Session session) {
+ @Override public void close(Session ses) {
// No-op: ses is ignored and nothing is done on close.
}
/** {@inheritDoc} */
- @Override public void add(Session session, Row row) {
+ @Override public void add(Session ses, Row row) {
throw DbException.getUnsupportedException("add"); // Always throws: adding rows through this method is not supported.
}
/** {@inheritDoc} */
- @Override public void remove(Session session, Row row) {
+ @Override public void remove(Session ses, Row row) {
throw DbException.getUnsupportedException("remove row"); // Always throws: removing a row is not supported.
}
/** {@inheritDoc} */
- @Override public double getCost(Session session, int[] masks, TableFilter filter, SortOrder sortOrder) {
+ @Override public double getCost(Session ses, int[] masks, TableFilter filter, SortOrder sortOrder) {
return getRowCountApproximation() + Constants.COST_ROW_OFFSET; // None of the arguments influence the estimate.
}
/** {@inheritDoc} */
- @Override public void remove(Session session) {
+ @Override public void remove(Session ses) {
throw DbException.getUnsupportedException("remove index"); // Always throws: removing the index is not supported.
}
/** {@inheritDoc} */
- @Override public void truncate(Session session) {
+ @Override public void truncate(Session ses) {
throw DbException.getUnsupportedException("truncate"); // Always throws: truncation is not supported.
}
@@ -235,7 +246,7 @@ public abstract class GridMergeIndex extends BaseIndex {
}
/** {@inheritDoc} */
- @Override public Cursor findFirstOrLast(Session session, boolean first) {
+ @Override public Cursor findFirstOrLast(Session ses, boolean first) {
throw DbException.getUnsupportedException("findFirstOrLast"); // Always throws: first/last lookup is not supported.
}
@@ -299,6 +310,7 @@ public abstract class GridMergeIndex extends BaseIndex {
private Iterator<Row> stream;
/**
+ * @param stream Iterator.
*/
public FetchingCursor(Iterator<Row> stream) {
super(new FetchedIterator());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 32d1c95..64e16bf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.h2.command.*;
@@ -47,6 +48,7 @@ import org.h2.table.*;
import org.h2.tools.*;
import org.h2.util.*;
import org.h2.value.*;
+import org.jetbrains.annotations.*;
import org.jsr166.*;
import javax.cache.*;
@@ -234,10 +236,15 @@ public class GridReduceQueryExecutor {
Object errState = r.state.get();
if (errState != null) {
+ CacheException err0 = errState instanceof CacheException ? (CacheException)errState : null;
+
+ if (err0 != null && err0.getCause() instanceof IgniteClientDisconnectedException)
+ throw err0;
+
CacheException e = new CacheException("Failed to fetch data from node: " + node.id());
- if (errState instanceof CacheException)
- e.addSuppressed((Throwable)errState);
+ if (err0 != null)
+ e.addSuppressed(err0);
throw e;
}
@@ -301,6 +308,7 @@ public class GridReduceQueryExecutor {
}
/**
+ * @param cctx Cache context.
* @return {@code true} If cache context
*/
private boolean hasMovingPartitions(GridCacheContext<?,?> cctx) {
@@ -481,6 +489,12 @@ public class GridReduceQueryExecutor {
runs.put(qryReqId, r);
try {
+ if (ctx.clientDisconnected()) {
+ throw new CacheException("Query was cancelled, client node disconnected.",
+ new IgniteClientDisconnectedException(ctx.cluster().clientReconnectFuture(),
+ "Client node disconnected.", null));
+ }
+
Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries();
if (qry.explain()) {
@@ -506,8 +520,14 @@ public class GridReduceQueryExecutor {
Object state = r.state.get();
if (state != null) {
- if (state instanceof CacheException)
- throw new CacheException("Failed to run map query remotely.", (CacheException)state);
+ if (state instanceof CacheException) {
+ CacheException err = (CacheException)state;
+
+ if (err.getCause() instanceof IgniteClientDisconnectedException)
+ throw err;
+
+ throw new CacheException("Failed to run map query remotely.", err);
+ }
if (state instanceof AffinityTopologyVersion) {
retry = true;
@@ -550,6 +570,9 @@ public class GridReduceQueryExecutor {
catch (IgniteCheckedException | RuntimeException e) {
U.closeQuiet(r.conn);
+ if (e instanceof CacheException)
+ throw (CacheException)e;
+
throw new CacheException("Failed to run reduce query locally.", e);
}
finally {
@@ -1082,6 +1105,17 @@ public class GridReduceQueryExecutor {
}
/**
+ * Passes a disconnect error to every query run currently in {@code runs}: each run's {@code state} is called
+ * with a {@link CacheException} whose cause is an {@link IgniteClientDisconnectedException} carrying
+ * {@code reconnectFut}, and with a {@code null} node ID.
+ *
+ * @param reconnectFut Reconnect future.
+ */
+ public void onDisconnected(IgniteFuture<?> reconnectFut) {
+ CacheException err = new CacheException("Query was cancelled, client node disconnected.",
+ new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.", null));
+
+ for (Map.Entry<Long, QueryRun> e : runs.entrySet())
+ e.getValue().state(err, null); // The same exception instance is shared by all runs.
+ }
+
+ /**
*
*/
private static class QueryRun {
@@ -1104,7 +1138,7 @@ public class GridReduceQueryExecutor {
* @param o Fail state object.
* @param nodeId Node ID.
*/
- void state(Object o, UUID nodeId) {
+ void state(Object o, @Nullable UUID nodeId) {
assert o != null;
assert o instanceof CacheException || o instanceof AffinityTopologyVersion : o.getClass();