You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/06 14:39:24 UTC

[02/50] [abbrv] ignite git commit: ignite-3428 Fixed message recovery handling on reconnect

ignite-3428 Fixed message recovery handling on reconnect


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/89d722c2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/89d722c2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/89d722c2

Branch: refs/heads/ignite-1.5.31-1
Commit: 89d722c25082fac7e2c7ba4af6938dc742eab99c
Parents: bebf3f0
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 6 09:31:07 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 6 09:31:07 2016 +0300

----------------------------------------------------------------------
 .../util/nio/GridNioRecoveryDescriptor.java     |  19 ++-
 .../ignite/internal/util/nio/GridNioServer.java |  50 ++++--
 .../util/nio/GridSelectorNioSessionImpl.java    |   7 +
 .../communication/tcp/TcpCommunicationSpi.java  | 167 ++++++++++++-------
 ...gniteCacheMessageRecoveryIdleConnection.java | 154 +++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |   2 +
 6 files changed, 324 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/89d722c2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 409bded..35480ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -74,6 +74,9 @@ public class GridNioRecoveryDescriptor {
     /** Maximum size of unacknowledged messages queue. */
     private final int queueLimit;
 
+    /** Number of descriptor reservations (for info purposes). */
+    private int reserveCnt;
+
     /**
      * @param queueLimit Maximum size of unacknowledged messages queue.
      * @param node Node.
@@ -256,9 +259,12 @@ public class GridNioRecoveryDescriptor {
             while (!connected && reserved)
                 wait();
 
-            if (!connected)
+            if (!connected) {
                 reserved = true;
 
+                reserveCnt++;
+            }
+
             return !connected;
         }
     }
@@ -375,12 +381,23 @@ public class GridNioRecoveryDescriptor {
             else {
                 reserved = true;
 
+                reserveCnt++;
+
                 return true;
             }
         }
     }
 
     /**
+     * @return Number of descriptor reservations.
+     */
+    public int reserveCount() {
+        synchronized (this) {
+            return reserveCnt;
+        }
+    }
+
+    /**
      * @param futs Futures to complete.
      */
     private void completeOnNodeLeft(GridNioFuture<?>[] futs) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/89d722c2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 75cf776..ac55a14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -1429,11 +1429,7 @@ public class GridNioServer<T> {
 
                                     sb.append("    Connection info [")
                                         .append("rmtAddr=").append(ses.remoteAddress())
-                                        .append(", locAddr=").append(ses.localAddress())
-                                        .append(", msgWriter=").append(writer != null ? writer.toString() : "null")
-                                        .append(", msgReader=").append(reader != null ? reader.toString() : "null")
-                                        .append(", bytesRcvd=").append(ses.bytesReceived())
-                                        .append(", bytesSent=").append(ses.bytesSent());
+                                        .append(", locAddr=").append(ses.localAddress());
 
                                     GridNioRecoveryDescriptor desc = ses.recoveryDescriptor();
 
@@ -1446,11 +1442,32 @@ public class GridNioServer<T> {
                                     else
                                         sb.append(", recoveryDesc=null");
 
+                                    sb.append(", bytesRcvd=").append(ses.bytesReceived())
+                                        .append(", bytesSent=").append(ses.bytesSent())
+                                        .append(", opQueueSize=").append(ses.writeQueueSize())
+                                        .append(", msgWriter=").append(writer != null ? writer.toString() : "null")
+                                        .append(", msgReader=").append(reader != null ? reader.toString() : "null");
+
+                                    int cnt = 0;
+
+                                    for (GridNioFuture<?> fut : ses.writeQueue()) {
+                                        if (cnt == 0)
+                                            sb.append(",\n opQueue=[").append(fut);
+                                        else
+                                            sb.append(',').append(fut);
+
+                                        if (++cnt == 5) {
+                                            sb.append(']');
+
+                                            break;
+                                        }
+                                    }
+
+
                                     sb.append("]").append(U.nl());
                                 }
 
-                                if (log.isInfoEnabled())
-                                    log.info(sb.toString());
+                                U.warn(log, sb.toString());
 
                                 // Complete the request just in case (none should wait on this future).
                                 req.onDone(true);
@@ -1721,13 +1738,6 @@ public class GridNioServer<T> {
                 if (e != null)
                     filterChain.onExceptionCaught(ses, e);
 
-                try {
-                    filterChain.onSessionClosed(ses);
-                }
-                catch (IgniteCheckedException e1) {
-                    filterChain.onExceptionCaught(ses, e1);
-                }
-
                 ses.removeMeta(BUF_META_KEY);
 
                 // Since ses is in closed state, no write requests will be added.
@@ -1755,6 +1765,13 @@ public class GridNioServer<T> {
                         fut.connectionClosed();
                 }
 
+                try {
+                    filterChain.onSessionClosed(ses);
+                }
+                catch (IgniteCheckedException e1) {
+                    filterChain.onExceptionCaught(ses, e1);
+                }
+
                 return true;
             }
 
@@ -1980,24 +1997,29 @@ public class GridNioServer<T> {
         private SocketChannel sockCh;
 
         /** Session to perform operation on. */
+        @GridToStringExclude
         private GridSelectorNioSessionImpl ses;
 
         /** Is it a close request or a write request. */
         private NioOperation op;
 
         /** Message. */
+        @GridToStringExclude
         private ByteBuffer msg;
 
         /** Direct message. */
         private Message commMsg;
 
         /** */
+        @GridToStringExclude
         private boolean accepted;
 
         /** */
+        @GridToStringExclude
         private Map<Integer, ?> meta;
 
         /** */
+        @GridToStringExclude
         private boolean skipRecovery;
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/89d722c2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index deb7d2b..360b3d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -264,6 +264,13 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
         return queueSize.get();
     }
 
+    /**
+     * @return Write requests.
+     */
+    Collection<GridNioFuture<?>> writeQueue() {
+        return queue;
+    }
+
     /** {@inheritDoc} */
     @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
         assert recoveryDesc != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/89d722c2/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 0a18003..ed29b59 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -353,30 +353,31 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 UUID id = ses.meta(NODE_ID_META);
 
                 if (id != null) {
-                    GridCommunicationClient rmv = clients.get(id);
+                    if (!stopping) {
+                        boolean reconnect = false;
 
-                    if (rmv instanceof GridTcpNioCommunicationClient &&
-                        ((GridTcpNioCommunicationClient)rmv).session() == ses &&
-                        clients.remove(id, rmv)) {
-                        rmv.forceClose();
+                        GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor();
 
-                        if (!stopping) {
-                            GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor();
+                        if (recoveryData != null) {
+                            if (recoveryData.nodeAlive(getSpiContext().node(id))) {
+                                if (!recoveryData.messagesFutures().isEmpty()) {
+                                    reconnect = true;
 
-                            if (recoveryData != null) {
-                                if (recoveryData.nodeAlive(getSpiContext().node(id))) {
-                                    if (!recoveryData.messagesFutures().isEmpty()) {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Session was closed but there are unacknowledged messages, " +
-                                                "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']');
-
-                                        commWorker.addReconnectRequest(recoveryData);
-                                    }
+                                    if (log.isDebugEnabled())
+                                        log.debug("Session was closed but there are unacknowledged messages, " +
+                                            "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']');
                                 }
-                                else
-                                    recoveryData.onNodeLeft();
                             }
+                            else
+                                recoveryData.onNodeLeft();
                         }
+
+                        DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(id,
+                            ses,
+                            recoveryData,
+                            reconnect);
+
+                        commWorker.addProcessDisconnectRequest(disconnectData);
                     }
 
                     CommunicationListener<Message> lsnr0 = lsnr;
@@ -1383,7 +1384,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     @Override public void dumpStats() {
         IgniteLogger log = this.log;
 
-        if (log != null && log.isInfoEnabled()) {
+        if (log != null) {
             StringBuilder sb = new StringBuilder("Communication SPI recovery descriptors: ").append(U.nl());
 
             for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) {
@@ -1393,11 +1394,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     .append(", msgsSent=").append(desc.sent())
                     .append(", msgsAckedByRmt=").append(desc.acked())
                     .append(", msgsRcvd=").append(desc.received())
+                    .append(", lastAcked=").append(desc.lastAcknowledged())
+                    .append(", reserveCnt=").append(desc.reserveCount())
                     .append(", descIdHash=").append(System.identityHashCode(desc))
                     .append(']').append(U.nl());
             }
 
-            log.info(sb.toString());
+            U.warn(log, sb.toString());
         }
 
         GridNioServer<Message> nioSrvr = this.nioSrvr;
@@ -3030,14 +3033,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             endpoint.close();
         }
 
-        /** @{@inheritDoc} */
+        /** {@inheritDoc} */
         @Override protected void cleanup() {
             super.cleanup();
 
             endpoint.close();
         }
 
-        /** @{@inheritDoc} */
+        /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(ShmemWorker.class, this);
         }
@@ -3048,7 +3051,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      */
     private class CommunicationWorker extends IgniteSpiThread {
         /** */
-        private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>();
+        private final BlockingQueue<DisconnectedSessionInfo> q = new LinkedBlockingQueue<>();
 
         /**
          * @param gridName Grid name.
@@ -3063,10 +3066,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 log.debug("Tcp communication worker has been started.");
 
             while (!isInterrupted()) {
-                GridNioRecoveryDescriptor recoveryDesc = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
+                DisconnectedSessionInfo disconnectData = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
 
-                if (recoveryDesc != null)
-                    processRecovery(recoveryDesc);
+                if (disconnectData != null)
+                    processDisconnect(disconnectData);
                 else
                     processIdle();
             }
@@ -3171,56 +3174,62 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         }
 
         /**
-         * @param recoveryDesc Recovery descriptor.
+         * @param sesInfo Disconnected session information.
          */
-        private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) {
-            ClusterNode node = recoveryDesc.node();
+        private void processDisconnect(DisconnectedSessionInfo sesInfo) {
+            GridCommunicationClient client = clients.get(sesInfo.nodeId);
 
-            try {
-                if (clients.containsKey(node.id()) ||
-                    !recoveryDesc.nodeAlive(getSpiContext().node(node.id())) ||
-                    !getSpiContext().pingNode(node.id()))
-                    return;
-            }
-            catch (IgniteClientDisconnectedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to ping node, client disconnected.");
+            if (client instanceof GridTcpNioCommunicationClient &&
+                ((GridTcpNioCommunicationClient) client).session() == sesInfo.ses)
+                    clients.remove(sesInfo.nodeId, client);
 
-                return;
-            }
+            if (sesInfo.reconnect) {
+                GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc;
 
-            try {
-                if (log.isDebugEnabled())
-                    log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
+                ClusterNode node = recoveryDesc.node();
 
-                GridCommunicationClient client = reserveClient(node);
+                if (!recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
+                    return;
 
-                client.release();
-            }
-            catch (IgniteCheckedException | IgniteException e) {
-                if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) {
+                try {
                     if (log.isDebugEnabled())
-                        log.debug("Recovery reconnect failed, will retry " +
-                            "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+                        log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
+
+                    client = reserveClient(node);
 
-                    addReconnectRequest(recoveryDesc);
+                    client.release();
                 }
-                else {
-                    if (log.isDebugEnabled())
-                        log.debug("Recovery reconnect failed, " +
-                            "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+                catch (IgniteCheckedException | IgniteException e) {
+                    try {
+                        if (recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && getSpiContext().pingNode(node.id())) {
+                            if (log.isDebugEnabled())
+                                log.debug("Recovery reconnect failed, will retry " +
+                                    "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+
+                            addProcessDisconnectRequest(sesInfo);
+                        }
+                        else {
+                            if (log.isDebugEnabled())
+                                log.debug("Recovery reconnect failed, " +
+                                    "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
 
-                    onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]",
-                        e);
+                            onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]",
+                                e);
+                        }
+                    }
+                    catch (IgniteClientDisconnectedException e0) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to ping node, client disconnected.");
+                    }
                 }
             }
         }
 
         /**
-         * @param recoverySnd Recovery send data.
+         * @param sesInfo Disconnected session information.
          */
-        void addReconnectRequest(GridNioRecoveryDescriptor recoverySnd) {
-            boolean add = q.add(recoverySnd);
+        void addProcessDisconnectRequest(DisconnectedSessionInfo sesInfo) {
+            boolean add = q.add(sesInfo);
 
             assert add;
         }
@@ -3731,4 +3740,42 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             lock.readUnlock();
         }
     }
+
+    /**
+     *
+     */
+    private static class DisconnectedSessionInfo {
+        /** */
+        private final UUID nodeId;
+
+        /** */
+        private final GridNioSession ses;
+
+        /** */
+        private final GridNioRecoveryDescriptor recoveryDesc;
+
+        /** */
+        private final boolean reconnect;
+
+        /**
+         * @param nodeId Node ID.
+         * @param ses Session.
+         * @param recoveryDesc Recovery descriptor.
+         * @param reconnect Reconnect flag.
+         */
+        public DisconnectedSessionInfo(UUID nodeId,
+            GridNioSession ses,
+            @Nullable GridNioRecoveryDescriptor recoveryDesc,
+            boolean reconnect) {
+            this.nodeId = nodeId;
+            this.ses = ses;
+            this.recoveryDesc = recoveryDesc;
+            this.reconnect = reconnect;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(DisconnectedSessionInfo.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/89d722c2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java
new file mode 100644
index 0000000..618fe2a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheMessageRecoveryIdleConnection extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int NODES = 3;
+
+    /** */
+    private static final long IDLE_TIMEOUT = 50;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+        commSpi.setIdleConnectionTimeout(IDLE_TIMEOUT);
+        commSpi.setSharedMemoryPort(-1);
+
+        cfg.setCommunicationSpi(commSpi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 2 * 60_000;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(NODES);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheOperationsIdleConnectionCloseTx() throws Exception {
+        cacheOperationsIdleConnectionClose(TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheOperationsIdleConnectionCloseAtomic() throws Exception {
+        cacheOperationsIdleConnectionClose(ATOMIC);
+    }
+
+    /**
+     * @param atomicityMode Cache atomicity mode.
+     * @throws Exception If failed.
+     */
+    private void cacheOperationsIdleConnectionClose(CacheAtomicityMode atomicityMode) throws Exception {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(REPLICATED);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        IgniteCache<Object, Object> cache = ignite(0).createCache(ccfg).withAsync();
+
+        try {
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            int iter = 0;
+
+            long stopTime = System.currentTimeMillis() + 90_000;
+
+            while (System.currentTimeMillis() < stopTime) {
+                if (iter++ % 10 == 0)
+                    log.info("Iteration: " + iter);
+
+                cache.put(iter, 1);
+
+                IgniteFuture<?> fut = cache.future();
+
+                try {
+                    fut.get(10_000);
+                }
+                catch (IgniteException e) {
+                    List<Ignite> nodes = IgnitionEx.allGridsx();
+
+                    for (Ignite node : nodes)
+                        ((IgniteKernal)node).dumpDebugInfo();
+
+                    U.dumpThreads(log);
+
+                    throw e;
+                }
+
+                U.sleep(rnd.nextLong(IDLE_TIMEOUT - 10, IDLE_TIMEOUT + 10));
+            }
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/89d722c2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 003b12c..33aae9a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -123,6 +123,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicNearUp
 import org.apache.ignite.internal.processors.cache.distributed.CacheTxNearUpdateTopologyChangeTest;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheEntrySetIterationPreloadingSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecoveryTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageRecoveryIdleConnection;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheSystemTransactionsSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheTxMessageRecoveryTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCrossCacheTxStoreSelfTest;
@@ -272,6 +273,7 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheMixedPartitionExchangeSelfTest.class);
         suite.addTestSuite(IgniteCacheAtomicMessageRecoveryTest.class);
         suite.addTestSuite(IgniteCacheTxMessageRecoveryTest.class);
+        suite.addTestSuite(IgniteCacheMessageRecoveryIdleConnection.class);
         GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredEvictionAtomicSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredEvictionSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredAtomicSelfTest.class, ignoredTests);