You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/07/31 22:40:30 UTC

[1/8] incubator-ignite git commit: Squashed commit of the following:

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-104 6c1655fbe -> 6b52a234f


Squashed commit of the following:

commit f55a17f71ec97513a6968b1ea3c359bc6238cc5e
Author: Yakov Zhdanov <yz...@gridgain.com>
Date:   Fri Jul 31 13:32:32 2015 +0300

    review

commit 58ca345f622dbadfba7ef2d3dce850c4baa1f319
Merge: 5f921f6 7ed4d15
Author: Yakov Zhdanov <yz...@gridgain.com>
Date:   Fri Jul 31 13:24:51 2015 +0300

    Merge branches 'ignite-752-2' and 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-752-2

commit 5f921f62dd6563a88b2ecdde92a2b2ee8218ec95
Author: Denis Magda <dm...@gridgain.com>
Date:   Wed Jul 29 10:40:44 2015 +0300

    ignite-752-2: added info on the lowest failure detection timeout to the documentation

commit 55f0eb56967d2cc9bdf62c3fb665521a59ddaf33
Author: Denis Magda <dm...@gridgain.com>
Date:   Wed Jul 29 09:15:29 2015 +0300

    ignite-752-2: supported connection check frequency even for cases when failure timeout is ignored; performance optimizations


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/44072f80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/44072f80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/44072f80

Branch: refs/heads/ignite-104
Commit: 44072f806d8d14d716475a3665d0afdf004c6db2
Parents: 7ed4d15
Author: Denis Magda <dm...@gridgain.com>
Authored: Fri Jul 31 13:35:46 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri Jul 31 13:35:46 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 42 +++++++++++---------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  2 +-
 2 files changed, 24 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/44072f80/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 547347c..47ba8e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1787,6 +1787,9 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Connection check frequency. */
         private long connCheckFreq;
 
+        /** Connection check threshold. */
+        private long connCheckThreshold;
+
         /**
          */
         protected RingMessageWorker() {
@@ -1799,19 +1802,22 @@ class ServerImpl extends TcpDiscoveryImpl {
          * Initializes connection check frequency. Used only when failure detection timeout is enabled.
          */
         private void initConnectionCheckFrequency() {
-            if (spi.failureDetectionTimeoutEnabled()) {
-                for (int i = 3; i > 0; i--) {
-                    connCheckFreq = spi.failureDetectionTimeout() / i;
-
-                    if (connCheckFreq > 0)
-                        break;
-                }
+            if (spi.failureDetectionTimeoutEnabled())
+                connCheckThreshold = spi.failureDetectionTimeout();
+            else
+                connCheckThreshold = Math.min(spi.getSocketTimeout(), spi.getHeartbeatFrequency());
 
-                assert connCheckFreq > 0;
+            for (int i = 3; i > 0; i--) {
+                connCheckFreq = connCheckThreshold / i;
 
-                if (log.isDebugEnabled())
-                    log.debug("Connection check frequency is calculated: " + connCheckFreq);
+                if (connCheckFreq > 10)
+                    break;
             }
+
+            assert connCheckFreq > 0;
+
+            if (log.isDebugEnabled())
+                log.debug("Connection check frequency is calculated: " + connCheckFreq);
         }
 
         /**
@@ -2306,9 +2312,9 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                             // If node existed on connection initialization we should check
                             // whether it has not gone yet.
-                            if (nextNodeExists && pingNode(next))
-                                U.error(log, "Failed to send message to next node [msg=" + msg +
-                                    ", next=" + next + ']', err);
+                            if (nextNodeExists)
+                                U.warn(log, "Failed to send message to next node [msg=" + msg + ", next=" + next +
+                                    ", errMsg=" + (err != null ? err.getMessage() : "N/A") + ']');
                             else if (log.isDebugEnabled())
                                 log.debug("Failed to send message to next node [msg=" + msg + ", next=" + next +
                                     ", errMsg=" + (err != null ? err.getMessage() : "N/A") + ']');
@@ -4025,7 +4031,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         /**
          * Check the last time a heartbeat message received. If the time is bigger than {@code hbCheckTimeout} than
-         * {@link TcpDiscoveryStatusCheckMessage} is sent accros the ring.
+         * {@link TcpDiscoveryStatusCheckMessage} is sent across the ring.
          */
         private void checkHeartbeatsReceiving() {
             if (lastTimeStatusMsgSent < locNode.lastUpdateTime())
@@ -4045,11 +4051,9 @@ class ServerImpl extends TcpDiscoveryImpl {
          * Check connection aliveness status.
          */
         private void checkConnection() {
-            if (!spi.failureDetectionTimeoutEnabled())
-                return;
-
-            if (!failureThresholdReached && U.currentTimeMillis() - locNode.lastDataReceivedTime()
-                >= spi.failureDetectionTimeout() && ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) {
+            if (spi.failureDetectionTimeoutEnabled() && !failureThresholdReached &&
+                U.currentTimeMillis() - locNode.lastDataReceivedTime() >= connCheckThreshold &&
+                ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) {
 
                 log.info("Local node seems to be disconnected from topology (failure detection timeout " +
                     "is reached): [failureDetectionTimeout=" + spi.failureDetectionTimeout() +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/44072f80/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 09690dc..3216166 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -74,7 +74,7 @@ import java.util.concurrent.atomic.*;
  * {@link IgniteConfiguration#setFailureDetectionTimeout(long)}. This failure timeout automatically controls the
  * following parameters: {@link #getSocketTimeout()}, {@link #getAckTimeout()}, {@link #getMaxAckTimeout()},
  * {@link #getReconnectCount()}. If any of those parameters is set explicitly, then the failure timeout setting will be
- * ignored.
+ * ignored. As an example, for stable low-latency networks the failure detection timeout may be set to ~120 ms.
  * <p>
  * If it's required to perform advanced settings of failure detection and
  * {@link IgniteConfiguration#getFailureDetectionTimeout()} is unsuitable then various {@code TcpDiscoverySpi}


[3/8] incubator-ignite git commit: Squashed commit of the following:

Posted by vk...@apache.org.
Squashed commit of the following:

commit ed8dac68bb008c17246ecea5169b34a55b860869
Merge: 6f915db a127756
Author: Denis Magda <dm...@gridgain.com>
Date:   Mon Jul 27 16:56:39 2015 +0300

    Merge remote-tracking branch 'remotes/origin/master' into ignite-1139

commit 6f915db1890c81af035984f07a7195da9048a67f
Author: Denis Magda <dm...@gridgain.com>
Date:   Mon Jul 27 09:50:53 2015 +0300

    ignite-1139: uncommented tests

commit aadbdda1dab5e1c350afb0ac5e7f1182095ecd70
Author: Denis Magda <dm...@gridgain.com>
Date:   Mon Jul 27 09:30:50 2015 +0300

    ignite-1139: set cancel to true when stopping a client node

commit 86c6f6a8df6e828e5cc3c606c334925e948dee7a
Author: Denis Magda <dm...@gridgain.com>
Date:   Mon Jul 27 09:06:49 2015 +0300

    ignite-1139: temporaly disable some SPI tests

commit e6a2d88063a1c32478f3ee1dea80c2ffe2ee19af
Author: Denis Magda <dm...@gridgain.com>
Date:   Mon Jul 27 08:51:51 2015 +0300

    ignite-

commit f39086536e3afd031ed158e9cd2d65afb71a32bf
Merge: 14ee9df 84f8b95
Author: Denis Magda <dm...@gridgain.com>
Date:   Mon Jul 27 08:42:28 2015 +0300

    Merge branch 'ignite-1139' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-1139

commit 14ee9df2251716d1a3913742ce05154e2e958b56
Merge: fd6b0e3 0341759
Author: Denis Magda <dm...@gridgain.com>
Date:   Mon Jul 27 08:39:31 2015 +0300

    Merge remote-tracking branch 'remotes/origin/master' into ignite-1139

commit 84f8b956e40ae88d11e0ef125442203a497b8c4b
Author: dmagda <ma...@gmail.com>
Date:   Fri Jul 24 13:35:32 2015 +0300

    ignite-1139:
    - fixed race in GridDhtPartitionsExchangeFuture
    - fixed NPE in TcpCommunicationSpi when this SPI was not in the fully initialized state

commit 89da409d5e6a62e744c4030475bbbfcb822a103c
Merge: fd6b0e3 ed5d3ed
Author: dmagda <ma...@gmail.com>
Date:   Fri Jul 24 08:55:26 2015 +0300

    Merge remote-tracking branch 'remotes/origin/master' into ignite-1139

commit fd6b0e3684df97875947c7864487b658ac599fce
Author: Denis Magda <dm...@gridgain.com>
Date:   Thu Jul 23 16:08:21 2015 +0300

    ignite-1139: unmuted test


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/271550fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/271550fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/271550fe

Branch: refs/heads/ignite-104
Commit: 271550fed7662c5032f9e4fb49cd135f3a55a46e
Parents: abb2cef
Author: Denis Magda <dm...@gridgain.com>
Authored: Fri Jul 31 13:49:08 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri Jul 31 13:49:08 2015 +0300

----------------------------------------------------------------------
 .../GridDhtPartitionsExchangeFuture.java        | 20 +++++-----
 .../communication/tcp/TcpCommunicationSpi.java  | 41 +++++++++++++++++---
 .../tcp/TcpDiscoveryMultiThreadedTest.java      |  8 ++--
 3 files changed, 50 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/271550fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 3664220..cbf6b40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -583,7 +583,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                             onDone(exchId.topologyVersion());
                         }
                         else
-                            sendPartitions();
+                            sendPartitions(oldest);
                     }
                     else {
                         rmtIds = Collections.emptyList();
@@ -816,9 +816,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             if (log.isDebugEnabled())
                 log.debug("Initialized future: " + this);
 
+            ClusterNode oldest = oldestNode.get();
+
             // If this node is not oldest.
-            if (!oldestNode.get().id().equals(cctx.localNodeId()))
-                sendPartitions();
+            if (!oldest.id().equals(cctx.localNodeId()))
+                sendPartitions(oldest);
             else {
                 boolean allReceived = allReceived();
 
@@ -948,11 +950,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
-     *
+     * @param oldestNode Oldest node.
      */
-    private void sendPartitions() {
-        ClusterNode oldestNode = this.oldestNode.get();
-
+    private void sendPartitions(ClusterNode oldestNode) {
         try {
             sendLocalPartitions(oldestNode, exchId);
         }
@@ -1402,8 +1402,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      *
      */
     private void recheck() {
+        ClusterNode oldest = oldestNode.get();
+
         // If this is the oldest node.
-        if (oldestNode.get().id().equals(cctx.localNodeId())) {
+        if (oldest.id().equals(cctx.localNodeId())) {
             Collection<UUID> remaining = remaining();
 
             if (!remaining.isEmpty()) {
@@ -1423,7 +1425,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             }
         }
         else
-            sendPartitions();
+            sendPartitions(oldest);
 
         // Schedule another send.
         scheduleRecheck();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/271550fe/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index f76025d..1c74d59 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1791,7 +1791,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (log.isTraceEnabled())
             log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']');
 
-        if (node.id().equals(getLocalNode().id()))
+        ClusterNode localNode = getLocalNode();
+
+        if (localNode == null)
+            throw new IgniteSpiException("Local node has not been started or fully initialized " +
+                "[isStopping=" + getSpiContext().isStopping() + ']');
+
+        if (node.id().equals(localNode.id()))
             notifyListener(node.id(), msg, NOOP);
         else {
             GridCommunicationClient client = null;
@@ -1804,7 +1810,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                     UUID nodeId = null;
 
-                    if (!client.async() && !getSpiContext().localNode().version().equals(node.version()))
+                    if (!client.async() && !localNode.version().equals(node.version()))
                         nodeId = node.id();
 
                     retry = client.sendMessage(nodeId, msg);
@@ -2435,8 +2441,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     else
                         ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
 
+                    ClusterNode localNode = getLocalNode();
+
+                    if (localNode == null)
+                        throw new IgniteCheckedException("Local node has not been started or " +
+                            "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
+
                     if (recovery != null) {
-                        HandshakeMessage msg = new HandshakeMessage(getLocalNode().id(),
+                        HandshakeMessage msg = new HandshakeMessage(localNode.id(),
                             recovery.incrementConnectCount(),
                             recovery.receivedCount());
 
@@ -2629,7 +2641,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * @return Node ID message.
      */
     private NodeIdMessage nodeIdMessage() {
-        return new NodeIdMessage(getLocalNode().id());
+        ClusterNode localNode = getLocalNode();
+
+        UUID id;
+
+        if (localNode == null) {
+            U.warn(log, "Local node is not started or fully initialized [isStopping=" +
+                    getSpiContext().isStopping() + ']');
+
+            id = new UUID(0, 0);
+        }
+        else
+            id = localNode.id();
+
+        return new NodeIdMessage(id);
     }
 
     /** {@inheritDoc} */
@@ -3145,7 +3170,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             try {
-                UUID id = getLocalNode().id();
+                ClusterNode localNode = getLocalNode();
+
+                if (localNode == null)
+                    throw new IgniteSpiException("Local node has not been started or fully initialized " +
+                        "[isStopping=" + getSpiContext().isStopping() + ']');
+
+                UUID id = localNode.id();
 
                 NodeIdMessage msg = new NodeIdMessage(id);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/271550fe/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index 69dd538..f7c73b6 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -88,9 +88,9 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
-        super.afterTest();
-
         stopAllGrids();
+
+        super.afterTest();
     }
 
     /** {@inheritDoc} */
@@ -102,8 +102,6 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
      * @throws Exception If any error occurs.
      */
     public void testMultiThreadedClientsRestart() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-1139");
-
         clientFlagGlobal = false;
 
         info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min.");
@@ -126,7 +124,7 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
                     int idx = clientIdx.getAndIncrement();
 
                     while (!done.get()) {
-                        stopGrid(idx);
+                        stopGrid(idx, true);
                         startGrid(idx);
                     }
 


[8/8] incubator-ignite git commit: Merge branches 'ignite-104' and 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-104

Posted by vk...@apache.org.
Merge branches 'ignite-104' and 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-104

Conflicts:
	modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6b52a234
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6b52a234
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6b52a234

Branch: refs/heads/ignite-104
Commit: 6b52a234f8c8f845a088df225220d6dde30a5e3f
Parents: 5cdd244
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Fri Jul 31 13:40:21 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Fri Jul 31 13:40:21 2015 -0700

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java    | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b52a234/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 479d116..d125eef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1227,7 +1227,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, GridTopic topic, Message msg, byte plc,
         IgniteInClosure<IgniteException> ackClosure) throws IgniteCheckedException {
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackClosure);
+        send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false, ackClosure);
     }
 
     /**
@@ -1250,7 +1250,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(nodes, topic, -1, msg, plc, true, false, timeout, skipOnTimeout);
+        send(nodes, topic, -1, msg, plc, true, false, timeout, skipOnTimeout, null);
     }
 
     /**
@@ -1263,7 +1263,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackClosure)
         throws IgniteCheckedException {
-        send(node, topic, -1, msg, plc, false, 0, false, ackClosure);
+        send(node, topic, -1, msg, plc, false, false, 0, false, ackClosure);
     }
 
     /**
@@ -1279,7 +1279,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         Message msg,
         byte plc
     ) throws IgniteCheckedException {
-        send(nodes, topic, -1, msg, plc, false, false, 0, false);
+        send(nodes, topic, -1, msg, plc, false, false, 0, false, null);
     }
 
     /**
@@ -1295,7 +1295,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         Message msg,
         byte plc
     ) throws IgniteCheckedException {
-        send(nodes, topic, topic.ordinal(), msg, plc, false, false, 0, false);
+        send(nodes, topic, topic.ordinal(), msg, plc, false, false, 0, false, null);
     }
 
     /**
@@ -1319,7 +1319,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     ) throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure);
+        send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout, ackClosure);
     }
 
      /**
@@ -1508,7 +1508,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (node == null)
             throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure);
+        send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout, ackClosure);
     }
 
     /**
@@ -1532,7 +1532,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         boolean ordered,
         boolean seq,
         long timeout,
-        boolean skipOnTimeout
+        boolean skipOnTimeout,
+        IgniteInClosure<IgniteException> ackClosure
     ) throws IgniteCheckedException {
         assert nodes != null;
         assert topic != null;
@@ -1547,7 +1548,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             // messages to one node vs. many.
             if (!nodes.isEmpty()) {
                 for (ClusterNode node : nodes)
-                    send(node, topic, topicOrd, msg, plc, ordered, seq, timeout, skipOnTimeout, null);
+                    send(node, topic, topicOrd, msg, plc, ordered, seq, timeout, skipOnTimeout, ackClosure);
             }
             else if (log.isDebugEnabled())
                 log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" +


[6/8] incubator-ignite git commit: IGNITE-1169 Implemented send with ack methods on TcpCommunication and GridIoManager. Added tests.

Posted by vk...@apache.org.
IGNITE-1169 Implemented send with ack methods on TcpCommunication and GridIoManager. Added tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1c10ade5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1c10ade5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1c10ade5

Branch: refs/heads/ignite-104
Commit: 1c10ade5a50c505ef5ed574ae7001ef7e779cf2e
Parents: aec9764
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Fri Jul 31 16:34:24 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Fri Jul 31 16:34:53 2015 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 108 ++++-
 .../util/nio/GridCommunicationClient.java       |   5 +-
 .../util/nio/GridNioFinishedFuture.java         |  12 +
 .../ignite/internal/util/nio/GridNioFuture.java |  14 +
 .../internal/util/nio/GridNioFutureImpl.java    |  15 +
 .../util/nio/GridNioRecoveryDescriptor.java     |  13 +-
 .../ignite/internal/util/nio/GridNioServer.java |   5 +
 .../util/nio/GridNioSessionMetaKey.java         |   5 +-
 .../util/nio/GridShmemCommunicationClient.java  |   7 +-
 .../util/nio/GridTcpNioCommunicationClient.java |  14 +-
 .../communication/tcp/TcpCommunicationSpi.java  |  43 +-
 ...CommunicationRecoveryAckClosureSelfTest.java | 464 +++++++++++++++++++
 .../IgniteSpiCommunicationSelfTestSuite.java    |   1 +
 13 files changed, 685 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index c1fb79a..7e17efc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -36,6 +36,7 @@ import org.apache.ignite.marshaller.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.communication.*;
+import org.apache.ignite.spi.communication.tcp.*;
 import org.apache.ignite.thread.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
@@ -971,6 +972,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param ordered Ordered flag.
      * @param timeout Timeout.
      * @param skipOnTimeout Whether message can be skipped on timeout.
+     * @param ackClosure Ack closure.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     private void send(
@@ -981,7 +983,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         byte plc,
         boolean ordered,
         long timeout,
-        boolean skipOnTimeout
+        boolean skipOnTimeout,
+        IgniteInClosure<IgniteException> ackClosure
     ) throws IgniteCheckedException {
         assert node != null;
         assert topic != null;
@@ -1001,13 +1004,19 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 processOrderedMessage(locNodeId, ioMsg, plc, null);
             else
                 processRegularMessage0(ioMsg, locNodeId);
+
+            if (ackClosure != null)
+                ackClosure.apply(null);
         }
         else {
             if (topicOrd < 0)
                 ioMsg.topicBytes(marsh.marshal(topic));
 
             try {
-                getSpi().sendMessage(node, ioMsg);
+                if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi)
+                    ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackClosure);
+                else
+                    getSpi().sendMessage(node, ioMsg);
             }
             catch (IgniteSpiException e) {
                 throw new IgniteCheckedException("Failed to send message (node may have left the grid or " +
@@ -1050,7 +1059,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (node == null)
             throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
 
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
     }
 
     /**
@@ -1062,7 +1071,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, Object topic, Message msg, byte plc)
         throws IgniteCheckedException {
-        send(node, topic, -1, msg, plc, false, 0, false);
+        send(node, topic, -1, msg, plc, false, 0, false, null);
     }
 
     /**
@@ -1074,7 +1083,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, GridTopic topic, Message msg, byte plc)
         throws IgniteCheckedException {
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
     }
 
     /**
@@ -1096,7 +1105,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     ) throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
     }
 
     /**
@@ -1123,11 +1132,24 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (node == null)
             throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
     }
 
     /**
-     * @param nodes Destination nodes.
+     * @param node Destination node.
+     * @param topic Topic to send the message to.
+     * @param msg Message to send.
+     * @param plc Type of processing.
+     * @param ackClosure Ack closure.
+     * @throws IgniteCheckedException Thrown in case of any errors.
+     */
+    public void send(ClusterNode node, GridTopic topic, Message msg, byte plc,
+        IgniteInClosure<IgniteException> ackClosure) throws IgniteCheckedException {
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackClosure);
+    }
+
+    /**
+     * @param nodes Destination nodes.
      * @param topic Topic to send the message to.
      * @param msg Message to send.
      * @param plc Type of processing.
@@ -1150,7 +1172,20 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
-     * @param nodes Destination nodes.
+     * @param node Destination node.
+     * @param topic Topic to send the message to.
+     * @param msg Message to send.
+     * @param plc Type of processing.
+     * @param ackClosure Ack closure.
+     * @throws IgniteCheckedException Thrown in case of any errors.
+     */
+    public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackClosure)
+        throws IgniteCheckedException {
+        send(node, topic, -1, msg, plc, false, 0, false, ackClosure);
+    }
+
+    /**
+     * @param nodes Destination nodes.
      * @param topic Topic to send the message to.
      * @param msg Message to send.
      * @param plc Type of processing.
@@ -1182,6 +1217,30 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
+     * @param node Destination node.
+     * @param topic Topic to send the message to.
+     * @param msg Message to send.
+     * @param plc Type of processing.
+     * @param timeout Timeout to keep a message on receiving queue.
+     * @param skipOnTimeout Whether message can be skipped on timeout.
+     * @param ackClosure Ack closure.
+     * @throws IgniteCheckedException Thrown in case of any errors.
+     */
+    public void sendOrderedMessage(
+        ClusterNode node,
+        Object topic,
+        Message msg,
+        byte plc,
+        long timeout,
+        boolean skipOnTimeout,
+        IgniteInClosure<IgniteException> ackClosure
+    ) throws IgniteCheckedException {
+        assert timeout > 0 || skipOnTimeout;
+
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure);
+    }
+
+    /**
      * Sends a peer deployable user message.
      *
      * @param nodes Destination nodes.
@@ -1301,6 +1360,35 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
+     * @param nodeId Destination node ID.
+     * @param topic Topic to send the message to.
+     * @param msg Message to send.
+     * @param plc Type of processing.
+     * @param timeout Timeout to keep a message on receiving queue.
+     * @param skipOnTimeout Whether message can be skipped on timeout.
+     * @param ackClosure Ack closure.
+     * @throws IgniteCheckedException Thrown in case of any errors.
+     */
+    public void sendOrderedMessage(
+        UUID nodeId,
+        Object topic,
+        Message msg,
+        byte plc,
+        long timeout,
+        boolean skipOnTimeout,
+        IgniteInClosure<IgniteException> ackClosure
+    ) throws IgniteCheckedException {
+        assert timeout > 0 || skipOnTimeout;
+
+        ClusterNode node = ctx.discovery().node(nodeId);
+
+        if (node == null)
+            throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
+
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure);
+    }
+
+    /**
      * @param nodes Destination nodes.
      * @param topic Topic to send the message to.
      * @param topicOrd Topic ordinal value.
@@ -1334,7 +1422,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             // messages to one node vs. many.
             if (!nodes.isEmpty()) {
                 for (ClusterNode node : nodes)
-                    send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout);
+                    send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null);
             }
             else if (log.isDebugEnabled())
                 log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
index 693a5a4..1a26ad5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.nio;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.jetbrains.annotations.*;
 
@@ -94,10 +95,12 @@ public interface GridCommunicationClient {
     /**
      * @param nodeId Node ID (provided only if versions of local and remote nodes are different).
      * @param msg Message to send.
+     * @param closure Ack closure.
      * @throws IgniteCheckedException If failed.
      * @return {@code True} if should try to resend message.
      */
-    public boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException;
+    public boolean sendMessage(@Nullable UUID nodeId, Message msg, @Nullable IgniteInClosure<IgniteException> closure)
+        throws IgniteCheckedException;
 
     /**
      * @return {@code True} if send is asynchronous.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
index 9029dd2..aac238a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.util.nio;
 
+import org.apache.ignite.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 
 /**
  * Future that represents already completed result.
@@ -57,6 +59,16 @@ public class GridNioFinishedFuture<R> extends GridFinishedFuture<R> implements G
     }
 
     /** {@inheritDoc} */
+    @Override public void ackClosure(IgniteInClosure<IgniteException> closure) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInClosure<IgniteException> ackClosure() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridNioFinishedFuture.class, this, super.toString());
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
index 7101f45..5a884f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.internal.util.nio;
 
+import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
 
 /**
  * NIO future.
@@ -39,4 +41,16 @@ public interface GridNioFuture<R> extends IgniteInternalFuture<R> {
      * @return {@code True} if skip recovery for this operation.
      */
     public boolean skipRecovery();
+
+    /**
+     * Sets ack closure which will be applied when ack is received.
+     *
+     * @param closure Ack closure.
+     */
+    public void ackClosure(IgniteInClosure<IgniteException> closure);
+
+    /**
+     * @return Ack closure.
+     */
+    public IgniteInClosure<IgniteException> ackClosure();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
index c5393c4..e71bf92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.util.nio;
 
+import org.apache.ignite.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 
 /**
  * Default future implementation.
@@ -30,6 +32,9 @@ public class GridNioFutureImpl<R> extends GridFutureAdapter<R> implements GridNi
     /** */
     protected boolean msgThread;
 
+    /** */
+    protected IgniteInClosure<IgniteException> ackClosure;
+
     /** {@inheritDoc} */
     @Override public void messageThread(boolean msgThread) {
         this.msgThread = msgThread;
@@ -46,6 +51,16 @@ public class GridNioFutureImpl<R> extends GridFutureAdapter<R> implements GridNi
     }
 
     /** {@inheritDoc} */
+    @Override public void ackClosure(IgniteInClosure<IgniteException> closure) {
+        ackClosure = closure;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInClosure<IgniteException> ackClosure() {
+        return ackClosure;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridNioFutureImpl.class, this);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 733ae81..a7ed02a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -182,6 +182,9 @@ public class GridNioRecoveryDescriptor {
 
             assert fut.isDone() : fut;
 
+            if (fut.ackClosure() != null)
+                fut.ackClosure().apply(null);
+
             acked++;
         }
     }
@@ -358,8 +361,14 @@ public class GridNioRecoveryDescriptor {
      * @param futs Futures to complete.
      */
     private void completeOnNodeLeft(GridNioFuture<?>[] futs) {
-        for (GridNioFuture<?> msg : futs)
-            ((GridNioFutureImpl)msg).onDone(new IOException("Failed to send message, node has left: " + node.id()));
+        for (GridNioFuture<?> msg : futs) {
+            IOException e = new IOException("Failed to send message, node has left: " + node.id());
+
+            ((GridNioFutureImpl)msg).onDone(e);
+
+            if (msg.ackClosure() != null)
+                msg.ackClosure().apply(new IgniteException(e));
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index ed55101..c180837 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -394,6 +394,11 @@ public class GridNioServer<T> {
 
         int msgCnt = sys ? ses.offerSystemFuture(fut) : ses.offerFuture(fut);
 
+        IgniteInClosure<IgniteException> ackClosure;
+
+        if (!sys && (ackClosure = ses.removeMeta(ACK_CLOSURE.ordinal())) != null)
+            fut.ackClosure(ackClosure);
+
         if (ses.closed()) {
             if (ses.removeFuture(fut))
                 fut.connectionClosed();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
index 004c327..23c1e22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
@@ -45,7 +45,10 @@ public enum GridNioSessionMetaKey {
     MSG_WRITER,
 
     /** SSL engine. */
-    SSL_ENGINE;
+    SSL_ENGINE,
+
+    /** Ack closure. */
+    ACK_CLOSURE;
 
     /** Maximum count of NIO session keys in system. */
     public static final int MAX_KEYS_CNT = 64;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
index e05c37a..67d4664 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.internal.util.ipc.shmem.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.jetbrains.annotations.*;
 
@@ -113,7 +114,8 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, Message msg)
+    @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, Message msg,
+        IgniteInClosure<IgniteException> closure)
         throws IgniteCheckedException {
         if (closed())
             throw new IgniteCheckedException("Communication client was closed: " + this);
@@ -131,6 +133,9 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
 
         markUsed();
 
+        if (closure != null)
+            closure.apply(null);
+
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index abad875..7933001 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.util.nio;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.jetbrains.annotations.*;
 
@@ -27,6 +28,8 @@ import java.io.*;
 import java.nio.*;
 import java.util.*;
 
+import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*;
+
 /**
  * Grid client for NIO server.
  */
@@ -97,11 +100,14 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
     }
 
     /** {@inheritDoc} */
-    @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg)
+    @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure<IgniteException> closure)
         throws IgniteCheckedException {
         // Node ID is never provided in asynchronous send mode.
         assert nodeId == null;
 
+        if (closure != null)
+            ses.addMeta(ACK_CLOSURE.ordinal(), closure);
+
         GridNioFuture<?> fut = ses.send(msg);
 
         if (fut.isDone()) {
@@ -109,6 +115,9 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
                 fut.get();
             }
             catch (IgniteCheckedException e) {
+                if (closure != null)
+                    ses.removeMeta(ACK_CLOSURE.ordinal());
+
                 if (log.isDebugEnabled())
                     log.debug("Failed to send message [client=" + this + ", err=" + e + ']');
 
@@ -119,6 +128,9 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
             }
         }
 
+        if (closure != null)
+            ses.removeMeta(ACK_CLOSURE.ordinal());
+
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 1c74d59..b706edf 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1620,6 +1620,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /**
      * Creates new shared memory communication server.
+     *
      * @return Server.
      * @throws IgniteCheckedException If failed.
      */
@@ -1785,11 +1786,41 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /** {@inheritDoc} */
     @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+        sendMessage0(node, msg, null);
+    }
+
+    /**
+     * Sends given message to destination node. Note that characteristics of the
+     * exchange such as durability, guaranteed delivery or error notification are
+     * dependent on SPI implementation.
+     *
+     * @param node Destination node.
+     * @param msg Message to send.
+     * @param ackClosure Ack closure.
+     * @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any error during sending the message.
+     *      Note that it is not guaranteed that a failed communication will result
+     *      in a thrown exception, as this is dependent on SPI implementation.
+     */
+    public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+        throws IgniteSpiException {
+        sendMessage0(node, msg, ackClosure);
+    }
+
+    /**
+     * @param node Destination node.
+     * @param msg Message to send.
+     * @param ackClosure Ack closure.
+     * @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any error during sending the message.
+     *      Note that it is not guaranteed that a failed communication will result
+     *      in a thrown exception, as this is dependent on SPI implementation.
+     */
+    private void sendMessage0(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+        throws IgniteSpiException {
         assert node != null;
         assert msg != null;
 
         if (log.isTraceEnabled())
-            log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']');
+            log.trace("Sending message with ack to node [node=" + node + ", msg=" + msg + ']');
 
         ClusterNode localNode = getLocalNode();
 
@@ -1813,7 +1844,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (!client.async() && !localNode.version().equals(node.version()))
                         nodeId = node.id();
 
-                    retry = client.sendMessage(nodeId, msg);
+                    retry = client.sendMessage(nodeId, msg, ackClosure);
 
                     client.release();
 
@@ -1876,7 +1907,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                 GridCommunicationClient old = clients.put(nodeId, client0);
 
                                 assert old == null : "Client already created " +
-                                        "[node=" + node + ", client=" + client0 + ", oldClient=" + old + ']';
+                                    "[node=" + node + ", client=" + client0 + ", oldClient=" + old + ']';
 
                                 if (client0 instanceof GridTcpNioCommunicationClient) {
                                     GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0);
@@ -1979,7 +2010,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * @return Client.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable protected GridCommunicationClient createShmemClient(ClusterNode node, Integer port) throws IgniteCheckedException {
+    @Nullable protected GridCommunicationClient createShmemClient(ClusterNode node,
+        Integer port) throws IgniteCheckedException {
         int attempt = 1;
 
         int connectAttempts = 1;
@@ -2204,6 +2236,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                             meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), sslEngine);
                         }
+
                         if (recoveryDesc != null) {
                             recoveryDesc.onHandshake(rcvCnt);
 
@@ -2433,7 +2466,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     else if (log.isDebugEnabled())
                         log.debug("Received remote node ID: " + rmtNodeId0);
 
-                    if (isSslEnabled() ) {
+                    if (isSslEnabled()) {
                         assert sslHnd != null;
 
                         ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
new file mode 100644
index 0000000..e353f2d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.*;
+import org.apache.ignite.testframework.junits.spi.*;
+
+import org.eclipse.jetty.util.*;
+
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
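+ * Tests that ack closures passed to {@link TcpCommunicationSpi} sends are applied once messages are acknowledged.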
+ */
+public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends CommunicationSpi>
+    extends GridSpiAbstractTest<T> {
+    /** */
+    private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
+
+    /** */
+    protected static final List<TcpCommunicationSpi> spis = new ArrayList<>();
+
+    /** */
+    protected static final List<ClusterNode> nodes = new ArrayList<>();
+
+    /** */
+    private static final int SPI_CNT = 2;
+
+    /**
+     *
+     */
+    static {
+        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
+            @Override public Message apply() {
+                return new GridTestMessage();
+            }
+        });
+    }
+
+    /**
+     * Disable SPI auto-start.
+     */
+    public IgniteTcpCommunicationRecoveryAckClosureSelfTest() {
+        super(false);
+    }
+
+    /** */
+    @SuppressWarnings({"deprecation"})
+    private class TestListener implements CommunicationListener<Message> {
+        /** */
+        private ConcurrentHashSet<Long> msgIds = new ConcurrentHashSet<>();
+
+        /** */
+        private AtomicInteger rcvCnt = new AtomicInteger();
+
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) {
+            info("Test listener received message: " + msg);
+
+            assertTrue("Unexpected message: " + msg, msg instanceof GridTestMessage);
+
+            GridTestMessage msg0 = (GridTestMessage)msg;
+
+            assertTrue("Duplicated message received: " + msg0, msgIds.add(msg0.getMsgId()));
+
+            rcvCnt.incrementAndGet();
+
+            msgC.run();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onDisconnected(UUID nodeId) {
+            // No-op.
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAckOnIdle() throws Exception {
+        checkAck(10, 2000, 9);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAckOnCount() throws Exception {
+        checkAck(10, 60_000, 10);
+    }
+
+    /**
+     * @param ackCnt Recovery acknowledgement count.
+     * @param idleTimeout Idle connection timeout.
+     * @param msgPerIter Messages per iteration.
+     * @throws Exception If failed.
+     */
+    private void checkAck(int ackCnt, int idleTimeout, int msgPerIter) throws Exception {
+        createSpis(ackCnt, idleTimeout, TcpCommunicationSpi.DFLT_MSG_QUEUE_LIMIT);
+
+        try {
+            TcpCommunicationSpi spi0 = spis.get(0);
+            TcpCommunicationSpi spi1 = spis.get(1);
+
+            ClusterNode node0 = nodes.get(0);
+            ClusterNode node1 = nodes.get(1);
+
+            int msgId = 0;
+
+            int expMsgs = 0;
+
+            for (int i = 0; i < 5; i++) {
+                info("Iteration: " + i);
+
+                final AtomicInteger ackMsgs = new AtomicInteger(0);
+
+                IgniteInClosure<IgniteException> ackClosure = new CI1<IgniteException>() {
+                    @Override public void apply(IgniteException o) {
+                        assert o == null;
+
+                        ackMsgs.incrementAndGet();
+                    }
+                };
+
+                for (int j = 0; j < msgPerIter; j++) {
+                    spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
+
+                    spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackClosure);
+                }
+
+                expMsgs += msgPerIter;
+
+                for (TcpCommunicationSpi spi : spis) {
+                    GridNioServer srv = U.field(spi, "nioSrvr");
+
+                    Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+
+                    assertFalse(sessions.isEmpty());
+
+                    boolean found = false;
+
+                    for (GridNioSession ses : sessions) {
+                        final GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor();
+
+                        if (recoveryDesc != null) {
+                            found = true;
+
+                            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                                @Override public boolean apply() {
+                                    return recoveryDesc.messagesFutures().isEmpty();
+                                }
+                            }, 10_000);
+
+                            assertEquals("Unexpected messages: " + recoveryDesc.messagesFutures(), 0,
+                                recoveryDesc.messagesFutures().size());
+
+                            break;
+                        }
+                    }
+
+                    assertTrue(found);
+                }
+
+                final int expMsgs0 = expMsgs;
+
+                for (TcpCommunicationSpi spi : spis) {
+                    final TestListener lsnr = (TestListener)spi.getListener();
+
+                    GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                        @Override
+                        public boolean apply() {
+                            return lsnr.rcvCnt.get() >= expMsgs0;
+                        }
+                    }, 5000);
+
+                    assertEquals(expMsgs, lsnr.rcvCnt.get());
+                }
+
+                assertEquals(msgPerIter * 2, ackMsgs.get());
+            }
+        }
+        finally {
+            stopSpis();
+        }
+    }
+
+    /**
+     * Runs the queue overflow scenario. Up to three attempts are made: an attempt is repeated
+     * only when it fails with an {@link IgniteCheckedException} caused by a {@link BindException}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testQueueOverflow() throws Exception {
+        for (int attempt = 0; attempt < 3; attempt++) {
+            try {
+                startSpis(5, 60_000, 10);
+                checkOverflow();
+
+                return;
+            }
+            catch (IgniteCheckedException e) {
+                // Anything but a BindException on one of the first two attempts is fatal.
+                if (!e.hasCause(BindException.class) || attempt >= 2)
+                    throw e;
+
+                info("Got exception caused by BindException, will retry after delay: " + e);
+
+                stopSpis();
+                U.sleep(10_000);
+            }
+            finally {
+                stopSpis();
+            }
+        }
+    }
+
+    /**
+     * Drives the queue overflow scenario between the first two SPIs: sends one message from
+     * {@code spi0} to {@code node1}, sets the {@code skipWrite} field of the second SPI's NIO server
+     * through reflection, sends 150 more messages and waits for the first SPI's session to report a
+     * non-zero close time. Afterwards {@code skipWrite} is reset, another 100 messages are sent and
+     * the listener of {@code spi1} is expected to have received exactly 251 messages.
+     * <p>
+     * NOTE(review): the final wait for {@code ackMsgs} to reach 251 is not followed by an assertion,
+     * so a missing acknowledgement does not fail the test - confirm whether this is intentional.
+     *
+     * @throws Exception If failed.
+     */
+    private void checkOverflow() throws Exception {
+        TcpCommunicationSpi spi0 = spis.get(0);
+        TcpCommunicationSpi spi1 = spis.get(1);
+
+        ClusterNode node0 = nodes.get(0);
+        ClusterNode node1 = nodes.get(1);
+
+        final GridNioServer srv1 = U.field(spi1, "nioSrvr");
+
+        final AtomicInteger ackMsgs = new AtomicInteger(0);
+
+        IgniteInClosure<IgniteException> ackClosure = new CI1<IgniteException>() {
+            @Override public void apply(IgniteException o) {
+                assert o == null;
+
+                ackMsgs.incrementAndGet();
+            }
+        };
+
+        int msgId = 0;
+
+        // Send message to establish connection.
+        spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
+
+        // Prevent node1 from sending.
+        GridTestUtils.setFieldValue(srv1, "skipWrite", true);
+
+        final GridNioSession ses0 = communicationSession(spi0);
+
+        for (int i = 0; i < 150; i++)
+            spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
+
+        // Wait until the session is closed because of queue overflow.
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return ses0.closeTime() != 0;
+            }
+        }, 5000);
+
+        assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
+
+        GridTestUtils.setFieldValue(srv1, "skipWrite", false);
+
+        for (int i = 0; i < 100; i++)
+            spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
+
+        final int expMsgs = 251;
+
+        final TestListener lsnr = (TestListener)spi1.getListener();
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return lsnr.rcvCnt.get() >= expMsgs;
+            }
+        }, 5000);
+
+        assertEquals(expMsgs, lsnr.rcvCnt.get());
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return expMsgs == ackMsgs.get();
+            }
+        }, 5000);
+    }
+
+    /**
+     * Waits (timeout argument {@code 5000}) until the {@code sessions} collection of the SPI's
+     * {@code nioSrvr} server, both read through reflection, is not empty, then asserts that it
+     * holds exactly one session and returns that session.
+     *
+     * @param spi SPI.
+     * @return The single session registered on the SPI's NIO server.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private GridNioSession communicationSession(TcpCommunicationSpi spi) throws Exception {
+        final GridNioServer srv = U.field(spi, "nioSrvr");
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override
+            public boolean apply() {
+                Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+
+                return !sessions.isEmpty();
+            }
+        }, 5000);
+
+        Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+
+        assertEquals(1, sessions.size());
+
+        return sessions.iterator().next();
+    }
+
+    /**
+     * Creates a not yet started TCP communication SPI bound to the next free test port, with
+     * TCP_NODELAY switched on and the shared memory port set to {@code -1}.
+     *
+     * @param ackCnt Recovery acknowledgement count.
+     * @param idleTimeout Idle connection timeout.
+     * @param queueLimit Message queue limit.
+     * @return SPI instance.
+     */
+    protected TcpCommunicationSpi getSpi(int ackCnt, int idleTimeout, int queueLimit) {
+        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+        int locPort = GridTestUtils.getNextCommPort(getClass());
+
+        commSpi.setLocalPort(locPort);
+        commSpi.setIdleConnectionTimeout(idleTimeout);
+        commSpi.setTcpNoDelay(true);
+        commSpi.setAckSendThreshold(ackCnt);
+        commSpi.setMessageQueueLimit(queueLimit);
+        commSpi.setSharedMemoryPort(-1);
+
+        return commSpi;
+    }
+
+    /**
+     * Clears the {@code spis}, {@code nodes} and {@code spiRsrcs} collections and then creates,
+     * configures and starts {@code SPI_CNT} communication SPIs, each with its own test node,
+     * test resources, SPI context and {@code TestListener}. Once all SPIs are started, every
+     * context gets all other nodes added as remote nodes.
+     * <p>
+     * NOTE(review): SPIs left over from a previous call are only dropped from the collections
+     * here, not stopped - callers appear to be expected to invoke {@code stopSpis()} themselves.
+     *
+     * @param ackCnt Recovery acknowledgement count.
+     * @param idleTimeout Idle connection timeout.
+     * @param queueLimit Message queue limit.
+     * @throws Exception If failed.
+     */
+    private void startSpis(int ackCnt, int idleTimeout, int queueLimit) throws Exception {
+        spis.clear();
+        nodes.clear();
+        spiRsrcs.clear();
+
+        Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+
+        for (int i = 0; i < SPI_CNT; i++) {
+            TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit);
+
+            GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i);
+
+            IgniteTestResources rsrcs = new IgniteTestResources();
+
+            GridTestNode node = new GridTestNode(rsrcs.getNodeId());
+
+            GridSpiTestContext ctx = initSpiContext();
+
+            ctx.setLocalNode(node);
+
+            spiRsrcs.add(rsrcs);
+
+            rsrcs.inject(spi);
+
+            spi.setListener(new TestListener());
+
+            node.setAttributes(spi.getNodeAttributes());
+
+            nodes.add(node);
+
+            spi.spiStart(getTestGridName() + (i + 1));
+
+            spis.add(spi);
+
+            spi.onContextInitialized(ctx);
+
+            ctxs.put(node, ctx);
+        }
+
+        // For each context set remote nodes.
+        for (Map.Entry<ClusterNode, GridSpiTestContext> e : ctxs.entrySet()) {
+            for (ClusterNode n : nodes) {
+                if (!n.equals(e.getKey()))
+                    e.getValue().remoteNodes().add(n);
+            }
+        }
+    }
+
+    /**
+     * Starts the SPIs, making up to three attempts. An attempt is repeated (after stopping the
+     * SPIs and sleeping) only when it fails with an {@link IgniteCheckedException} caused by a
+     * {@link BindException}; any other failure, or a failure of the last attempt, is rethrown.
+     *
+     * @param ackCnt Recovery acknowledgement count.
+     * @param idleTimeout Idle connection timeout.
+     * @param queueLimit Message queue limit.
+     * @throws Exception If failed.
+     */
+    private void createSpis(int ackCnt, int idleTimeout, int queueLimit) throws Exception {
+        for (int attempt = 0; attempt < 3; attempt++) {
+            try {
+                startSpis(ackCnt, idleTimeout, queueLimit);
+
+                return;
+            }
+            catch (IgniteCheckedException e) {
+                // Anything but a BindException on one of the first two attempts is fatal.
+                if (!e.hasCause(BindException.class) || attempt >= 2)
+                    throw e;
+
+                info("Failed to start SPIs because of BindException, will retry after delay.");
+
+                stopSpis();
+                U.sleep(10_000);
+            }
+        }
+    }
+
+    /**
+     * Destroys the context of, detaches the listener from and stops every SPI in {@code spis},
+     * stops the threads of all test resources and finally empties the {@code spis},
+     * {@code nodes} and {@code spiRsrcs} collections.
+     *
+     * @throws Exception If failed.
+     */
+    private void stopSpis() throws Exception {
+        for (CommunicationSpi<Message> commSpi : spis) {
+            commSpi.onContextDestroyed();
+            commSpi.setListener(null);
+            commSpi.spiStop();
+        }
+
+        for (IgniteTestResources res : spiRsrcs)
+            res.stopThreads();
+
+        spis.clear();
+        nodes.clear();
+        spiRsrcs.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index 3f71d7d..9b43204 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -32,6 +32,7 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite {
         TestSuite suite = new TestSuite("Communication SPI Test Suite");
 
         suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoveryAckSelfTest.class));
+        suite.addTest(new TestSuite(IgniteTcpCommunicationRecoveryAckClosureSelfTest.class));
         suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoverySelfTest.class));
 
         suite.addTest(new TestSuite(GridTcpCommunicationSpiConcurrentConnectSelfTest.class));



[7/8] incubator-ignite git commit: Merge branches 'ignite-104' and 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-104

Posted by vk...@apache.org.
Merge branches 'ignite-104' and 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-104

Conflicts:
	modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5cdd2440
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5cdd2440
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5cdd2440

Branch: refs/heads/ignite-104
Commit: 5cdd2440a6b9eb3c5fe0a7620202caf5cb2db441
Parents: 6c1655f 1c10ade
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Fri Jul 31 13:38:39 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Fri Jul 31 13:38:39 2015 -0700

----------------------------------------------------------------------
 .../JettyRestProcessorAbstractSelfTest.java     |  14 +-
 .../managers/communication/GridIoManager.java   | 110 ++++-
 .../GridDhtPartitionsExchangeFuture.java        |  20 +-
 .../handlers/query/QueryCommandHandler.java     |   6 +-
 .../util/nio/GridCommunicationClient.java       |   5 +-
 .../util/nio/GridNioFinishedFuture.java         |  12 +
 .../ignite/internal/util/nio/GridNioFuture.java |  14 +
 .../internal/util/nio/GridNioFutureImpl.java    |  15 +
 .../util/nio/GridNioRecoveryDescriptor.java     |  13 +-
 .../ignite/internal/util/nio/GridNioServer.java |   5 +
 .../util/nio/GridNioSessionMetaKey.java         |   5 +-
 .../util/nio/GridShmemCommunicationClient.java  |   7 +-
 .../util/nio/GridTcpNioCommunicationClient.java |  14 +-
 .../communication/tcp/TcpCommunicationSpi.java  |  84 +++-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  45 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   2 +-
 ...CommunicationRecoveryAckClosureSelfTest.java | 464 +++++++++++++++++++
 .../tcp/TcpDiscoveryMultiThreadedTest.java      |   8 +-
 .../IgniteSpiCommunicationSelfTestSuite.java    |   1 +
 .../http/jetty/GridJettyRestHandler.java        |  12 +-
 20 files changed, 779 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cdd2440/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 765ba65,7e17efc..479d116
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@@ -1059,9 -982,9 +1061,10 @@@ public class GridIoManager extends Grid
          Message msg,
          byte plc,
          boolean ordered,
 +        boolean seq,
          long timeout,
-         boolean skipOnTimeout
+         boolean skipOnTimeout,
+         IgniteInClosure<IgniteException> ackClosure
      ) throws IgniteCheckedException {
          assert node != null;
          assert topic != null;
@@@ -1079,10 -1002,11 +1082,13 @@@
  
              if (ordered)
                  processOrderedMessage(locNodeId, ioMsg, plc, null);
 +            else if (seq)
 +                processSequentialMessage(locNodeId, ioMsg, plc, null);
              else
                  processRegularMessage0(ioMsg, locNodeId);
+ 
+             if (ackClosure != null)
+                 ackClosure.apply(null);
          }
          else {
              if (topicOrd < 0)
@@@ -1132,7 -1059,7 +1141,7 @@@
          if (node == null)
              throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
  
-         send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false);
 -        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
++        send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false, null);
      }
  
      /**
@@@ -1144,7 -1071,7 +1153,7 @@@
       */
      public void send(ClusterNode node, Object topic, Message msg, byte plc)
          throws IgniteCheckedException {
-         send(node, topic, -1, msg, plc, false, false, 0, false);
 -        send(node, topic, -1, msg, plc, false, 0, false, null);
++        send(node, topic, -1, msg, plc, false, false, 0, false, null);
      }
  
      /**
@@@ -1156,7 -1083,7 +1165,7 @@@
       */
      public void send(ClusterNode node, GridTopic topic, Message msg, byte plc)
          throws IgniteCheckedException {
-         send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false);
 -        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
++        send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false, null);
      }
  
      /**
@@@ -1178,7 -1105,7 +1187,7 @@@
      ) throws IgniteCheckedException {
          assert timeout > 0 || skipOnTimeout;
  
-         send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout);
 -        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
++        send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout, null);
      }
  
      /**
@@@ -1205,7 -1132,7 +1214,7 @@@
          if (node == null)
              throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
  
-         send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout);
 -        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
++        send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout, null);
      }
  
      /**
@@@ -1264,47 -1217,30 +1299,71 @@@
      }
  
      /**
++     * Sends ordered message, handing the ack closure over to the internal {@code send} routine.
++     *
+      * @param node Destination node.
+      * @param topic Topic to send the message to.
+      * @param msg Message to send.
+      * @param plc Type of processing.
+      * @param timeout Timeout to keep a message on receiving queue.
+      * @param skipOnTimeout Whether message can be skipped on timeout.
+      * @param ackClosure Ack closure.
+      * @throws IgniteCheckedException Thrown in case of any errors.
+      */
+     public void sendOrderedMessage(
+         ClusterNode node,
+         Object topic,
+         Message msg,
+         byte plc,
+         long timeout,
+         boolean skipOnTimeout,
+         IgniteInClosure<IgniteException> ackClosure
+     ) throws IgniteCheckedException {
+         assert timeout > 0 || skipOnTimeout;
+ 
++        send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout, ackClosure);
+     }
+ 
+      /**
 +     * Sends sequential message.
 +     * <p>
 +     * Looks the destination node up by ID through discovery and delegates to the
 +     * {@code ClusterNode}-based overload; fails if discovery returns no node for the ID.
 +     *
 +     * @param nodeId Destination node ID.
 +     * @param topic Topic.
 +     * @param msg Message.
 +     * @param plc Policy.
 +     * @throws IgniteCheckedException In case of error.
 +     */
 +    public void sendSequentialMessage(
 +        UUID nodeId,
 +        Object topic,
 +        Message msg,
 +        byte plc
 +    ) throws IgniteCheckedException {
 +        ClusterNode node = ctx.discovery().node(nodeId);
 +
 +        if (node == null)
 +            throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
 +
 +        sendSequentialMessage(node, topic, msg, plc);
 +    }
 +
 +    /**
 +     * Sends sequential message.
 +     * <p>
 +     * Delegates to the internal {@code send} routine with topic ordinal {@code -1},
 +     * {@code ordered = false}, {@code seq = true}, zero timeout, {@code skipOnTimeout = false}
 +     * and no ack closure.
 +     *
 +     * @param node Destination node.
 +     * @param topic Topic.
 +     * @param msg Message.
 +     * @param plc Policy.
 +     * @throws IgniteCheckedException In case of error.
 +     */
 +    public void sendSequentialMessage(
 +        ClusterNode node,
 +        Object topic,
 +        Message msg,
 +        byte plc
 +    ) throws IgniteCheckedException {
-         send(node, topic, -1, msg, plc, false, true, 0, false);
++        send(node, topic, -1, msg, plc, false, true, 0, false, null);
 +    }
 +
 +    /**
       * Sends a peer deployable user message.
       *
       * @param nodes Destination nodes.
@@@ -1459,7 -1422,7 +1547,7 @@@
              // messages to one node vs. many.
              if (!nodes.isEmpty()) {
                  for (ClusterNode node : nodes)
-                     send(node, topic, topicOrd, msg, plc, ordered, seq, timeout, skipOnTimeout);
 -                    send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null);
++                    send(node, topic, topicOrd, msg, plc, ordered, seq, timeout, skipOnTimeout, null);
              }
              else if (log.isDebugEnabled())
                  log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" +


[5/8] incubator-ignite git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite

Posted by vk...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/aec97640
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/aec97640
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/aec97640

Branch: refs/heads/ignite-104
Commit: aec97640713ecd808440cc48825910d574815cb7
Parents: 6b0552c 271550f
Author: ivasilinets <iv...@gridgain.com>
Authored: Fri Jul 31 14:36:01 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Fri Jul 31 14:36:01 2015 +0300

----------------------------------------------------------------------
 .../GridDhtPartitionsExchangeFuture.java        | 20 +++++----
 .../communication/tcp/TcpCommunicationSpi.java  | 41 +++++++++++++++---
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 45 +++++++++++---------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  2 +-
 .../tcp/TcpDiscoveryMultiThreadedTest.java      |  8 ++--
 5 files changed, 77 insertions(+), 39 deletions(-)
----------------------------------------------------------------------



[2/8] incubator-ignite git commit: Merging IGNITE-1164

Posted by vk...@apache.org.
Merging IGNITE-1164


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/abb2cef1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/abb2cef1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/abb2cef1

Branch: refs/heads/ignite-104
Commit: abb2cef136da824c55964bb4032c47dd150242c1
Parents: 44072f8
Author: Denis Magda <dm...@gridgain.com>
Authored: Fri Jul 31 13:41:41 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri Jul 31 13:41:41 2015 +0300

----------------------------------------------------------------------
 .../main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/abb2cef1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 47ba8e6..90133d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2759,6 +2759,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (routerNode.id().equals(getLocalNodeId())) {
                     ClientMessageWorker worker = clientMsgWorkers.get(node.id());
 
+                    if (worker == null)
+                        throw new IgniteSpiException("Client node already disconnected: " + node);
+
                     msg.verify(getLocalNodeId()); // Client worker require verified messages.
 
                     worker.addMessage(msg);


[4/8] incubator-ignite git commit: #ignite-1170: rename psz rest query parameter to pageSize

Posted by vk...@apache.org.
#ignite-1170: rename psz rest query parameter to pageSize


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6b0552cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6b0552cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6b0552cd

Branch: refs/heads/ignite-104
Commit: 6b0552cdedffbbd1855461fbcc988fb36f354ac4
Parents: 7ed4d15
Author: ivasilinets <iv...@gridgain.com>
Authored: Fri Jul 31 14:35:37 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Fri Jul 31 14:35:37 2015 +0300

----------------------------------------------------------------------
 .../rest/JettyRestProcessorAbstractSelfTest.java      | 14 +++++++-------
 .../rest/handlers/query/QueryCommandHandler.java      |  6 +++---
 .../protocols/http/jetty/GridJettyRestHandler.java    | 12 ++++++------
 3 files changed, 16 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b0552cd/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
index 8ce070f..090e030 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
@@ -1018,7 +1018,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
         Map<String, String> params = new HashMap<>();
         params.put("cmd", GridRestCommand.EXECUTE_SQL_QUERY.key());
         params.put("type", "Person");
-        params.put("psz", "10");
+        params.put("pageSize", "10");
         params.put("cacheName", "person");
         params.put("qry", URLEncoder.encode(qry));
         params.put("arg1", "1000");
@@ -1049,7 +1049,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
         Map<String, String> params = new HashMap<>();
         params.put("cmd", GridRestCommand.EXECUTE_SQL_QUERY.key());
         params.put("type", "String");
-        params.put("psz", "1");
+        params.put("pageSize", "1");
         params.put("qry", URLEncoder.encode("select * from String"));
 
         String ret = content(params);
@@ -1064,7 +1064,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
         assertNotNull(qryId);
 
         ret = content(F.asMap("cmd", GridRestCommand.FETCH_SQL_QUERY.key(),
-            "psz", "1", "qryId", String.valueOf(qryId)));
+            "pageSize", "1", "qryId", String.valueOf(qryId)));
 
         json = JSONObject.fromObject(ret);
 
@@ -1076,7 +1076,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
         assertFalse(last);
 
         ret = content(F.asMap("cmd", GridRestCommand.FETCH_SQL_QUERY.key(),
-            "psz", "1", "qryId", String.valueOf(qryId)));
+            "pageSize", "1", "qryId", String.valueOf(qryId)));
 
         json = JSONObject.fromObject(ret);
 
@@ -1098,7 +1098,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
 
         Map<String, String> params = new HashMap<>();
         params.put("cmd", GridRestCommand.EXECUTE_SQL_FIELDS_QUERY.key());
-        params.put("psz", "10");
+        params.put("pageSize", "10");
         params.put("cacheName", "person");
         params.put("qry", URLEncoder.encode(qry));
 
@@ -1124,7 +1124,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
 
         Map<String, String> params = new HashMap<>();
         params.put("cmd", GridRestCommand.EXECUTE_SQL_FIELDS_QUERY.key());
-        params.put("psz", "10");
+        params.put("pageSize", "10");
         params.put("cacheName", "person");
         params.put("qry", URLEncoder.encode(qry));
 
@@ -1162,7 +1162,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
         Map<String, String> params = new HashMap<>();
         params.put("cmd", GridRestCommand.EXECUTE_SQL_QUERY.key());
         params.put("type", "Person");
-        params.put("psz", "1");
+        params.put("pageSize", "1");
         params.put("cacheName", "person");
         params.put("qry", URLEncoder.encode(qry));
         params.put("arg1", "1000");

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b0552cd/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
index 59f95c9..1712dd4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
@@ -138,7 +138,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
 
                 if (cache == null)
                     return new GridRestResponse(GridRestResponse.STATUS_FAILED,
-                        "No cache with name [cacheName=" + req.cacheName() + "]");
+                        "Failed to find cache with name: " + req.cacheName());
 
                 QueryCursor qryCur = cache.query(qry);
 
@@ -204,7 +204,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
 
                 if (cur == null)
                     return new GridRestResponse(GridRestResponse.STATUS_FAILED,
-                        "Cannot find query [qryId=" + req.queryId() + "]");
+                        "Failed to find query with ID: " + req.queryId());
 
                 cur.close();
 
@@ -247,7 +247,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
 
                 if (cur == null)
                     return new GridRestResponse(GridRestResponse.STATUS_FAILED,
-                        "Cannot find query [qryId=" + req.queryId() + "]");
+                        "Failed to find query with ID: " + req.queryId());
 
                 CacheQueryResult res = createQueryResult(qryCurs, cur, req, req.queryId());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b0552cd/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
----------------------------------------------------------------------
diff --git a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java b/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
index bf0f2c8..75e80ec 100644
--- a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
+++ b/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
@@ -479,10 +479,10 @@ public class GridJettyRestHandler extends AbstractHandler {
 
                 restReq0.typeName((String) params.get("type"));
 
-                String psz = (String) params.get("psz");
+                String pageSize = (String) params.get("pageSize");
 
-                if (psz != null)
-                    restReq0.pageSize(Integer.parseInt(psz));
+                if (pageSize != null)
+                    restReq0.pageSize(Integer.parseInt(pageSize));
 
                 restReq0.cacheName((String)params.get("cacheName"));
 
@@ -499,10 +499,10 @@ public class GridJettyRestHandler extends AbstractHandler {
                 if (qryId != null)
                     restReq0.queryId(Long.parseLong(qryId));
 
-                String psz = (String) params.get("psz");
+                String pageSize = (String) params.get("pageSize");
 
-                if (psz != null)
-                    restReq0.pageSize(Integer.parseInt(psz));
+                if (pageSize != null)
+                    restReq0.pageSize(Integer.parseInt(pageSize));
 
                 restReq0.cacheName((String)params.get("cacheName"));