You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/02 09:51:02 UTC
[3/3] ignite git commit: ignite-1.9-fix
ignite-1.9-fix
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/524935f9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/524935f9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/524935f9
Branch: refs/heads/ignite-1.9-fix
Commit: 524935f937e0164ee22067c13ebc82b0cac7f88b
Parents: f3ef67e
Author: sboikov <sb...@gridgain.com>
Authored: Thu Mar 2 12:50:51 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Mar 2 12:50:51 2017 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 112 +++++++------------
.../communication/GridIoManagerSelfTest.java | 2 +-
2 files changed, 40 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/524935f9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 0fbdd5f..0b7c790 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1256,12 +1256,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
boolean ordered,
long timeout,
boolean skipOnTimeout,
- IgniteInClosure<IgniteException> ackC
+ IgniteInClosure<IgniteException> ackC,
+ boolean async
) throws IgniteCheckedException {
assert node != null;
assert topic != null;
assert msg != null;
- //assert !async || msg instanceof GridIoUserMessage : msg; // Async execution was added only for IgniteMessaging.
+ assert !async || msg instanceof GridIoUserMessage : msg; // Async execution was added only for IgniteMessaging.
GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
@@ -1275,6 +1276,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (ordered)
processOrderedMessage(locNodeId, ioMsg, plc, null);
+ else if (async)
+ processRegularMessage(locNodeId, ioMsg, plc, NOOP);
else
processRegularMessage0(ioMsg, locNodeId);
@@ -1332,7 +1335,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (node == null)
throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
- send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
+ send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
}
/**
@@ -1344,7 +1347,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
*/
public void send(ClusterNode node, Object topic, Message msg, byte plc)
throws IgniteCheckedException {
- send(node, topic, -1, msg, plc, false, 0, false, null);
+ send(node, topic, -1, msg, plc, false, 0, false, null, false);
}
/**
@@ -1352,12 +1355,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param topic Topic to send the message to.
* @param msg Message to send.
* @param plc Type of processing.
- * @param async Async flag.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, boolean async)
+ public void send(ClusterNode node, GridTopic topic, Message msg, byte plc)
throws IgniteCheckedException {
- send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
+ send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
}
/**
@@ -1370,7 +1372,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
*/
public void send(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc)
throws IgniteCheckedException {
- send(node, topic, topicOrd, msg, plc, false, 0, false, null);
+ send(node, topic, topicOrd, msg, plc, false, 0, false, null, false);
}
/**
@@ -1392,34 +1394,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
) throws IgniteCheckedException {
assert timeout > 0 || skipOnTimeout;
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
- }
-
- /**
- * @param nodeId Destination node.
- * @param topic Topic to send the message to.
- * @param msg Message to send.
- * @param plc Type of processing.
- * @param timeout Timeout to keep a message on receiving queue.
- * @param skipOnTimeout Whether message can be skipped on timeout.
- * @throws IgniteCheckedException Thrown in case of any errors.
- */
- public void sendOrderedMessage(
- UUID nodeId,
- Object topic,
- Message msg,
- byte plc,
- long timeout,
- boolean skipOnTimeout
- ) throws IgniteCheckedException {
- assert timeout > 0 || skipOnTimeout;
-
- ClusterNode node = ctx.discovery().node(nodeId);
-
- if (node == null)
- throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
-
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
+ send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null, false);
}
/**
@@ -1432,7 +1407,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
*/
public void send(ClusterNode node, GridTopic topic, Message msg, byte plc,
IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
- send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC);
+ send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC, false);
}
/**
@@ -1468,7 +1443,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
*/
public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC)
throws IgniteCheckedException {
- send(node, topic, -1, msg, plc, false, 0, false, ackC);
+ send(node, topic, -1, msg, plc, false, 0, false, ackC, false);
}
/**
@@ -1524,7 +1499,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
) throws IgniteCheckedException {
assert timeout > 0 || skipOnTimeout;
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC);
+ send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC, false);
}
/**
@@ -1595,8 +1570,18 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (ordered)
sendOrderedMessage(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, timeout, true);
- else if (loc)
- send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, async);
+ else if (loc) {
+ send(F.first(nodes),
+ TOPIC_COMM_USER,
+ TOPIC_COMM_USER.ordinal(),
+ ioMsg,
+ PUBLIC_POOL,
+ false,
+ 0,
+ false,
+ null,
+ async);
+ }
else {
ClusterNode locNode = F.find(nodes, null, F.localNode(locNodeId));
@@ -1608,8 +1593,18 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
// Will call local listeners in current thread synchronously or through pool,
// depending async flag, so must go the last
// to allow remote nodes execute the requested operation in parallel.
- if (locNode != null)
- send(locNode, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, async);
+ if (locNode != null) {
+ send(locNode,
+ TOPIC_COMM_USER,
+ TOPIC_COMM_USER.ordinal(),
+ ioMsg,
+ PUBLIC_POOL,
+ false,
+ 0,
+ false,
+ null,
+ async);
+ }
}
}
@@ -1651,35 +1646,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
/**
- * @param nodeId Destination node.
- * @param topic Topic to send the message to.
- * @param msg Message to send.
- * @param plc Type of processing.
- * @param timeout Timeout to keep a message on receiving queue.
- * @param skipOnTimeout Whether message can be skipped on timeout.
- * @param ackC Ack closure.
- * @throws IgniteCheckedException Thrown in case of any errors.
- */
- public void sendOrderedMessage(
- UUID nodeId,
- Object topic,
- Message msg,
- byte plc,
- long timeout,
- boolean skipOnTimeout,
- IgniteInClosure<IgniteException> ackC
- ) throws IgniteCheckedException {
- assert timeout > 0 || skipOnTimeout;
-
- ClusterNode node = ctx.discovery().node(nodeId);
-
- if (node == null)
- throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
-
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC);
- }
-
- /**
* @param nodes Destination nodes.
* @param topic Topic to send the message to.
* @param topicOrd Topic ordinal value.
@@ -1713,7 +1679,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
// messages to one node vs. many.
if (!nodes.isEmpty()) {
for (ClusterNode node : nodes)
- send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null);
+ send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null, false);
}
else if (log.isDebugEnabled())
log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" +
http://git-wip-us.apache.org/repos/asf/ignite/blob/524935f9/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
index f5499d3..2bc1398 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
@@ -196,7 +196,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, boolean async)
+ @Override public void send(ClusterNode node, GridTopic topic, Message msg, byte plc)
throws IgniteCheckedException {
// No-op.
}