You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/07/29 18:09:21 UTC

incubator-ignite git commit: IGNITE-1169 Implement on TcpCommunicationSpi sendWithAck methods. Added tests.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1169 [created] 325458125


IGNITE-1169 Implement on TcpCommunicationSpi sendWithAck methods. Added tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/32545812
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/32545812
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/32545812

Branch: refs/heads/ignite-1169
Commit: 325458125fe5cb34c1cf79b3e7ece161b6cff05c
Parents: f82fb5c
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Wed Jul 29 19:50:10 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Jul 29 19:09:00 2015 +0300

----------------------------------------------------------------------
 .../util/nio/GridCommunicationClient.java       |  11 +
 .../util/nio/GridNioRecoveryDescriptor.java     |  54 ++-
 .../util/nio/GridShmemCommunicationClient.java  |   8 +
 .../util/nio/GridTcpNioCommunicationClient.java |  32 ++
 .../communication/tcp/TcpCommunicationSpi.java  |  70 ++-
 ...mmunicationSpiRecoveryAckFutureSelfTest.java | 447 +++++++++++++++++++
 .../IgniteSpiCommunicationSelfTestSuite.java    |   1 +
 7 files changed, 619 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32545812/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
index 693a5a4..0403272 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.util.nio;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.jetbrains.annotations.*;
@@ -100,6 +101,16 @@ public interface GridCommunicationClient {
     public boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException;
 
     /**
+     * @param nodeId Node ID (provided only if versions of local and remote nodes are different).
+     * @param msg Message to send.
+     * @param fut Future which done when will be received ack on the message.
+     * @throws IgniteCheckedException If failed.
+     * @return {@code True} if should try to resend message.
+     */
+    public boolean sendMessageWithAck(@Nullable UUID nodeId, Message msg, GridFutureAdapter<Boolean> fut)
+        throws IgniteCheckedException;
+
+    /**
      * @return {@code True} if send is asynchronous.
      */
     public boolean async();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32545812/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 733ae81..66ae60f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.nio;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
@@ -36,6 +37,9 @@ public class GridNioRecoveryDescriptor {
     /** Unacknowledged message futures. */
     private final ArrayDeque<GridNioFuture<?>> msgFuts;
 
+    /** Unacknowledged message futures. */
+    private final Map<GridNioFuture<?>, GridFutureAdapter<Boolean>> ackFuts;
+
     /** Number of messages to resend. */
     private int resendCnt;
 
@@ -79,6 +83,7 @@ public class GridNioRecoveryDescriptor {
         assert queueLimit > 0;
 
         msgFuts = new ArrayDeque<>(queueLimit);
+        ackFuts = new HashMap<>(queueLimit);
 
         this.queueLimit = queueLimit;
         this.node = node;
@@ -166,6 +171,16 @@ public class GridNioRecoveryDescriptor {
     }
 
     /**
+     * @param nioFut fut NIO future.
+     * @param fut ack future.
+     */
+    public void add(GridNioFuture<?> nioFut, GridFutureAdapter<Boolean> fut) {
+        assert fut != null;
+
+        ackFuts.put(nioFut, fut);
+    }
+
+    /**
      * @param rcvCnt Number of messages received by remote node.
      */
     public void ackReceived(long rcvCnt) {
@@ -173,6 +188,8 @@ public class GridNioRecoveryDescriptor {
             log.debug("Handle acknowledgment [acked=" + acked + ", rcvCnt=" + rcvCnt +
                 ", msgFuts=" + msgFuts.size() + ']');
 
+        GridFutureAdapter<Boolean> ackFut;
+
         while (acked < rcvCnt) {
             GridNioFuture<?> fut = msgFuts.pollFirst();
 
@@ -183,6 +200,12 @@ public class GridNioRecoveryDescriptor {
             assert fut.isDone() : fut;
 
             acked++;
+
+            if (!ackFuts.isEmpty() && (ackFut = ackFuts.get(fut)) != null) {
+                ackFut.onDone(true);
+
+                ackFuts.remove(fut);
+            }
         }
     }
 
@@ -191,6 +214,7 @@ public class GridNioRecoveryDescriptor {
      */
     public void onNodeLeft() {
         GridNioFuture<?>[] futs = null;
+        GridFutureAdapter<?>[] akFuts = null;
 
         synchronized (this) {
             nodeLeft = true;
@@ -200,10 +224,16 @@ public class GridNioRecoveryDescriptor {
 
                 msgFuts.clear();
             }
+
+            if (!reserved && !ackFuts.isEmpty()) {
+                akFuts = ackFuts.values().toArray(new GridFutureAdapter<?>[ackFuts.size()]);
+
+                ackFuts.clear();
+            }
         }
 
         if (futs != null)
-            completeOnNodeLeft(futs);
+            completeOnNodeLeft(futs, akFuts);
     }
 
     /**
@@ -214,6 +244,13 @@ public class GridNioRecoveryDescriptor {
     }
 
     /**
+     * @return Futures for unacknowledged messages.
+     */
+    public Collection<GridFutureAdapter<Boolean>> ackMessageFutures() {
+        return ackFuts.values();
+    }
+
+    /**
      * @param node Node.
      * @return {@code True} if node is not null and has the same order as initial remtoe node.
      */
@@ -278,6 +315,7 @@ public class GridNioRecoveryDescriptor {
      */
     public void release() {
         GridNioFuture<?>[] futs = null;
+        GridFutureAdapter<?>[] akFuts = null;
 
         synchronized (this) {
             connected = false;
@@ -302,10 +340,16 @@ public class GridNioRecoveryDescriptor {
 
                 msgFuts.clear();
             }
+
+            if (nodeLeft && !ackFuts.isEmpty()) {
+                akFuts = ackFuts.values().toArray(new GridFutureAdapter<?>[ackFuts.size()]);
+
+                ackFuts.clear();
+            }
         }
 
         if (futs != null)
-            completeOnNodeLeft(futs);
+            completeOnNodeLeft(futs, akFuts);
     }
 
     /**
@@ -356,10 +400,14 @@ public class GridNioRecoveryDescriptor {
 
     /**
      * @param futs Futures to complete.
+     * @param ackFuts Ack futures to complete.
      */
-    private void completeOnNodeLeft(GridNioFuture<?>[] futs) {
+    private void completeOnNodeLeft(GridNioFuture<?>[] futs, GridFutureAdapter<?>[] ackFuts) {
         for (GridNioFuture<?> msg : futs)
             ((GridNioFutureImpl)msg).onDone(new IOException("Failed to send message, node has left: " + node.id()));
+
+        for (GridFutureAdapter<?> fut : ackFuts)
+            fut.onDone(new IOException("Failed to send message, node has left: " + node.id()));
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32545812/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
index e05c37a..134d271 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.util.nio;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.ipc.shmem.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -135,6 +137,12 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
     }
 
     /** {@inheritDoc} */
+    @Override public boolean sendMessageWithAck(@Nullable UUID nodeId, Message msg,
+        GridFutureAdapter<Boolean> fut) throws IgniteCheckedException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
     @Override public void sendMessage(ByteBuffer data) throws IgniteCheckedException {
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32545812/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index abad875..834371f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.util.nio;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.plugin.extensions.communication.*;
@@ -123,6 +124,37 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
     }
 
     /** {@inheritDoc} */
+    @Override public boolean sendMessageWithAck(@Nullable UUID nodeId, Message msg,
+        GridFutureAdapter<Boolean> fut) throws IgniteCheckedException {
+        // Node ID is never provided in asynchronous send mode.
+        assert nodeId == null;
+
+        GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor();
+
+        GridNioFuture<?> nioFut = ses.send(msg);
+
+        if (nioFut.isDone()) {
+            try {
+                nioFut.get();
+            }
+            catch (IgniteCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send message [client=" + this + ", err=" + e + ']');
+
+                if (e.getCause() instanceof IOException)
+                    return true;
+                else
+                    throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e);
+            }
+        }
+
+        if (recovery != null)
+            recovery.add(nioFut, fut);
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean async() {
         return true;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32545812/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index a0acb5c..53c6ddf 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1350,7 +1350,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (slowClientQueueLimit > 0 && msgQueueLimit > 0 && slowClientQueueLimit >= msgQueueLimit) {
             U.quietAndWarn(log, "Slow client queue limit is set to a value greater than message queue limit " +
                 "(slow client queue limit will have no effect) [msgQueueLimit=" + msgQueueLimit +
-                    ", slowClientQueueLimit=" + slowClientQueueLimit + ']');
+                ", slowClientQueueLimit=" + slowClientQueueLimit + ']');
         }
 
         registerMBean(gridName, this, TcpCommunicationSpiMBean.class);
@@ -1749,6 +1749,73 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     }
 
     /**
+     *
+     * @param node
+     * @param msg
+     * @return
+     * @throws IgniteSpiException
+     */
+    public IgniteInternalFuture<Boolean> sendMessageWithAck(ClusterNode node, Message msg) throws IgniteSpiException {
+        assert node != null;
+        assert msg != null;
+
+        if (log.isTraceEnabled())
+            log.trace("Sending message with ack to node [node=" + node + ", msg=" + msg + ']');
+
+        IgniteInternalFuture<Boolean> fut = null;
+
+        if (node.id().equals(getLocalNode().id())) {
+            notifyListener(node.id(), msg, NOOP);
+
+            fut = new GridFinishedFuture<>(true);
+        }
+        else {
+            GridCommunicationClient client = null;
+
+            try {
+                boolean retry;
+
+                do {
+                    client = reserveClient(node);
+
+                    UUID nodeId = null;
+
+                    if (!client.async() && !getSpiContext().localNode().version().equals(node.version()))
+                        nodeId = node.id();
+
+                    fut = new GridFutureAdapter<>();
+
+                    retry = client.sendMessageWithAck(nodeId, msg, (GridFutureAdapter)fut);
+
+                    client.release();
+
+                    client = null;
+
+                    if (!retry)
+                        sentMsgsCnt.increment();
+                    else {
+                        ClusterNode node0 = getSpiContext().node(node.id());
+
+                        if (node0 == null)
+                            throw new IgniteCheckedException("Failed to send message to remote node " +
+                                "(node has left the grid): " + node.id());
+                    }
+                }
+                while (retry);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteSpiException("Failed to send message to remote node: " + node, e);
+            }
+            finally {
+                if (client != null && clients.remove(node.id(), client))
+                    client.forceClose();
+            }
+        }
+
+        return fut;
+    }
+
+    /**
      * Returns existing or just created client to node.
      *
      * @param node Node to which client should be open.
@@ -2086,6 +2153,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                             meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), sslEngine);
                         }
+
                         if (recoveryDesc != null) {
                             recoveryDesc.onHandshake(rcvCnt);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32545812/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java
new file mode 100644
index 0000000..2c2ef2e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.*;
+import org.apache.ignite.testframework.junits.spi.*;
+
+import org.eclipse.jetty.util.*;
+
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ *
+ */
+@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
+public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> {
+    /** */
+    private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
+
+    /** */
+    protected static final List<TcpCommunicationSpi> spis = new ArrayList<>();
+
+    /** */
+    protected static final List<ClusterNode> nodes = new ArrayList<>();
+
+    /** */
+    private static final int SPI_CNT = 2;
+
+    /**
+     *
+     */
+    static {
+        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
+            @Override public Message apply() {
+                return new GridTestMessage();
+            }
+        });
+    }
+
+    /**
+     * Disable SPI auto-start.
+     */
+    public GridTcpCommunicationSpiRecoveryAckFutureSelfTest() {
+        super(false);
+    }
+
+    /** */
+    @SuppressWarnings({"deprecation"})
+    private class TestListener implements CommunicationListener<Message> {
+        /** */
+        private ConcurrentHashSet<Long> msgIds = new ConcurrentHashSet<>();
+
+        /** */
+        private AtomicInteger rcvCnt = new AtomicInteger();
+
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) {
+            info("Test listener received message: " + msg);
+
+            assertTrue("Unexpected message: " + msg, msg instanceof GridTestMessage);
+
+            GridTestMessage msg0 = (GridTestMessage)msg;
+
+            assertTrue("Duplicated message received: " + msg0, msgIds.add(msg0.getMsgId()));
+
+            rcvCnt.incrementAndGet();
+
+            msgC.run();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onDisconnected(UUID nodeId) {
+            // No-op.
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAckOnIdle() throws Exception {
+        checkAck(10, 2000, 9);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAckOnCount() throws Exception {
+        checkAck(10, 60_000, 10);
+    }
+
+    /**
+     * @param ackCnt Recovery acknowledgement count.
+     * @param idleTimeout Idle connection timeout.
+     * @param msgPerIter Messages per iteration.
+     * @throws Exception If failed.
+     */
+    private void checkAck(int ackCnt, int idleTimeout, int msgPerIter) throws Exception {
+        createSpis(ackCnt, idleTimeout, TcpCommunicationSpi.DFLT_MSG_QUEUE_LIMIT);
+
+        try {
+            TcpCommunicationSpi spi0 = spis.get(0);
+            TcpCommunicationSpi spi1 = spis.get(1);
+
+            ClusterNode node0 = nodes.get(0);
+            ClusterNode node1 = nodes.get(1);
+
+            int msgId = 0;
+
+            int expMsgs = 0;
+
+            for (int i = 0; i < 5; i++) {
+                info("Iteration: " + i);
+
+                List<IgniteInternalFuture<Boolean>> futs = new ArrayList<>();
+
+                for (int j = 0; j < msgPerIter; j++) {
+                    futs.add(spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0)));
+
+                    futs.add(spi1.sendMessageWithAck(node0, new GridTestMessage(node1.id(), ++msgId, 0)));
+                }
+
+                expMsgs += msgPerIter;
+
+                for (TcpCommunicationSpi spi : spis) {
+                    GridNioServer srv = U.field(spi, "nioSrvr");
+
+                    Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+
+                    assertFalse(sessions.isEmpty());
+
+                    boolean found = false;
+
+                    for (GridNioSession ses : sessions) {
+                        final GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor();
+
+                        if (recoveryDesc != null) {
+                            found = true;
+
+                            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                                @Override public boolean apply() {
+                                    return recoveryDesc.messagesFutures().isEmpty();
+                                }
+                            }, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() + 7000 :
+                                10_000);
+
+                            assertEquals("Unexpected messages: " + recoveryDesc.messagesFutures(), 0,
+                                recoveryDesc.messagesFutures().size());
+
+                            assertEquals("Unexpected ack messages: " + recoveryDesc.ackMessageFutures(), 0,
+                                recoveryDesc.ackMessageFutures().size());
+
+                            break;
+                        }
+                    }
+
+                    assertTrue(found);
+                }
+
+                final int expMsgs0 = expMsgs;
+
+                for (TcpCommunicationSpi spi : spis) {
+                    final TestListener lsnr = (TestListener)spi.getListener();
+
+                    GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                        @Override
+                        public boolean apply() {
+                            return lsnr.rcvCnt.get() >= expMsgs0;
+                        }
+                    }, 5000);
+
+                    assertEquals(expMsgs, lsnr.rcvCnt.get());
+                }
+
+                for (IgniteInternalFuture<Boolean> f : futs)
+                    assert f.get();
+            }
+        }
+        finally {
+            stopSpis();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueueOverflow() throws Exception {
+        for (int i = 0; i < 3; i++) {
+            try {
+                startSpis(5, 60_000, 10);
+
+                checkOverflow();
+
+                break;
+            }
+            catch (IgniteCheckedException e) {
+                if (e.hasCause(BindException.class)) {
+                    if (i < 2) {
+                        info("Got exception caused by BindException, will retry after delay: " + e);
+
+                        stopSpis();
+
+                        U.sleep(10_000);
+                    }
+                    else
+                        throw e;
+                }
+                else
+                    throw e;
+            }
+            finally {
+                stopSpis();
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkOverflow() throws Exception {
+        TcpCommunicationSpi spi0 = spis.get(0);
+        TcpCommunicationSpi spi1 = spis.get(1);
+
+        ClusterNode node0 = nodes.get(0);
+        ClusterNode node1 = nodes.get(1);
+
+        final GridNioServer srv1 = U.field(spi1, "nioSrvr");
+
+        int msgId = 0;
+
+        // Send message to establish connection.
+        spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0));
+
+        // Prevent node1 from send
+        GridTestUtils.setFieldValue(srv1, "skipWrite", true);
+
+        final GridNioSession ses0 = communicationSession(spi0);
+
+        for (int i = 0; i < 150; i++)
+            spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0));
+
+        // Wait when session is closed because of queue overflow.
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return ses0.closeTime() != 0;
+            }
+        }, 5000);
+
+        assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
+
+        GridTestUtils.setFieldValue(srv1, "skipWrite", false);
+
+        for (int i = 0; i < 100; i++)
+            spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0));
+
+        final int expMsgs = 251;
+
+        final TestListener lsnr = (TestListener)spi1.getListener();
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override
+            public boolean apply() {
+                return lsnr.rcvCnt.get() >= expMsgs;
+            }
+        }, 5000);
+
+        assertEquals(expMsgs, lsnr.rcvCnt.get());
+    }
+
+    /**
+     * @param spi SPI.
+     * @return Session.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private GridNioSession communicationSession(TcpCommunicationSpi spi) throws Exception {
+        final GridNioServer srv = U.field(spi, "nioSrvr");
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override
+            public boolean apply() {
+                Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+
+                return !sessions.isEmpty();
+            }
+        }, 5000);
+
+        Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+
+        assertEquals(1, sessions.size());
+
+        return sessions.iterator().next();
+    }
+
+    /**
+     * @param ackCnt Recovery acknowledgement count.
+     * @param idleTimeout Idle connection timeout.
+     * @param queueLimit Message queue limit.
+     * @return SPI instance.
+     */
+    protected TcpCommunicationSpi getSpi(int ackCnt, int idleTimeout, int queueLimit) {
+        TcpCommunicationSpi spi = new TcpCommunicationSpi();
+
+        spi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
+        spi.setIdleConnectionTimeout(idleTimeout);
+        spi.setTcpNoDelay(true);
+        spi.setAckSendThreshold(ackCnt);
+        spi.setMessageQueueLimit(queueLimit);
+        spi.setSharedMemoryPort(-1);
+
+        return spi;
+    }
+
+    /**
+     * @param ackCnt Recovery acknowledgement count.
+     * @param idleTimeout Idle connection timeout.
+     * @param queueLimit Message queue limit.
+     * @throws Exception If failed.
+     */
+    private void startSpis(int ackCnt, int idleTimeout, int queueLimit) throws Exception {
+        spis.clear();
+        nodes.clear();
+        spiRsrcs.clear();
+
+        Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+
+        for (int i = 0; i < SPI_CNT; i++) {
+            TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit);
+
+            GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i);
+
+            IgniteTestResources rsrcs = new IgniteTestResources();
+
+            GridTestNode node = new GridTestNode(rsrcs.getNodeId());
+
+            GridSpiTestContext ctx = initSpiContext();
+
+            ctx.setLocalNode(node);
+
+            spiRsrcs.add(rsrcs);
+
+            rsrcs.inject(spi);
+
+            spi.setListener(new TestListener());
+
+            node.setAttributes(spi.getNodeAttributes());
+
+            nodes.add(node);
+
+            spi.spiStart(getTestGridName() + (i + 1));
+
+            spis.add(spi);
+
+            spi.onContextInitialized(ctx);
+
+            ctxs.put(node, ctx);
+        }
+
+        // For each context set remote nodes.
+        for (Map.Entry<ClusterNode, GridSpiTestContext> e : ctxs.entrySet()) {
+            for (ClusterNode n : nodes) {
+                if (!n.equals(e.getKey()))
+                    e.getValue().remoteNodes().add(n);
+            }
+        }
+    }
+
+    /**
+     * @param ackCnt Recovery acknowledgement count.
+     * @param idleTimeout Idle connection timeout.
+     * @param queueLimit Message queue limit.
+     * @throws Exception If failed.
+     */
+    private void createSpis(int ackCnt, int idleTimeout, int queueLimit) throws Exception {
+        for (int i = 0; i < 3; i++) {
+            try {
+                startSpis(ackCnt, idleTimeout, queueLimit);
+
+                break;
+            }
+            catch (IgniteCheckedException e) {
+                if (e.hasCause(BindException.class)) {
+                    if (i < 2) {
+                        info("Failed to start SPIs because of BindException, will retry after delay.");
+
+                        stopSpis();
+
+                        U.sleep(10_000);
+                    }
+                    else
+                        throw e;
+                }
+                else
+                    throw e;
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void stopSpis() throws Exception {
+        for (CommunicationSpi<Message> spi : spis) {
+            spi.onContextDestroyed();
+
+            spi.setListener(null);
+
+            spi.spiStop();
+        }
+
+        for (IgniteTestResources rsrcs : spiRsrcs)
+            rsrcs.stopThreads();
+
+        spis.clear();
+        nodes.clear();
+        spiRsrcs.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32545812/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index ff86bda..dcb8058 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -32,6 +32,7 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite {
         TestSuite suite = new TestSuite("Communication SPI Test Suite");
 
         suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoveryAckSelfTest.class));
+        suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoveryAckFutureSelfTest.class));
         suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoverySelfTest.class));
 
         suite.addTest(new TestSuite(GridTcpCommunicationSpiConcurrentConnectSelfTest.class));