You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/08/18 09:08:18 UTC
[3/3] ignite git commit: ignite-3547
ignite-3547
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9d200004
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9d200004
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9d200004
Branch: refs/heads/ignite-3547-1
Commit: 9d200004c28e83a0b7024611d3e1b485fe3f317d
Parents: 476081b
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 18 12:03:32 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 18 12:03:32 2016 +0300
----------------------------------------------------------------------
.../util/nio/GridTcpNioCommunicationClient.java | 5 +-
.../communication/tcp/TcpCommunicationSpi.java | 49 +++--
.../IgniteCacheConnectionRecoveryTest.java | 202 +++++++++++++++++++
.../IgniteCacheMessageRecoveryAbstractTest.java | 14 +-
...gniteCacheMessageRecoveryIdleConnection.java | 154 --------------
...eCacheMessageRecoveryIdleConnectionTest.java | 157 ++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite.java | 6 +-
7 files changed, 399 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d200004/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index 4022bc5..5fe521d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -125,8 +125,11 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
if (log.isDebugEnabled())
log.debug("Failed to send message [client=" + this + ", err=" + e + ']');
- if (e.getCause() instanceof IOException)
+ if (e.getCause() instanceof IOException) {
+ ses.close();
+
return true;
+ }
else
throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d200004/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 2c03b2d..d81b9f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -353,6 +353,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
UUID id = ses.meta(NODE_ID_META);
if (id != null) {
+ GridCommunicationClient client = clients.get(id);
+
+ if (client instanceof GridTcpNioCommunicationClient &&
+ ((GridTcpNioCommunicationClient) client).session() == ses) {
+ client.close();
+
+ clients.remove(id, client);
+ }
+
if (!stopping) {
boolean reconnect = false;
@@ -372,9 +381,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
recoveryData.onNodeLeft();
}
- DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(id,
- ses,
- recoveryData,
+ DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(recoveryData,
reconnect);
commWorker.addProcessDisconnectRequest(disconnectData);
@@ -1400,6 +1407,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
.append(']').append(U.nl());
}
+ sb.append("Communication SPI clients: ").append(U.nl());
+
+ for (Map.Entry<UUID, GridCommunicationClient> entry : clients.entrySet()) {
+ sb.append(" [node=").append(entry.getKey())
+ .append(", client=").append(entry.getValue())
+ .append(']').append(U.nl());
+ }
+
U.warn(log, sb.toString());
}
@@ -1978,17 +1993,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
client.release();
- client = null;
-
if (!retry)
sentMsgsCnt.increment();
else {
+ clients.remove(node.id(), client);
+
ClusterNode node0 = getSpiContext().node(node.id());
if (node0 == null)
throw new IgniteCheckedException("Failed to send message to remote node " +
"(node has left the grid): " + node.id());
}
+
+ client = null;
}
while (retry);
}
@@ -3187,12 +3204,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @param sesInfo Disconnected session information.
*/
private void processDisconnect(DisconnectedSessionInfo sesInfo) {
- GridCommunicationClient client = clients.get(sesInfo.nodeId);
-
- if (client instanceof GridTcpNioCommunicationClient &&
- ((GridTcpNioCommunicationClient) client).session() == sesInfo.ses)
- clients.remove(sesInfo.nodeId, client);
-
if (sesInfo.reconnect) {
GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc;
@@ -3205,7 +3216,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (log.isDebugEnabled())
log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
- client = reserveClient(node);
+ GridCommunicationClient client = reserveClient(node);
client.release();
}
@@ -3756,29 +3767,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
*/
private static class DisconnectedSessionInfo {
/** */
- private final UUID nodeId;
-
- /** */
- private final GridNioSession ses;
-
- /** */
private final GridNioRecoveryDescriptor recoveryDesc;
/** */
private final boolean reconnect;
/**
- * @param nodeId Node ID.
- * @param ses Session.
* @param recoveryDesc Recovery descriptor.
* @param reconnect Reconnect flag.
*/
- public DisconnectedSessionInfo(UUID nodeId,
- GridNioSession ses,
- @Nullable GridNioRecoveryDescriptor recoveryDesc,
+ DisconnectedSessionInfo(@Nullable GridNioRecoveryDescriptor recoveryDesc,
boolean reconnect) {
- this.nodeId = nodeId;
- this.ses = ses;
this.recoveryDesc = recoveryDesc;
this.reconnect = reconnect;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d200004/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java
new file mode 100644
index 0000000..4a07674
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Checks that cache updates issued concurrently from server and client nodes keep completing while communication sessions are repeatedly closed.
+ */
+public class IgniteCacheConnectionRecoveryTest extends GridCommonAbstractTest {
+ /** IP finder set on the discovery SPI of every node. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Client mode flag passed to each node configuration. */
+ private boolean client;
+
+ /** Number of server nodes. */
+ private static final int SRVS = 5;
+
+ /** Number of client nodes. */
+ private static final int CLIENTS = 5;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ cfg.setClientMode(client);
+
+ cfg.setCacheConfiguration(
+ cacheConfiguration("cache1", TRANSACTIONAL),
+ cacheConfiguration("cache2", ATOMIC));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGridsMultiThreaded(SRVS);
+
+ client = true;
+
+ startGridsMultiThreaded(SRVS, CLIENTS);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testConnectionRecovery() throws Exception {
+ final Map<Integer, Integer> data = new TreeMap<>();
+
+ for (int i = 0; i < 500; i++)
+ data.put(i, i);
+
+ final AtomicInteger idx = new AtomicInteger();
+
+ final long stopTime = U.currentTimeMillis() + 30_000;
+
+ final AtomicReference<CyclicBarrier> barrierRef = new AtomicReference<>();
+
+ final int TEST_THREADS = (CLIENTS + SRVS) * 2;
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ int idx0 = idx.getAndIncrement();
+ Ignite node = ignite(idx0 % (SRVS + CLIENTS));
+
+ Thread.currentThread().setName("test-thread-" + idx0 + "-" + node.name());
+
+ IgniteCache cache1 = node.cache("cache1").withAsync();
+ IgniteCache cache2 = node.cache("cache2").withAsync();
+
+ // Count completed iterations so that the failure report below
+ // shows how far this thread got before the error.
+ for (int iter = 0; U.currentTimeMillis() < stopTime; iter++) {
+ try {
+ cache1.putAll(data);
+ cache1.future().get(15, SECONDS);
+
+ cache2.putAll(data);
+ cache2.future().get(15, SECONDS);
+
+ CyclicBarrier b = barrierRef.get();
+
+ if (b != null)
+ b.await(15, SECONDS);
+ }
+ catch (Exception e) {
+ synchronized (IgniteCacheConnectionRecoveryTest.class) {
+ log.error("Failed to execute update, will dump debug information" +
+ " [err=" + e + ", iter=" + iter + ']', e);
+
+ List<Ignite> nodes = IgnitionEx.allGridsx();
+
+ for (Ignite node0 : nodes)
+ ((IgniteKernal)node0).dumpDebugInfo();
+
+ U.dumpThreads(log);
+ }
+
+ throw e;
+ }
+ }
+
+ return null;
+ }
+ }, TEST_THREADS, "test-thread");
+
+ while (U.currentTimeMillis() < stopTime) {
+ boolean closed = false;
+
+ for (Ignite node : G.allGrids()) {
+ if (IgniteCacheMessageRecoveryAbstractTest.closeSessions(node))
+ closed = true;
+ }
+
+ if (closed) {
+ CyclicBarrier b = new CyclicBarrier(TEST_THREADS + 1, new Runnable() {
+ @Override public void run() {
+ barrierRef.set(null);
+ }
+ });
+
+ barrierRef.set(b);
+ // Bounded wait: do not hang forever if the worker threads have already failed.
+ b.await(60, SECONDS);
+ }
+
+ U.sleep(50);
+ }
+
+ fut.get();
+ }
+
+ /**
+ * @param name Cache name.
+ * @param atomicityMode Cache atomicity mode.
+ * @return Configuration.
+ */
+ private CacheConfiguration cacheConfiguration(String name, CacheAtomicityMode atomicityMode) {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName(name);
+ ccfg.setAtomicityMode(atomicityMode);
+ ccfg.setCacheMode(REPLICATED);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ return ccfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d200004/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
index 16d7e5d..0460a8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
@@ -150,7 +150,11 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
for (int i = 0; i < 30; i++) {
Thread.sleep(1000);
- closed |= closeSessions();
+ Ignite node0 = ignite(ThreadLocalRandom.current().nextInt(0, GRID_CNT));
+
+ log.info("Close sessions for: " + node0.name());
+
+ closed |= closeSessions(node0);
}
assertTrue(closed);
@@ -163,13 +167,11 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
}
/**
+ * @param ignite Node.
* @throws Exception If failed.
+ * @return {@code True} if closed at least one session.
*/
- private boolean closeSessions() throws Exception {
- Ignite ignite = ignite(ThreadLocalRandom.current().nextInt(0, GRID_CNT));
-
- log.info("Close sessions for: " + ignite.name());
-
+ static boolean closeSessions(Ignite ignite) throws Exception {
TcpCommunicationSpi commSpi = (TcpCommunicationSpi)ignite.configuration().getCommunicationSpi();
Map<UUID, GridCommunicationClient> clients = U.field(commSpi, "clients");
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d200004/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java
deleted file mode 100644
index 618fe2a..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.IgnitionEx;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-
-/**
- *
- */
-public class IgniteCacheMessageRecoveryIdleConnection extends GridCommonAbstractTest {
- /** */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
- /** */
- private static final int NODES = 3;
-
- /** */
- private static final long IDLE_TIMEOUT = 50;
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
-
- TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
-
- commSpi.setIdleConnectionTimeout(IDLE_TIMEOUT);
- commSpi.setSharedMemoryPort(-1);
-
- cfg.setCommunicationSpi(commSpi);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected long getTestTimeout() {
- return 2 * 60_000;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
- startGridsMultiThreaded(NODES);
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- super.afterTestsStopped();
-
- stopAllGrids();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testCacheOperationsIdleConnectionCloseTx() throws Exception {
- cacheOperationsIdleConnectionClose(TRANSACTIONAL);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testCacheOperationsIdleConnectionCloseAtomic() throws Exception {
- cacheOperationsIdleConnectionClose(ATOMIC);
- }
-
- /**
- * @param atomicityMode Cache atomicity mode.
- * @throws Exception If failed.
- */
- private void cacheOperationsIdleConnectionClose(CacheAtomicityMode atomicityMode) throws Exception {
- CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
-
- ccfg.setAtomicityMode(atomicityMode);
- ccfg.setCacheMode(REPLICATED);
- ccfg.setWriteSynchronizationMode(FULL_SYNC);
-
- IgniteCache<Object, Object> cache = ignite(0).createCache(ccfg).withAsync();
-
- try {
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
- int iter = 0;
-
- long stopTime = System.currentTimeMillis() + 90_000;
-
- while (System.currentTimeMillis() < stopTime) {
- if (iter++ % 10 == 0)
- log.info("Iteration: " + iter);
-
- cache.put(iter, 1);
-
- IgniteFuture<?> fut = cache.future();
-
- try {
- fut.get(10_000);
- }
- catch (IgniteException e) {
- List<Ignite> nodes = IgnitionEx.allGridsx();
-
- for (Ignite node : nodes)
- ((IgniteKernal)node).dumpDebugInfo();
-
- U.dumpThreads(log);
-
- throw e;
- }
-
- U.sleep(rnd.nextLong(IDLE_TIMEOUT - 10, IDLE_TIMEOUT + 10));
- }
- }
- finally {
- ignite(0).destroyCache(ccfg.getName());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d200004/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnectionTest.java
new file mode 100644
index 0000000..b9003cd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnectionTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Runs cache puts separated by pauses close to the configured idle connection timeout, for transactional and atomic caches.
+ */
+public class IgniteCacheMessageRecoveryIdleConnectionTest extends GridCommonAbstractTest {
+ /** IP finder set on the discovery SPI of every node. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Number of nodes started before the tests. */
+ private static final int NODES = 3;
+
+ /** Value passed to {@code setIdleConnectionTimeout}; also the center of the pause range between puts. */
+ private static final long IDLE_TIMEOUT = 50;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+ commSpi.setIdleConnectionTimeout(IDLE_TIMEOUT);
+ commSpi.setSharedMemoryPort(-1);
+
+ cfg.setCommunicationSpi(commSpi);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 2 * 60_000;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGridsMultiThreaded(NODES);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCacheOperationsIdleConnectionCloseTx() throws Exception {
+ cacheOperationsIdleConnectionClose(TRANSACTIONAL);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCacheOperationsIdleConnectionCloseAtomic() throws Exception {
+ cacheOperationsIdleConnectionClose(ATOMIC);
+ }
+
+ /**
+ * @param atomicityMode Cache atomicity mode.
+ * @throws Exception If failed.
+ */
+ private void cacheOperationsIdleConnectionClose(CacheAtomicityMode atomicityMode) throws Exception {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setAtomicityMode(atomicityMode);
+ ccfg.setCacheMode(REPLICATED);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ IgniteCache<Object, Object> cache = ignite(0).createCache(ccfg).withAsync();
+
+ try {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ int iter = 0;
+
+ long stopTime = System.currentTimeMillis() + 90_000;
+
+ while (System.currentTimeMillis() < stopTime) {
+ if (iter++ % 50 == 0)
+ log.info("Iteration: " + iter);
+
+ cache.put(iter, 1);
+
+ IgniteFuture<?> fut = cache.future();
+
+ try {
+ fut.get(10_000);
+ }
+ catch (IgniteException e) {
+ log.error("Failed to execute update, will dump debug information" +
+ " [err=" + e+ ", iter=" + iter + ']', e);
+
+ List<Ignite> nodes = IgnitionEx.allGridsx();
+
+ for (Ignite node : nodes)
+ ((IgniteKernal)node).dumpDebugInfo();
+
+ U.dumpThreads(log);
+
+ throw e;
+ }
+
+ U.sleep(rnd.nextLong(IDLE_TIMEOUT - 10, IDLE_TIMEOUT + 10));
+ }
+ }
+ finally {
+ ignite(0).destroyCache(ccfg.getName());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d200004/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 8c3f4de..2c294ba 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -125,7 +125,8 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicNearUp
import org.apache.ignite.internal.processors.cache.distributed.CacheTxNearUpdateTopologyChangeTest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheEntrySetIterationPreloadingSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecoveryTest;
-import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageRecoveryIdleConnection;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheConnectionRecoveryTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageRecoveryIdleConnectionTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageWriteTimeoutTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheSystemTransactionsSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheTxMessageRecoveryTest;
@@ -281,7 +282,8 @@ public class IgniteCacheTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheAtomicMessageRecoveryTest.class);
suite.addTestSuite(IgniteCacheTxMessageRecoveryTest.class);
suite.addTestSuite(IgniteCacheMessageWriteTimeoutTest.class);
- suite.addTestSuite(IgniteCacheMessageRecoveryIdleConnection.class);
+ suite.addTestSuite(IgniteCacheMessageRecoveryIdleConnectionTest.class);
+ suite.addTestSuite(IgniteCacheConnectionRecoveryTest.class);
GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredEvictionAtomicSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredEvictionSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredAtomicSelfTest.class, ignoredTests);