You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/06 14:16:09 UTC
[1/2] ignite git commit: Fixed wrong 'send' method usage in GridIoManager.
Repository: ignite
Updated Branches:
refs/heads/ignite-4768 d39050c74 -> 9edb0e79b
Fixed wrong 'send' method usage in GridIoManager.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/93e19962
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/93e19962
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/93e19962
Branch: refs/heads/ignite-4768
Commit: 93e19962114194072151840198f04f3406be068a
Parents: 50f8741
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 3 16:39:22 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 3 16:39:22 2017 +0300
----------------------------------------------------------------------
.../ignite/internal/GridJobSiblingImpl.java | 4 +-
.../internal/managers/GridManagerAdapter.java | 2 +-
.../checkpoint/GridCheckpointManager.java | 2 +-
.../managers/communication/GridIoManager.java | 206 +++++++------------
.../deployment/GridDeploymentCommunication.java | 6 +-
.../eventstorage/GridEventStorageManager.java | 6 +-
.../processors/cache/GridCacheIoManager.java | 8 +-
.../cache/transactions/IgniteTxManager.java | 6 +-
.../clock/GridClockSyncProcessor.java | 2 +-
.../continuous/GridContinuousProcessor.java | 2 +-
.../datastreamer/DataStreamProcessor.java | 2 +-
.../datastreamer/DataStreamerImpl.java | 2 +-
.../internal/processors/igfs/IgfsContext.java | 8 +-
.../processors/job/GridJobProcessor.java | 4 +-
.../internal/processors/job/GridJobWorker.java | 2 +-
.../marshaller/ClientRequestFuture.java | 2 +-
.../GridMarshallerMappingProcessor.java | 2 +-
.../handlers/task/GridTaskCommandHandler.java | 4 +-
.../processors/task/GridTaskProcessor.java | 2 +-
.../processors/task/GridTaskWorker.java | 4 +-
.../GridCommunicationSendMessageSelfTest.java | 2 +-
.../communication/GridIoManagerSelfTest.java | 28 +--
.../nio/IgniteExceptionInNioWorkerSelfTest.java | 2 +-
.../communication/GridIoManagerBenchmark.java | 4 +-
.../communication/GridIoManagerBenchmark0.java | 14 +-
.../communication/GridCacheMessageSelfTest.java | 2 +-
.../hadoop/shuffle/HadoopShuffle.java | 3 +-
.../processors/query/h2/IgniteH2Indexing.java | 2 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 6 +-
.../h2/twostep/GridReduceQueryExecutor.java | 2 +-
30 files changed, 140 insertions(+), 201 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
index 2d95f85..79ac416 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
@@ -150,7 +150,7 @@ public class GridJobSiblingImpl implements ComputeJobSibling, Externalizable {
if (!nodes.isEmpty()) {
try {
- ctx.io().send(nodes, TOPIC_JOB_CANCEL, new GridJobCancelRequest(sesId, jobId), SYSTEM_POOL);
+ ctx.io().sendToGridTopic(nodes, TOPIC_JOB_CANCEL, new GridJobCancelRequest(sesId, jobId), SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -169,7 +169,7 @@ public class GridJobSiblingImpl implements ComputeJobSibling, Externalizable {
ctx.job().cancelJob(ses.getId(), jobId, false);
else {
try {
- ctx.io().send(node, TOPIC_JOB_CANCEL, new GridJobCancelRequest(ses.getId(), jobId), SYSTEM_POOL);
+ ctx.io().sendToGridTopic(node, TOPIC_JOB_CANCEL, new GridJobCancelRequest(ses.getId(), jobId), SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
// Avoid stack trace for left nodes.
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index e864916..d993376 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -391,7 +391,7 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
try {
if (msg instanceof Message)
- ctx.io().send(node, topic, (Message)msg, SYSTEM_POOL);
+ ctx.io().sendToCustomTopic(node, topic, (Message)msg, SYSTEM_POOL);
else
ctx.io().sendUserMessage(Collections.singletonList(node), msg, topic, false, 0, false);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
index 9124caf..8ce8b0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
@@ -239,7 +239,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
ClusterNode node = ctx.discovery().node(ses.getTaskNodeId());
if (node != null)
- ctx.io().send(
+ ctx.io().sendToGridTopic(
node,
TOPIC_CHECKPOINT,
new GridCheckpointRequest(ses.getId(), key, ses.getCheckpointSpi()),
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 38b5441..5e91ea9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -191,6 +191,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
/** */
private final AtomicLong ioTestId = new AtomicLong();
+ /** No-op runnable. */
+ private static final IgniteRunnable NOOP = new IgniteRunnable() {
+ @Override public void run() {
+ // No-op.
+ }
+ };
+
/**
* @param ctx Grid kernal context.
*/
@@ -328,7 +335,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
res.flags(msg0.flags());
try {
- send(node, GridTopic.TOPIC_IO_TEST, res, GridIoPolicy.SYSTEM_POOL);
+ sendToGridTopic(node, GridTopic.TOPIC_IO_TEST, res, GridIoPolicy.SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send IO test response [msg=" + msg0 + "]", e);
@@ -367,7 +374,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
ClusterNode node = nodes.get(i);
try {
- send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
+ sendToGridTopic(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
ioTestMap().remove(msg.id());
@@ -397,7 +404,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
ioTestMap().put(id, fut);
try {
- send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
+ sendToGridTopic(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
ioTestMap().remove(msg.id());
@@ -791,8 +798,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
finally {
threadProcessingMessage(false);
- if (msgC != null)
- msgC.run();
+ msgC.run();
}
}
@@ -915,46 +921,46 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* Remove listener if it matches expected value.
*
* @param topic Topic.
- * @param expected Listener.
+ * @param exp Listener.
* @return Result.
*/
- private boolean listenerRemove0(Object topic, GridMessageListener expected) {
+ private boolean listenerRemove0(Object topic, GridMessageListener exp) {
if (topic instanceof GridTopic) {
synchronized (sysLsnrsMux) {
- return systemListenerChange(topic, expected, null);
+ return systemListenerChange(topic, exp, null);
}
}
else
- return lsnrMap.remove(topic, expected);
+ return lsnrMap.remove(topic, exp);
}
/**
* Replace listener.
*
* @param topic Topic.
- * @param expected Old value.
+ * @param exp Old value.
* @param newVal New value.
* @return Result.
*/
- private boolean listenerReplace0(Object topic, GridMessageListener expected, GridMessageListener newVal) {
+ private boolean listenerReplace0(Object topic, GridMessageListener exp, GridMessageListener newVal) {
if (topic instanceof GridTopic) {
synchronized (sysLsnrsMux) {
- return systemListenerChange(topic, expected, newVal);
+ return systemListenerChange(topic, exp, newVal);
}
}
else
- return lsnrMap.replace(topic, expected, newVal);
+ return lsnrMap.replace(topic, exp, newVal);
}
/**
* Change system listener.
*
* @param topic Topic.
- * @param expected Expected value.
+ * @param exp Expected value.
* @param newVal New value.
* @return Result.
*/
- private boolean systemListenerChange(Object topic, GridMessageListener expected, GridMessageListener newVal) {
+ private boolean systemListenerChange(Object topic, GridMessageListener exp, GridMessageListener newVal) {
assert Thread.holdsLock(sysLsnrsMux);
assert topic instanceof GridTopic;
@@ -962,7 +968,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
GridMessageListener old = sysLsnrs[idx];
- if (old != null && old.equals(expected)) {
+ if (old != null && old.equals(exp)) {
changeSystemListener(idx, newVal);
return true;
@@ -1263,6 +1269,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
assert node != null;
assert topic != null;
assert msg != null;
+ assert !async || msg instanceof GridIoUserMessage : msg; // Async execution was added only for IgniteMessaging.
+ assert topicOrd >= 0 || !(topic instanceof GridTopic) : msg;
GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
@@ -1276,11 +1284,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (ordered)
processOrderedMessage(locNodeId, ioMsg, plc, null);
- else if (async) {
- assert msg instanceof GridIoUserMessage : ioMsg; // Async execution was added only for IgniteMessaging.
-
- processRegularMessage(locNodeId, ioMsg, plc, null);
- }
+ else if (async)
+ processRegularMessage(locNodeId, ioMsg, plc, NOOP);
else
processRegularMessage0(ioMsg, locNodeId);
@@ -1313,14 +1318,14 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param plc Type of processing.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void send(UUID nodeId, Object topic, Message msg, byte plc)
+ public void sendToCustomTopic(UUID nodeId, Object topic, Message msg, byte plc)
throws IgniteCheckedException {
ClusterNode node = ctx.discovery().node(nodeId);
if (node == null)
throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
- send(node, topic, msg, plc);
+ sendToCustomTopic(node, topic, msg, plc);
}
/**
@@ -1331,7 +1336,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @throws IgniteCheckedException Thrown in case of any errors.
*/
@SuppressWarnings("TypeMayBeWeakened")
- public void send(UUID nodeId, GridTopic topic, Message msg, byte plc)
+ public void sendToGridTopic(UUID nodeId, GridTopic topic, Message msg, byte plc)
throws IgniteCheckedException {
ClusterNode node = ctx.discovery().node(nodeId);
@@ -1348,7 +1353,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param plc Type of processing.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void send(ClusterNode node, Object topic, Message msg, byte plc)
+ public void sendToCustomTopic(ClusterNode node, Object topic, Message msg, byte plc)
throws IgniteCheckedException {
send(node, topic, -1, msg, plc, false, 0, false, null, false);
}
@@ -1358,12 +1363,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param topic Topic to send the message to.
* @param msg Message to send.
* @param plc Type of processing.
- * @param async Async flag.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, boolean async)
+ public void sendToGridTopic(ClusterNode node, GridTopic topic, Message msg, byte plc)
throws IgniteCheckedException {
- send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, async);
+ send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
}
/**
@@ -1374,7 +1378,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param plc Type of processing.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void send(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc)
+ public void sendGeneric(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc)
throws IgniteCheckedException {
send(node, topic, topicOrd, msg, plc, false, 0, false, null, false);
}
@@ -1402,33 +1406,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
/**
- * @param nodeId Destination node.
- * @param topic Topic to send the message to.
- * @param msg Message to send.
- * @param plc Type of processing.
- * @param timeout Timeout to keep a message on receiving queue.
- * @param skipOnTimeout Whether message can be skipped on timeout.
- * @throws IgniteCheckedException Thrown in case of any errors.
- */
- public void sendOrderedMessage(
- UUID nodeId,
- Object topic,
- Message msg,
- byte plc,
- long timeout,
- boolean skipOnTimeout
- ) throws IgniteCheckedException {
- assert timeout > 0 || skipOnTimeout;
-
- ClusterNode node = ctx.discovery().node(nodeId);
-
- if (node == null)
- throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
-
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null, false);
- }
-
- /**
* @param node Destination nodes.
* @param topic Topic to send the message to.
* @param msg Message to send.
@@ -1436,8 +1413,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param ackC Ack closure.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void send(ClusterNode node, GridTopic topic, Message msg, byte plc,
- IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
+ public void sendToGridTopic(ClusterNode node,
+ GridTopic topic,
+ Message msg,
+ byte plc,
+ IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException
+ {
send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC, false);
}
@@ -1450,9 +1431,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param skipOnTimeout Whether message can be skipped on timeout.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void sendOrderedMessage(
+ void sendOrderedMessageToGridTopic(
Collection<? extends ClusterNode> nodes,
- Object topic,
+ GridTopic topic,
Message msg,
byte plc,
long timeout,
@@ -1461,36 +1442,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
throws IgniteCheckedException {
assert timeout > 0 || skipOnTimeout;
- send(nodes, topic, -1, msg, plc, true, timeout, skipOnTimeout);
- }
-
- /**
- * @param node Destination nodes.
- * @param topic Topic to send the message to.
- * @param msg Message to send.
- * @param plc Type of processing.
- * @param ackC Ack closure.
- * @throws IgniteCheckedException Thrown in case of any errors.
- */
- public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC)
- throws IgniteCheckedException {
- send(node, topic, -1, msg, plc, false, 0, false, ackC, false);
- }
-
- /**
- * @param nodes Destination node.
- * @param topic Topic to send the message to.
- * @param msg Message to send.
- * @param plc Type of processing.
- * @throws IgniteCheckedException Thrown in case of any errors.
- */
- public void send(
- Collection<? extends ClusterNode> nodes,
- Object topic,
- Message msg,
- byte plc
- ) throws IgniteCheckedException {
- send(nodes, topic, -1, msg, plc, false, 0, false);
+ send(nodes, topic, topic.ordinal(), msg, plc, true, timeout, skipOnTimeout);
}
/**
@@ -1500,7 +1452,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param plc Type of processing.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void send(
+ public void sendToGridTopic(
Collection<? extends ClusterNode> nodes,
GridTopic topic,
Message msg,
@@ -1540,7 +1492,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param msg Message to send.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg) throws IgniteCheckedException {
+ void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg) throws IgniteCheckedException {
sendUserMessage(nodes, msg, null, false, 0, false);
}
@@ -1556,8 +1508,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @throws IgniteCheckedException Thrown in case of any errors.
*/
@SuppressWarnings("ConstantConditions")
- public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg,
- @Nullable Object topic, boolean ordered, long timeout, boolean async) throws IgniteCheckedException {
+ public void sendUserMessage(Collection<? extends ClusterNode> nodes,
+ Object msg,
+ @Nullable Object topic,
+ boolean ordered,
+ long timeout,
+ boolean async) throws IgniteCheckedException
+ {
boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(locNodeId);
byte[] serMsg = null;
@@ -1600,22 +1557,42 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
dep != null ? dep.participants() : null);
if (ordered)
- sendOrderedMessage(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, timeout, true);
- else if (loc)
- send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, async);
+ sendOrderedMessageToGridTopic(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, timeout, true);
+ else if (loc) {
+ send(F.first(nodes),
+ TOPIC_COMM_USER,
+ TOPIC_COMM_USER.ordinal(),
+ ioMsg,
+ PUBLIC_POOL,
+ false,
+ 0,
+ false,
+ null,
+ async);
+ }
else {
ClusterNode locNode = F.find(nodes, null, F.localNode(locNodeId));
Collection<? extends ClusterNode> rmtNodes = F.view(nodes, F.remoteNodes(locNodeId));
if (!rmtNodes.isEmpty())
- send(rmtNodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
+ sendToGridTopic(rmtNodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
// Will call local listeners in current thread synchronously or through pool,
// depending async flag, so must go the last
// to allow remote nodes execute the requested operation in parallel.
- if (locNode != null)
- send(locNode, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, async);
+ if (locNode != null) {
+ send(locNode,
+ TOPIC_COMM_USER,
+ TOPIC_COMM_USER.ordinal(),
+ ioMsg,
+ PUBLIC_POOL,
+ false,
+ 0,
+ false,
+ null,
+ async);
+ }
}
}
@@ -1657,35 +1634,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
/**
- * @param nodeId Destination node.
- * @param topic Topic to send the message to.
- * @param msg Message to send.
- * @param plc Type of processing.
- * @param timeout Timeout to keep a message on receiving queue.
- * @param skipOnTimeout Whether message can be skipped on timeout.
- * @param ackC Ack closure.
- * @throws IgniteCheckedException Thrown in case of any errors.
- */
- public void sendOrderedMessage(
- UUID nodeId,
- Object topic,
- Message msg,
- byte plc,
- long timeout,
- boolean skipOnTimeout,
- IgniteInClosure<IgniteException> ackC
- ) throws IgniteCheckedException {
- assert timeout > 0 || skipOnTimeout;
-
- ClusterNode node = ctx.discovery().node(nodeId);
-
- if (node == null)
- throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
-
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC, false);
- }
-
- /**
* @param nodes Destination nodes.
* @param topic Topic to send the message to.
* @param topicOrd Topic ordinal value.
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
index a571ae4..ffbde37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
@@ -294,7 +294,7 @@ class GridDeploymentCommunication {
if (node != null) {
try {
- ctx.io().send(node, topic, res, GridIoPolicy.P2P_POOL);
+ ctx.io().sendToCustomTopic(node, topic, res, GridIoPolicy.P2P_POOL);
if (log.isDebugEnabled())
log.debug("Sent peer class loading response [node=" + node.id() + ", res=" + res + ']');
@@ -324,7 +324,7 @@ class GridDeploymentCommunication {
Message req = new GridDeploymentRequest(null, null, rsrcName, true);
if (!rmtNodes.isEmpty()) {
- ctx.io().send(
+ ctx.io().sendToGridTopic(
rmtNodes,
TOPIC_CLASSLOAD,
req,
@@ -445,7 +445,7 @@ class GridDeploymentCommunication {
if (req.responseTopic() != null && !ctx.localNodeId().equals(dstNode.id()))
req.responseTopicBytes(U.marshal(marsh, req.responseTopic()));
- ctx.io().send(dstNode, TOPIC_CLASSLOAD, req, GridIoPolicy.P2P_POOL);
+ ctx.io().sendToGridTopic(dstNode, TOPIC_CLASSLOAD, req, GridIoPolicy.P2P_POOL);
if (log.isDebugEnabled())
log.debug("Sent peer class loading request [node=" + dstNode.id() + ", req=" + req + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index b5d5ee2..656c739 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -1040,12 +1040,12 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
Collection<? extends ClusterNode> rmtNodes = F.view(nodes, F.remoteNodes(ctx.localNodeId()));
if (locNode != null)
- ctx.io().send(locNode, topic, msg, plc);
+ ctx.io().sendToGridTopic(locNode, topic, msg, plc);
if (!rmtNodes.isEmpty()) {
msg.responseTopicBytes(U.marshal(marsh, msg.responseTopic()));
- ctx.io().send(rmtNodes, topic, msg, plc);
+ ctx.io().sendToGridTopic(rmtNodes, topic, msg, plc);
}
}
@@ -1164,7 +1164,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
res.exceptionBytes(U.marshal(marsh, res.exception()));
}
- ctx.io().send(node, req.responseTopic(), res, PUBLIC_POOL);
+ ctx.io().sendToCustomTopic(node, req.responseTopic(), res, PUBLIC_POOL);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send event query response to node [node=" + nodeId + ", res=" +
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index d20310b..50f58cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -908,7 +908,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
try {
cnt++;
- cctx.gridIO().send(node, TOPIC_CACHE, msg, plc);
+ cctx.gridIO().sendToGridTopic(node, TOPIC_CACHE, msg, plc);
return;
}
@@ -969,7 +969,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
}
});
- cctx.gridIO().send(nodesView, TOPIC_CACHE, msg, plc);
+ cctx.gridIO().sendToGridTopic(nodesView, TOPIC_CACHE, msg, plc);
boolean added = false;
@@ -1116,7 +1116,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
* @param plc IO policy.
* @throws IgniteCheckedException If send failed.
*/
- public void sendNoRetry(ClusterNode node,
+ void sendNoRetry(ClusterNode node,
GridCacheMessage msg,
byte plc)
throws IgniteCheckedException {
@@ -1127,7 +1127,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
return;
try {
- cctx.gridIO().send(node, TOPIC_CACHE, msg, plc);
+ cctx.gridIO().sendToGridTopic(node, TOPIC_CACHE, msg, plc);
if (log.isDebugEnabled())
log.debug("Sent cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index a1a18fe..f4a5629 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -2065,7 +2065,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (!cctx.localNodeId().equals(nodeId))
req.prepareMarshal(cctx);
- cctx.gridIO().send(node, TOPIC_TX, req, SYSTEM_POOL);
+ cctx.gridIO().sendToGridTopic(node, TOPIC_TX, req, SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
if (e instanceof ClusterTopologyCheckedException) {
@@ -2508,7 +2508,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (!cctx.localNodeId().equals(nodeId))
res.prepareMarshal(cctx);
- cctx.gridIO().send(nodeId, TOPIC_TX, res, SYSTEM_POOL);
+ cctx.gridIO().sendToGridTopic(nodeId, TOPIC_TX, res, SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send response to node [node=" + nodeId + ", res=" + res + ']', e);
@@ -2545,7 +2545,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
res.futureId(req.futureId());
try {
- cctx.gridIO().send(nodeId, TOPIC_TX, res, SYSTEM_POOL);
+ cctx.gridIO().sendToGridTopic(nodeId, TOPIC_TX, res, SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send response to node (is node still alive?) [nodeId=" + nodeId +
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
index 257d0d9..d644261 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
@@ -320,7 +320,7 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
snapshot.version(), snapshot.deltas());
try {
- ctx.io().send(n, TOPIC_TIME_SYNC, msg, SYSTEM_POOL);
+ ctx.io().sendToGridTopic(n, TOPIC_TIME_SYNC, msg, SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
if (ctx.discovery().pingNodeNoError(n.id()))
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 575bc69..55f65c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -1397,7 +1397,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
ackC);
}
else
- ctx.io().send(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL, ackC);
+ ctx.io().sendToGridTopic(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL, ackC);
break;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index b6400e8..74d5f4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -420,7 +420,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep);
try {
- ctx.io().send(nodeId, resTopic, res, threadIoPolicy());
+ ctx.io().sendToCustomTopic(nodeId, resTopic, res, threadIoPolicy());
}
catch (IgniteCheckedException e) {
if (ctx.discovery().alive(nodeId))
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index f97fc14..4c1de2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1693,7 +1693,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
topVer);
try {
- ctx.io().send(node, TOPIC_DATASTREAM, req, plc);
+ ctx.io().sendToGridTopic(node, TOPIC_DATASTREAM, req, plc);
if (log.isDebugEnabled())
log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
index 4c037b7..0b2558a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
@@ -27,6 +27,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
import org.jetbrains.annotations.Nullable;
/**
@@ -169,7 +170,10 @@ public class IgfsContext {
if (!kernalContext().localNodeId().equals(nodeId))
msg.prepareMarshal(kernalContext().config().getMarshaller());
- kernalContext().io().send(nodeId, topic, msg, plc);
+ if (topic instanceof GridTopic)
+ kernalContext().io().sendToGridTopic(nodeId, (GridTopic)topic, msg, plc);
+ else
+ kernalContext().io().sendToCustomTopic(nodeId, topic, msg, plc);
}
/**
@@ -184,7 +188,7 @@ public class IgfsContext {
if (!kernalContext().localNodeId().equals(node.id()))
msg.prepareMarshal(kernalContext().config().getMarshaller());
- kernalContext().io().send(node, topic, msg, plc);
+ kernalContext().io().sendToCustomTopic(node, topic, msg, plc);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 2b6699d..9ed6ff3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -518,7 +518,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
ctx.io().addMessageListener(topic, msgLsnr);
// 3. Send message.
- ctx.io().send(taskNode, TOPIC_JOB_SIBLINGS,
+ ctx.io().sendToGridTopic(taskNode, TOPIC_JOB_SIBLINGS,
new GridJobSiblingsRequest(ses.getId(),
loc ? topic : null,
loc ? null : U.marshal(marsh, topic)),
@@ -1379,7 +1379,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
ctx.task().processJobExecuteResponse(ctx.localNodeId(), jobRes);
else
// Send response to common topic as unordered message.
- ctx.io().send(sndNode, TOPIC_TASK, jobRes, req.isInternal() ? MANAGEMENT_POOL : SYSTEM_POOL);
+ ctx.io().sendToGridTopic(sndNode, TOPIC_TASK, jobRes, req.isInternal() ? MANAGEMENT_POOL : SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
// The only option here is to log, as we must assume that resending will fail too.
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index acefde7..9b7615f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -912,7 +912,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
ctx.task().processJobExecuteResponse(ctx.localNodeId(), jobRes);
else
// Send response to common topic as unordered message.
- ctx.io().send(sndNode, TOPIC_TASK, jobRes, internal ? MANAGEMENT_POOL : SYSTEM_POOL);
+ ctx.io().sendToGridTopic(sndNode, TOPIC_TASK, jobRes, internal ? MANAGEMENT_POOL : SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
// Log and invoke the master-leave callback.
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
index 773dabe..0be4e09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
@@ -96,7 +96,7 @@ final class ClientRequestFuture extends GridFutureAdapter<MappingExchangeResult>
ClusterNode srvNode = aliveSrvNodes.poll();
try {
- ioMgr.send(
+ ioMgr.sendToGridTopic(
srvNode,
GridTopic.TOPIC_MAPPING_MARSH,
new MissingMappingRequestMessage(
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
index fdea869..66c19a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@ -178,7 +178,7 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
String resolvedClsName = marshallerCtx.resolveMissedMapping(platformId, typeId);
try {
- ioMgr.send(
+ ioMgr.sendToGridTopic(
nodeId,
TOPIC_MAPPING_MARSH,
new MissingMappingResponseMessage(platformId, typeId, resolvedClsName),
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
index 947435c..99ba335 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
@@ -143,7 +143,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
Object topic = U.unmarshal(ctx, req.topicBytes(), U.resolveClassLoader(ctx.config()));
- ctx.io().send(nodeId, topic, res, SYSTEM_POOL);
+ ctx.io().sendToCustomTopic(nodeId, topic, res, SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send job task result response.", e);
@@ -494,7 +494,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
try {
byte[] topicBytes = U.marshal(ctx, topic);
- ctx.io().send(taskNode, TOPIC_REST, new GridTaskResultRequest(taskId, topic, topicBytes), SYSTEM_POOL);
+ ctx.io().sendToGridTopic(taskNode, TOPIC_REST, new GridTaskResultRequest(taskId, topic, topicBytes), SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
String errMsg = "Failed to send task result request [resHolderId=" + resHolderId +
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index d32b51c..ec5d4c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -1311,7 +1311,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
boolean loc = ctx.localNodeId().equals(nodeId);
- ctx.io().send(nodeId, topic,
+ ctx.io().sendToCustomTopic(nodeId, topic,
new GridJobSiblingsResponse(
loc ? siblings : null,
loc ? null : U.marshal(marsh, siblings)),
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index d18ea5f..02ef0fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -1281,7 +1281,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
ClusterNode node = ctx.discovery().node(nodeId);
if (node != null)
- ctx.io().send(node,
+ ctx.io().sendToGridTopic(node,
TOPIC_JOB_CANCEL,
new GridJobCancelRequest(ses.getId(), res.getJobContext().getJobId(), /*courtesy*/true),
PUBLIC_POOL);
@@ -1382,7 +1382,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
ctx.job().processJobExecuteRequest(ctx.discovery().localNode(), req);
else {
// Send job execution request.
- ctx.io().send(node, TOPIC_JOB, req, internal ? MANAGEMENT_POOL : PUBLIC_POOL);
+ ctx.io().sendToGridTopic(node, TOPIC_JOB, req, internal ? MANAGEMENT_POOL : PUBLIC_POOL);
if (log.isDebugEnabled())
log.debug("Sent job request [req=" + req + ", node=" + node + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
index 8503b48..f58be87 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
@@ -129,7 +129,7 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
long time = System.nanoTime();
for (int i = 1; i <= SAMPLE_CNT; i++) {
- mgr0.send(grid(1).localNode(), topic, new TestMessage(), GridIoPolicy.PUBLIC_POOL);
+ mgr0.sendToCustomTopic(grid(1).localNode(), topic, new TestMessage(), GridIoPolicy.PUBLIC_POOL);
if (i % 500 == 0)
info("Sent messages count: " + i);
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
index f5499d3..f4257a0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
@@ -90,21 +90,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
public void testSendIfOneOfNodesIsLocalAndTopicIsEnum() throws Exception {
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
- new GridIoManager(ctx).send(F.asList(locNode, rmtNode), GridTopic.TOPIC_CACHE, new TestMessage(),
- GridIoPolicy.P2P_POOL);
-
- return null;
- }
- }, AssertionError.class, "Internal Ignite code should never call the method with local node in a node list.");
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testSendIfOneOfNodesIsLocalAndTopicIsObject() throws Exception {
- GridTestUtils.assertThrows(log, new Callable<Object>() {
- @Override public Object call() throws Exception {
- new GridIoManager(ctx).send(F.asList(locNode, rmtNode), new Object(), new TestMessage(),
+ new GridIoManager(ctx).sendToGridTopic(F.asList(locNode, rmtNode), GridTopic.TOPIC_CACHE, new TestMessage(),
GridIoPolicy.P2P_POOL);
return null;
@@ -127,12 +113,12 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
// No-op. We are using mocks so real sending is impossible.
}
- verify(ioMgr).send(eq(locNode), eq(GridTopic.TOPIC_COMM_USER), any(GridIoUserMessage.class),
+ verify(ioMgr).sendToGridTopic(eq(locNode), eq(GridTopic.TOPIC_COMM_USER), any(GridIoUserMessage.class),
eq(GridIoPolicy.PUBLIC_POOL));
Collection<? extends ClusterNode> rmtNodes = F.view(F.asList(rmtNode), F.remoteNodes(locNode.id()));
- verify(ioMgr).send(argThat(new IsEqualCollection(rmtNodes)), eq(GridTopic.TOPIC_COMM_USER),
+ verify(ioMgr).sendToGridTopic(argThat(new IsEqualCollection(rmtNodes)), eq(GridTopic.TOPIC_COMM_USER),
any(GridIoUserMessage.class), eq(GridIoPolicy.PUBLIC_POOL));
}
@@ -151,12 +137,12 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
// No-op. We are using mocks so real sending is impossible.
}
- verify(ioMgr).send(eq(locNode), eq(GridTopic.TOPIC_COMM_USER), any(GridIoUserMessage.class),
+ verify(ioMgr).sendToGridTopic(eq(locNode), eq(GridTopic.TOPIC_COMM_USER), any(GridIoUserMessage.class),
eq(GridIoPolicy.PUBLIC_POOL));
Collection<? extends ClusterNode> rmtNodes = F.view(F.asList(rmtNode), F.remoteNodes(locNode.id()));
- verify(ioMgr).send(argThat(new IsEqualCollection(rmtNodes)), eq(GridTopic.TOPIC_COMM_USER),
+ verify(ioMgr).sendToGridTopic(argThat(new IsEqualCollection(rmtNodes)), eq(GridTopic.TOPIC_COMM_USER),
any(GridIoUserMessage.class), eq(GridIoPolicy.PUBLIC_POOL));
}
@@ -175,7 +161,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
// No-op. We are using mocks so real sending is impossible.
}
- verify(ioMgr).sendOrderedMessage(
+ verify(ioMgr).sendOrderedMessageToGridTopic(
argThat(new IsEqualCollection(F.asList(locNode, rmtNode))),
eq(GridTopic.TOPIC_COMM_USER),
any(GridIoUserMessage.class),
@@ -196,7 +182,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, boolean async)
+ @Override public void sendToGridTopic(ClusterNode node, GridTopic topic, Message msg, byte plc)
throws IgniteCheckedException {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/internal/util/nio/IgniteExceptionInNioWorkerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/IgniteExceptionInNioWorkerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/IgniteExceptionInNioWorkerSelfTest.java
index 9961833..8ac6e6a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/IgniteExceptionInNioWorkerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/IgniteExceptionInNioWorkerSelfTest.java
@@ -74,7 +74,7 @@ public class IgniteExceptionInNioWorkerSelfTest extends GridCommonAbstractTest {
UUID nodeId = ignite(1).cluster().localNode().id();
// This should trigger a failure in a NIO thread.
- kernal.context().io().send(nodeId, GridTopic.TOPIC_CACHE.topic("cache"), new BrokenMessage(), (byte)0);
+ kernal.context().io().sendToCustomTopic(nodeId, GridTopic.TOPIC_CACHE.topic("cache"), new BrokenMessage(), (byte)0);
for (int i = 0; i < 100; i++)
ignite(0).cache("cache").put(i, i);
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
index 723495c..03bbb00 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
@@ -249,7 +249,7 @@ public class GridIoManagerBenchmark {
testMsg.bytes(null);
try {
- io.send(node, TEST_TOPIC, testMsg, PUBLIC_POOL);
+ io.sendToCustomTopic(node, TEST_TOPIC, testMsg, PUBLIC_POOL);
}
catch (IgniteCheckedException e) {
e.printStackTrace();
@@ -293,7 +293,7 @@ public class GridIoManagerBenchmark {
else
sem.acquire();
- io.send(
+ io.sendToCustomTopic(
dst,
TEST_TOPIC,
new GridTestMessage(msgId, testHeavyMsgs ? arrs[rnd.nextInt(arrs.length)] : null),
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
index f2c6255..92b29e4 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
@@ -132,7 +132,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
new GridMessageListener() {
@Override public void onMessage(UUID nodeId, Object msg) {
try {
- rcv.send(sndNode, topic, (Message)msg, PUBLIC_POOL);
+ rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
}
catch (IgniteCheckedException e) {
error("Failed to send message.", e);
@@ -176,7 +176,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
while (!finish.get()) {
sem.acquire();
- snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
+ snd.sendToCustomTopic(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
}
}
catch (IgniteCheckedException e) {
@@ -226,7 +226,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
new GridMessageListener() {
@Override public void onMessage(UUID nodeId, Object msg) {
try {
- rcv.send(sndNode, topic, (Message)msg, PUBLIC_POOL);
+ rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
}
catch (IgniteCheckedException e) {
error("Failed to send message.", e);
@@ -270,7 +270,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
map.put(msgId, latch);
- snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
+ snd.sendToCustomTopic(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
latch.await();
@@ -326,7 +326,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
new GridMessageListener() {
@Override public void onMessage(UUID nodeId, Object msg) {
try {
- rcv.send(sndNode, topic, (Message)msg, PUBLIC_POOL);
+ rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
}
catch (IgniteCheckedException e) {
error("Failed to send message.", e);
@@ -362,7 +362,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
sem.acquire();
- snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
+ snd.sendToCustomTopic(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
}
return null;
@@ -432,7 +432,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
latches.put(msgId, latch);
- snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
+ snd.sendToCustomTopic(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
long start = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
index 9c97542..c0ea662 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
@@ -194,7 +194,7 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
msg.add(mes2);
}
- mgr0.send(grid(1).localNode(), topic, msg, GridIoPolicy.PUBLIC_POOL);
+ mgr0.sendToCustomTopic(grid(1).localNode(), topic, msg, GridIoPolicy.PUBLIC_POOL);
assert latch.await(3, SECONDS);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index e67a26a..7575ff4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -145,7 +145,7 @@ public class HadoopShuffle extends HadoopComponent {
ClusterNode node = ctx.kernalContext().discovery().node(nodeId);
if (msg instanceof Message)
- ctx.kernalContext().io().send(node, GridTopic.TOPIC_HADOOP_MSG, (Message)msg, GridIoPolicy.PUBLIC_POOL);
+ ctx.kernalContext().io().sendToGridTopic(node, GridTopic.TOPIC_HADOOP_MSG, (Message)msg, GridIoPolicy.PUBLIC_POOL);
else
ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0, false);
}
@@ -153,6 +153,7 @@ public class HadoopShuffle extends HadoopComponent {
/**
* @param jobId Task info.
* @return Shuffle job.
+ * @throws IgniteCheckedException If failed.
*/
private HadoopShuffleJob<UUID> job(HadoopJobId jobId) throws IgniteCheckedException {
HadoopShuffleJob<UUID> res = jobs.get(jobId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index e4b0c1f..88cd89b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1890,7 +1890,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
((GridCacheQueryMarshallable)msg).marshall(marshaller);
}
- ctx.io().send(node, topic, topicOrd, msg, plc);
+ ctx.io().sendGeneric(node, topic, topicOrd, msg, plc);
}
catch (IgniteCheckedException e) {
ok = false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 2802da5..33a6778 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -655,7 +655,7 @@ public class GridMapQueryExecutor {
h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
}
else
- ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
+ ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
}
catch (Exception e) {
e.addSuppressed(err);
@@ -729,7 +729,7 @@ public class GridMapQueryExecutor {
if (loc)
h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
else
- ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
+ ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
}
catch (IgniteCheckedException e) {
log.error("Failed to send message.", e);
@@ -756,7 +756,7 @@ public class GridMapQueryExecutor {
if (loc)
h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
else
- ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
+ ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
}
catch (Exception e) {
U.warn(log, "Failed to send retry message: " + e.getMessage());
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 61ca11d..604e522 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -328,7 +328,7 @@ public class GridReduceQueryExecutor {
if (node.isLocal())
h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0);
else
- ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, GridIoPolicy.QUERY_POOL);
+ ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg0, GridIoPolicy.QUERY_POOL);
}
catch (IgniteCheckedException e) {
throw new CacheException("Failed to fetch data from node: " + node.id(), e);
[2/2] ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-2.0' into ignite-4768
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-2.0' into ignite-4768
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9edb0e79
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9edb0e79
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9edb0e79
Branch: refs/heads/ignite-4768
Commit: 9edb0e79bc8388c378c203f2ad0b62bf41e0e456
Parents: d39050c 93e1996
Author: sboikov <sb...@gridgain.com>
Authored: Mon Mar 6 17:15:14 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Mar 6 17:15:14 2017 +0300
----------------------------------------------------------------------
.../ignite/internal/GridJobSiblingImpl.java | 4 +-
.../internal/managers/GridManagerAdapter.java | 2 +-
.../checkpoint/GridCheckpointManager.java | 2 +-
.../managers/communication/GridIoManager.java | 206 +++++++------------
.../deployment/GridDeploymentCommunication.java | 6 +-
.../eventstorage/GridEventStorageManager.java | 6 +-
.../processors/cache/GridCacheIoManager.java | 8 +-
.../cache/transactions/IgniteTxManager.java | 6 +-
.../clock/GridClockSyncProcessor.java | 2 +-
.../continuous/GridContinuousProcessor.java | 2 +-
.../datastreamer/DataStreamProcessor.java | 2 +-
.../datastreamer/DataStreamerImpl.java | 2 +-
.../internal/processors/igfs/IgfsContext.java | 8 +-
.../processors/job/GridJobProcessor.java | 4 +-
.../internal/processors/job/GridJobWorker.java | 2 +-
.../marshaller/ClientRequestFuture.java | 2 +-
.../GridMarshallerMappingProcessor.java | 2 +-
.../handlers/task/GridTaskCommandHandler.java | 4 +-
.../processors/task/GridTaskProcessor.java | 2 +-
.../processors/task/GridTaskWorker.java | 4 +-
.../GridCommunicationSendMessageSelfTest.java | 2 +-
.../communication/GridIoManagerSelfTest.java | 28 +--
.../nio/IgniteExceptionInNioWorkerSelfTest.java | 2 +-
.../communication/GridIoManagerBenchmark.java | 4 +-
.../communication/GridIoManagerBenchmark0.java | 14 +-
.../communication/GridCacheMessageSelfTest.java | 2 +-
.../hadoop/shuffle/HadoopShuffle.java | 3 +-
.../processors/query/h2/IgniteH2Indexing.java | 2 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 6 +-
.../h2/twostep/GridReduceQueryExecutor.java | 2 +-
30 files changed, 140 insertions(+), 201 deletions(-)
----------------------------------------------------------------------