You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/02 09:51:02 UTC

[3/3] ignite git commit: ignite-1.9-fix

ignite-1.9-fix


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/524935f9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/524935f9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/524935f9

Branch: refs/heads/ignite-1.9-fix
Commit: 524935f937e0164ee22067c13ebc82b0cac7f88b
Parents: f3ef67e
Author: sboikov <sb...@gridgain.com>
Authored: Thu Mar 2 12:50:51 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Mar 2 12:50:51 2017 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 112 +++++++------------
 .../communication/GridIoManagerSelfTest.java    |   2 +-
 2 files changed, 40 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/524935f9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 0fbdd5f..0b7c790 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1256,12 +1256,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         boolean ordered,
         long timeout,
         boolean skipOnTimeout,
-        IgniteInClosure<IgniteException> ackC
+        IgniteInClosure<IgniteException> ackC,
+        boolean async
     ) throws IgniteCheckedException {
         assert node != null;
         assert topic != null;
         assert msg != null;
-        //assert !async || msg instanceof GridIoUserMessage : msg; // Async execution was added only for IgniteMessaging.
+        assert !async || msg instanceof GridIoUserMessage : msg; // Async execution was added only for IgniteMessaging.
 
         GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
 
@@ -1275,6 +1276,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
             if (ordered)
                 processOrderedMessage(locNodeId, ioMsg, plc, null);
+            else if (async)
+                processRegularMessage(locNodeId, ioMsg, plc, NOOP);
             else
                 processRegularMessage0(ioMsg, locNodeId);
 
@@ -1332,7 +1335,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (node == null)
             throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
 
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
     }
 
     /**
@@ -1344,7 +1347,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, Object topic, Message msg, byte plc)
         throws IgniteCheckedException {
-        send(node, topic, -1, msg, plc, false, 0, false, null);
+        send(node, topic, -1, msg, plc, false, 0, false, null, false);
     }
 
     /**
@@ -1352,12 +1355,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param topic Topic to send the message to.
      * @param msg Message to send.
      * @param plc Type of processing.
-     * @param async Async flag.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, boolean async)
+    public void send(ClusterNode node, GridTopic topic, Message msg, byte plc)
         throws IgniteCheckedException {
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
     }
 
     /**
@@ -1370,7 +1372,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc)
         throws IgniteCheckedException {
-        send(node, topic, topicOrd, msg, plc, false, 0, false, null);
+        send(node, topic, topicOrd, msg, plc, false, 0, false, null, false);
     }
 
     /**
@@ -1392,34 +1394,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     ) throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
-    }
-
-    /**
-     * @param nodeId Destination node.
-     * @param topic Topic to send the message to.
-     * @param msg Message to send.
-     * @param plc Type of processing.
-     * @param timeout Timeout to keep a message on receiving queue.
-     * @param skipOnTimeout Whether message can be skipped on timeout.
-     * @throws IgniteCheckedException Thrown in case of any errors.
-     */
-    public void sendOrderedMessage(
-        UUID nodeId,
-        Object topic,
-        Message msg,
-        byte plc,
-        long timeout,
-        boolean skipOnTimeout
-    ) throws IgniteCheckedException {
-        assert timeout > 0 || skipOnTimeout;
-
-        ClusterNode node = ctx.discovery().node(nodeId);
-
-        if (node == null)
-            throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
-
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null, false);
     }
 
     /**
@@ -1432,7 +1407,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, GridTopic topic, Message msg, byte plc,
         IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC, false);
     }
 
     /**
@@ -1468,7 +1443,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC)
         throws IgniteCheckedException {
-        send(node, topic, -1, msg, plc, false, 0, false, ackC);
+        send(node, topic, -1, msg, plc, false, 0, false, ackC, false);
     }
 
     /**
@@ -1524,7 +1499,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     ) throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC, false);
     }
 
     /**
@@ -1595,8 +1570,18 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
         if (ordered)
             sendOrderedMessage(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, timeout, true);
-        else if (loc)
-            send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, async);
+        else if (loc) {
+            send(F.first(nodes),
+                TOPIC_COMM_USER,
+                TOPIC_COMM_USER.ordinal(),
+                ioMsg,
+                PUBLIC_POOL,
+                false,
+                0,
+                false,
+                null,
+                async);
+        }
         else {
             ClusterNode locNode = F.find(nodes, null, F.localNode(locNodeId));
 
@@ -1608,8 +1593,18 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             // Will call local listeners in current thread synchronously or through pool,
             // depending async flag, so must go the last
             // to allow remote nodes execute the requested operation in parallel.
-            if (locNode != null)
-                send(locNode, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, async);
+            if (locNode != null) {
+                send(locNode,
+                    TOPIC_COMM_USER,
+                    TOPIC_COMM_USER.ordinal(),
+                    ioMsg,
+                    PUBLIC_POOL,
+                    false,
+                    0,
+                    false,
+                    null,
+                    async);
+            }
         }
     }
 
@@ -1651,35 +1646,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
-     * @param nodeId Destination node.
-     * @param topic Topic to send the message to.
-     * @param msg Message to send.
-     * @param plc Type of processing.
-     * @param timeout Timeout to keep a message on receiving queue.
-     * @param skipOnTimeout Whether message can be skipped on timeout.
-     * @param ackC Ack closure.
-     * @throws IgniteCheckedException Thrown in case of any errors.
-     */
-    public void sendOrderedMessage(
-        UUID nodeId,
-        Object topic,
-        Message msg,
-        byte plc,
-        long timeout,
-        boolean skipOnTimeout,
-        IgniteInClosure<IgniteException> ackC
-    ) throws IgniteCheckedException {
-        assert timeout > 0 || skipOnTimeout;
-
-        ClusterNode node = ctx.discovery().node(nodeId);
-
-        if (node == null)
-            throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
-
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC);
-    }
-
-    /**
      * @param nodes Destination nodes.
      * @param topic Topic to send the message to.
      * @param topicOrd Topic ordinal value.
@@ -1713,7 +1679,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             // messages to one node vs. many.
             if (!nodes.isEmpty()) {
                 for (ClusterNode node : nodes)
-                    send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null);
+                    send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null, false);
             }
             else if (log.isDebugEnabled())
                 log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" +

http://git-wip-us.apache.org/repos/asf/ignite/blob/524935f9/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
index f5499d3..2bc1398 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
@@ -196,7 +196,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, boolean async)
+        @Override public void send(ClusterNode node, GridTopic topic, Message msg, byte plc)
             throws IgniteCheckedException {
             // No-op.
         }