You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/08/22 07:37:19 UTC

[23/32] ignite git commit: ignite-3547 Do not try to re-send message using the same client. Remove disconnected client from 'onDisconnected' callback.

ignite-3547 Do not try to re-send message using the same client. Remove disconnected client from 'onDisconnected' callback.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a20ca351
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a20ca351
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a20ca351

Branch: refs/heads/ignite-3220-1
Commit: a20ca351b33efb07b83c6f5967fa7a3cef154c83
Parents: 8aa534a
Author: sboikov <sb...@gridgain.com>
Authored: Fri Aug 19 10:37:59 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Aug 19 10:37:59 2016 +0300

----------------------------------------------------------------------
 .../util/nio/GridTcpNioCommunicationClient.java |   5 +-
 .../communication/tcp/TcpCommunicationSpi.java  |  49 +++--
 .../CacheSerializableTransactionsTest.java      |   5 +
 .../IgniteCacheConnectionRecoveryTest.java      | 205 +++++++++++++++++++
 .../IgniteCacheMessageRecoveryAbstractTest.java |  14 +-
 ...gniteCacheMessageRecoveryIdleConnection.java | 154 --------------
 ...eCacheMessageRecoveryIdleConnectionTest.java | 157 ++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |   6 +-
 8 files changed, 407 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a20ca351/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index 4022bc5..5fe521d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -125,8 +125,11 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
                 if (log.isDebugEnabled())
                     log.debug("Failed to send message [client=" + this + ", err=" + e + ']');
 
-                if (e.getCause() instanceof IOException)
+                if (e.getCause() instanceof IOException) {
+                    ses.close();
+
                     return true;
+                }
                 else
                     throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a20ca351/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 2c03b2d..d81b9f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -353,6 +353,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 UUID id = ses.meta(NODE_ID_META);
 
                 if (id != null) {
+                    GridCommunicationClient client = clients.get(id);
+
+                    if (client instanceof GridTcpNioCommunicationClient &&
+                        ((GridTcpNioCommunicationClient) client).session() == ses) {
+                        client.close();
+
+                        clients.remove(id, client);
+                    }
+
                     if (!stopping) {
                         boolean reconnect = false;
 
@@ -372,9 +381,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                 recoveryData.onNodeLeft();
                         }
 
-                        DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(id,
-                            ses,
-                            recoveryData,
+                        DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(recoveryData,
                             reconnect);
 
                         commWorker.addProcessDisconnectRequest(disconnectData);
@@ -1400,6 +1407,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     .append(']').append(U.nl());
             }
 
+            sb.append("Communication SPI clients: ").append(U.nl());
+
+            for (Map.Entry<UUID, GridCommunicationClient> entry : clients.entrySet()) {
+                sb.append("    [node=").append(entry.getKey())
+                    .append(", client=").append(entry.getValue())
+                    .append(']').append(U.nl());
+            }
+
             U.warn(log, sb.toString());
         }
 
@@ -1978,17 +1993,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                     client.release();
 
-                    client = null;
-
                     if (!retry)
                         sentMsgsCnt.increment();
                     else {
+                        clients.remove(node.id(), client);
+
                         ClusterNode node0 = getSpiContext().node(node.id());
 
                         if (node0 == null)
                             throw new IgniteCheckedException("Failed to send message to remote node " +
                                 "(node has left the grid): " + node.id());
                     }
+
+                    client = null;
                 }
                 while (retry);
             }
@@ -3187,12 +3204,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
          * @param sesInfo Disconnected session information.
          */
         private void processDisconnect(DisconnectedSessionInfo sesInfo) {
-            GridCommunicationClient client = clients.get(sesInfo.nodeId);
-
-            if (client instanceof GridTcpNioCommunicationClient &&
-                ((GridTcpNioCommunicationClient) client).session() == sesInfo.ses)
-                    clients.remove(sesInfo.nodeId, client);
-
             if (sesInfo.reconnect) {
                 GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc;
 
@@ -3205,7 +3216,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (log.isDebugEnabled())
                         log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
 
-                    client = reserveClient(node);
+                    GridCommunicationClient client = reserveClient(node);
 
                     client.release();
                 }
@@ -3756,29 +3767,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      */
     private static class DisconnectedSessionInfo {
         /** */
-        private final UUID nodeId;
-
-        /** */
-        private final GridNioSession ses;
-
-        /** */
         private final GridNioRecoveryDescriptor recoveryDesc;
 
         /** */
         private final boolean reconnect;
 
         /**
-         * @param nodeId Node ID.
-         * @param ses Session.
          * @param recoveryDesc Recovery descriptor.
          * @param reconnect Reconnect flag.
          */
-        public DisconnectedSessionInfo(UUID nodeId,
-            GridNioSession ses,
-            @Nullable GridNioRecoveryDescriptor recoveryDesc,
+        DisconnectedSessionInfo(@Nullable GridNioRecoveryDescriptor recoveryDesc,
             boolean reconnect) {
-            this.nodeId = nodeId;
-            this.ses = ses;
             this.recoveryDesc = recoveryDesc;
             this.reconnect = reconnect;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a20ca351/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
index 4baef66..3d4f850 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -3311,6 +3311,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     private void getRemoveTx(boolean nearCache, boolean store) throws Exception {
+        long stopTime = U.currentTimeMillis() + getTestTimeout() - 30_000;
+
         final Ignite ignite0 = ignite(0);
 
         CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, store, false);
@@ -3330,6 +3332,9 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
             }
 
             for (int i = 0; i < 100; i++) {
+                if (U.currentTimeMillis() > stopTime)
+                    break;
+
                 final AtomicInteger cntr = new AtomicInteger();
 
                 final Integer key = i;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a20ca351/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java
new file mode 100644
index 0000000..7195c37
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Checks that concurrent cache updates keep completing while communication sessions are repeatedly closed.
+ */
+public class IgniteCacheConnectionRecoveryTest extends GridCommonAbstractTest {
+    /** IP finder shared by all nodes. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Client mode flag applied to every node configured after it is set. */
+    private boolean client;
+
+    /** Stop signal for update threads; raised only after the test thread has published its last barrier. */
+    private volatile boolean stopped;
+
+    /** Number of server nodes. */
+    private static final int SRVS = 5;
+
+    /** Number of client nodes. */
+    private static final int CLIENTS = 5;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+        cfg.setClientMode(client);
+
+        cfg.setCacheConfiguration(cacheConfiguration("cache1", TRANSACTIONAL), cacheConfiguration("cache2", ATOMIC));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(SRVS);
+
+        client = true;
+
+        startGridsMultiThreaded(SRVS, CLIENTS);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRecovery() throws Exception {
+        final Map<Integer, Integer> data = new TreeMap<>();
+
+        for (int i = 0; i < 500; i++)
+            data.put(i, i);
+
+        final AtomicInteger idx = new AtomicInteger();
+
+        final long stopTime = U.currentTimeMillis() + 30_000;
+
+        final AtomicReference<CyclicBarrier> barrierRef = new AtomicReference<>();
+
+        final int TEST_THREADS = (CLIENTS + SRVS) * 2;
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                int idx0 = idx.getAndIncrement();
+                Ignite node = ignite(idx0 % (SRVS + CLIENTS));
+
+                Thread.currentThread().setName("test-thread-" + idx0 + "-" + node.name());
+
+                IgniteCache cache1 = node.cache("cache1").withAsync();
+                IgniteCache cache2 = node.cache("cache2").withAsync();
+
+                for (int iter = 0; !stopped; iter++) {
+                    try {
+                        cache1.putAll(data);
+                        cache1.future().get(15, SECONDS);
+
+                        cache2.putAll(data);
+                        cache2.future().get(15, SECONDS);
+
+                        CyclicBarrier b = barrierRef.get();
+
+                        if (b != null)
+                            b.await(15, SECONDS);
+                    }
+                    catch (Exception e) {
+                        synchronized (IgniteCacheConnectionRecoveryTest.class) {
+                            log.error("Failed to execute update, will dump debug information" +
+                                " [err=" + e + ", iter=" + iter + ']', e);
+
+                            List<Ignite> nodes = IgnitionEx.allGridsx();
+
+                            for (Ignite node0 : nodes)
+                                ((IgniteKernal)node0).dumpDebugInfo();
+
+                            U.dumpThreads(log);
+                        }
+
+                        throw e;
+                    }
+                }
+
+                return null;
+            }
+        }, TEST_THREADS, "test-thread");
+
+        while (U.currentTimeMillis() < stopTime) {
+            boolean closed = false;
+
+            for (Ignite node : G.allGrids())
+                closed |= IgniteCacheMessageRecoveryAbstractTest.closeSessions(node);
+
+            if (closed) {
+                CyclicBarrier b = new CyclicBarrier(TEST_THREADS + 1, new Runnable() {
+                    @Override public void run() {
+                        barrierRef.set(null);
+                    }
+                });
+
+                barrierRef.set(b);
+
+                // Bounded so a run whose update threads have all failed ends with an error instead of hanging.
+                b.await(60, SECONDS);
+            }
+
+            U.sleep(50);
+        }
+
+        stopped = true;
+
+        fut.get();
+    }
+
+    /**
+     * @param name Cache name.
+     * @param atomicityMode Cache atomicity mode.
+     * @return Configuration.
+     */
+    private CacheConfiguration cacheConfiguration(String name, CacheAtomicityMode atomicityMode) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(REPLICATED);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        return ccfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a20ca351/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
index 16d7e5d..0460a8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
@@ -150,7 +150,11 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
             for (int i = 0; i < 30; i++) {
                 Thread.sleep(1000);
 
-                closed |= closeSessions();
+                Ignite node0 = ignite(ThreadLocalRandom.current().nextInt(0, GRID_CNT));
+
+                log.info("Close sessions for: " + ignite.name());
+
+                closed |= closeSessions(node0);
             }
 
             assertTrue(closed);
@@ -163,13 +167,11 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
     }
 
     /**
+     * @param ignite Node.
      * @throws Exception If failed.
+     * @return {@code True} if closed at least one session.
      */
-    private boolean closeSessions() throws Exception {
-        Ignite ignite = ignite(ThreadLocalRandom.current().nextInt(0, GRID_CNT));
-
-        log.info("Close sessions for: " + ignite.name());
-
+    static boolean closeSessions(Ignite ignite) throws Exception {
         TcpCommunicationSpi commSpi = (TcpCommunicationSpi)ignite.configuration().getCommunicationSpi();
 
         Map<UUID, GridCommunicationClient> clients = U.field(commSpi, "clients");

http://git-wip-us.apache.org/repos/asf/ignite/blob/a20ca351/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java
deleted file mode 100644
index 618fe2a..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.IgnitionEx;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-
-/**
- *
- */
-public class IgniteCacheMessageRecoveryIdleConnection extends GridCommonAbstractTest {
-    /** */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** */
-    private static final int NODES = 3;
-
-    /** */
-    private static final long IDLE_TIMEOUT = 50;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
-
-        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
-
-        commSpi.setIdleConnectionTimeout(IDLE_TIMEOUT);
-        commSpi.setSharedMemoryPort(-1);
-
-        cfg.setCommunicationSpi(commSpi);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected long getTestTimeout() {
-        return 2 * 60_000;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
-        startGridsMultiThreaded(NODES);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        super.afterTestsStopped();
-
-        stopAllGrids();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testCacheOperationsIdleConnectionCloseTx() throws Exception {
-        cacheOperationsIdleConnectionClose(TRANSACTIONAL);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testCacheOperationsIdleConnectionCloseAtomic() throws Exception {
-        cacheOperationsIdleConnectionClose(ATOMIC);
-    }
-
-    /**
-     * @param atomicityMode Cache atomicity mode.
-     * @throws Exception If failed.
-     */
-    private void cacheOperationsIdleConnectionClose(CacheAtomicityMode atomicityMode) throws Exception {
-        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
-
-        ccfg.setAtomicityMode(atomicityMode);
-        ccfg.setCacheMode(REPLICATED);
-        ccfg.setWriteSynchronizationMode(FULL_SYNC);
-
-        IgniteCache<Object, Object> cache = ignite(0).createCache(ccfg).withAsync();
-
-        try {
-            ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
-            int iter = 0;
-
-            long stopTime = System.currentTimeMillis() + 90_000;
-
-            while (System.currentTimeMillis() < stopTime) {
-                if (iter++ % 10 == 0)
-                    log.info("Iteration: " + iter);
-
-                cache.put(iter, 1);
-
-                IgniteFuture<?> fut = cache.future();
-
-                try {
-                    fut.get(10_000);
-                }
-                catch (IgniteException e) {
-                    List<Ignite> nodes = IgnitionEx.allGridsx();
-
-                    for (Ignite node : nodes)
-                        ((IgniteKernal)node).dumpDebugInfo();
-
-                    U.dumpThreads(log);
-
-                    throw e;
-                }
-
-                U.sleep(rnd.nextLong(IDLE_TIMEOUT - 10, IDLE_TIMEOUT + 10));
-            }
-        }
-        finally {
-            ignite(0).destroyCache(ccfg.getName());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a20ca351/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnectionTest.java
new file mode 100644
index 0000000..b9003cd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnectionTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Runs cache updates paced around a very short communication idle connection timeout.
+ */
+public class IgniteCacheMessageRecoveryIdleConnectionTest extends GridCommonAbstractTest {
+    /** IP finder shared by all nodes. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of nodes to start. */
+    private static final int NODES = 3;
+
+    /** Idle connection timeout passed to the communication SPI; also centers the pause between updates. */
+    private static final long IDLE_TIMEOUT = 50;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpCommunicationSpi spi = new TcpCommunicationSpi();
+
+        spi.setSharedMemoryPort(-1);
+        spi.setIdleConnectionTimeout(IDLE_TIMEOUT);
+
+        cfg.setCommunicationSpi(spi);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 120_000;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(NODES);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheOperationsIdleConnectionCloseTx() throws Exception {
+        cacheOperationsIdleConnectionClose(TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheOperationsIdleConnectionCloseAtomic() throws Exception {
+        cacheOperationsIdleConnectionClose(ATOMIC);
+    }
+
+    /**
+     * @param atomicityMode Cache atomicity mode.
+     * @throws Exception If failed.
+     */
+    private void cacheOperationsIdleConnectionClose(CacheAtomicityMode atomicityMode) throws Exception {
+        CacheConfiguration<Object, Object> cacheCfg = new CacheConfiguration<>();
+
+        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+        cacheCfg.setCacheMode(REPLICATED);
+        cacheCfg.setAtomicityMode(atomicityMode);
+
+        IgniteCache<Object, Object> asyncCache = ignite(0).createCache(cacheCfg).withAsync();
+
+        try {
+            long deadline = System.currentTimeMillis() + 90_000;
+
+            int iter = 0;
+
+            while (System.currentTimeMillis() < deadline) {
+                if (iter % 50 == 0)
+                    log.info("Iteration: " + (iter + 1));
+
+                asyncCache.put(++iter, 1);
+
+                IgniteFuture<?> putFut = asyncCache.future();
+
+                try {
+                    putFut.get(10_000);
+                }
+                catch (IgniteException e) {
+                    log.error("Failed to execute update, will dump debug information" +
+                        " [err=" + e + ", iter=" + iter + ']', e);
+
+                    List<Ignite> allNodes = IgnitionEx.allGridsx();
+
+                    for (Ignite n : allNodes)
+                        ((IgniteKernal)n).dumpDebugInfo();
+
+                    U.dumpThreads(log);
+
+                    throw e;
+                }
+
+                long pause = ThreadLocalRandom.current().nextLong(IDLE_TIMEOUT - 10, IDLE_TIMEOUT + 10);
+
+                U.sleep(pause);
+            }
+        }
+        finally {
+            ignite(0).destroyCache(cacheCfg.getName());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a20ca351/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 84e1502..9240ef5 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -125,7 +125,8 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicNearUp
 import org.apache.ignite.internal.processors.cache.distributed.CacheTxNearUpdateTopologyChangeTest;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheEntrySetIterationPreloadingSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecoveryTest;
-import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageRecoveryIdleConnection;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheConnectionRecoveryTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageRecoveryIdleConnectionTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageWriteTimeoutTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheSystemTransactionsSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheTxMessageRecoveryTest;
@@ -283,7 +284,8 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheAtomicMessageRecoveryTest.class);
         suite.addTestSuite(IgniteCacheTxMessageRecoveryTest.class);
         suite.addTestSuite(IgniteCacheMessageWriteTimeoutTest.class);
-        suite.addTestSuite(IgniteCacheMessageRecoveryIdleConnection.class);
+        suite.addTestSuite(IgniteCacheMessageRecoveryIdleConnectionTest.class);
+        suite.addTestSuite(IgniteCacheConnectionRecoveryTest.class);
         GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredEvictionAtomicSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredEvictionSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredAtomicSelfTest.class, ignoredTests);