You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/04 16:35:03 UTC
[1/3] ignite git commit: Fixed wrong 'send' method usage in
GridIoManager.
Repository: ignite
Updated Branches:
refs/heads/ignite-4705-2 4ab159def -> 46c71ae51
Fixed wrong 'send' method usage in GridIoManager.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/93e19962
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/93e19962
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/93e19962
Branch: refs/heads/ignite-4705-2
Commit: 93e19962114194072151840198f04f3406be068a
Parents: 50f8741
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 3 16:39:22 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 3 16:39:22 2017 +0300
----------------------------------------------------------------------
.../ignite/internal/GridJobSiblingImpl.java | 4 +-
.../internal/managers/GridManagerAdapter.java | 2 +-
.../checkpoint/GridCheckpointManager.java | 2 +-
.../managers/communication/GridIoManager.java | 206 +++++++------------
.../deployment/GridDeploymentCommunication.java | 6 +-
.../eventstorage/GridEventStorageManager.java | 6 +-
.../processors/cache/GridCacheIoManager.java | 8 +-
.../cache/transactions/IgniteTxManager.java | 6 +-
.../clock/GridClockSyncProcessor.java | 2 +-
.../continuous/GridContinuousProcessor.java | 2 +-
.../datastreamer/DataStreamProcessor.java | 2 +-
.../datastreamer/DataStreamerImpl.java | 2 +-
.../internal/processors/igfs/IgfsContext.java | 8 +-
.../processors/job/GridJobProcessor.java | 4 +-
.../internal/processors/job/GridJobWorker.java | 2 +-
.../marshaller/ClientRequestFuture.java | 2 +-
.../GridMarshallerMappingProcessor.java | 2 +-
.../handlers/task/GridTaskCommandHandler.java | 4 +-
.../processors/task/GridTaskProcessor.java | 2 +-
.../processors/task/GridTaskWorker.java | 4 +-
.../GridCommunicationSendMessageSelfTest.java | 2 +-
.../communication/GridIoManagerSelfTest.java | 28 +--
.../nio/IgniteExceptionInNioWorkerSelfTest.java | 2 +-
.../communication/GridIoManagerBenchmark.java | 4 +-
.../communication/GridIoManagerBenchmark0.java | 14 +-
.../communication/GridCacheMessageSelfTest.java | 2 +-
.../hadoop/shuffle/HadoopShuffle.java | 3 +-
.../processors/query/h2/IgniteH2Indexing.java | 2 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 6 +-
.../h2/twostep/GridReduceQueryExecutor.java | 2 +-
30 files changed, 140 insertions(+), 201 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
index 2d95f85..79ac416 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
@@ -150,7 +150,7 @@ public class GridJobSiblingImpl implements ComputeJobSibling, Externalizable {
if (!nodes.isEmpty()) {
try {
- ctx.io().send(nodes, TOPIC_JOB_CANCEL, new GridJobCancelRequest(sesId, jobId), SYSTEM_POOL);
+ ctx.io().sendToGridTopic(nodes, TOPIC_JOB_CANCEL, new GridJobCancelRequest(sesId, jobId), SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -169,7 +169,7 @@ public class GridJobSiblingImpl implements ComputeJobSibling, Externalizable {
ctx.job().cancelJob(ses.getId(), jobId, false);
else {
try {
- ctx.io().send(node, TOPIC_JOB_CANCEL, new GridJobCancelRequest(ses.getId(), jobId), SYSTEM_POOL);
+ ctx.io().sendToGridTopic(node, TOPIC_JOB_CANCEL, new GridJobCancelRequest(ses.getId(), jobId), SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
// Avoid stack trace for left nodes.
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index e864916..d993376 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -391,7 +391,7 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
try {
if (msg instanceof Message)
- ctx.io().send(node, topic, (Message)msg, SYSTEM_POOL);
+ ctx.io().sendToCustomTopic(node, topic, (Message)msg, SYSTEM_POOL);
else
ctx.io().sendUserMessage(Collections.singletonList(node), msg, topic, false, 0, false);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
index 9124caf..8ce8b0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
@@ -239,7 +239,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
ClusterNode node = ctx.discovery().node(ses.getTaskNodeId());
if (node != null)
- ctx.io().send(
+ ctx.io().sendToGridTopic(
node,
TOPIC_CHECKPOINT,
new GridCheckpointRequest(ses.getId(), key, ses.getCheckpointSpi()),
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 38b5441..5e91ea9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -191,6 +191,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
/** */
private final AtomicLong ioTestId = new AtomicLong();
+ /** No-op runnable. */
+ private static final IgniteRunnable NOOP = new IgniteRunnable() {
+ @Override public void run() {
+ // No-op.
+ }
+ };
+
/**
* @param ctx Grid kernal context.
*/
@@ -328,7 +335,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
res.flags(msg0.flags());
try {
- send(node, GridTopic.TOPIC_IO_TEST, res, GridIoPolicy.SYSTEM_POOL);
+ sendToGridTopic(node, GridTopic.TOPIC_IO_TEST, res, GridIoPolicy.SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send IO test response [msg=" + msg0 + "]", e);
@@ -367,7 +374,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
ClusterNode node = nodes.get(i);
try {
- send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
+ sendToGridTopic(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
ioTestMap().remove(msg.id());
@@ -397,7 +404,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
ioTestMap().put(id, fut);
try {
- send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
+ sendToGridTopic(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
ioTestMap().remove(msg.id());
@@ -791,8 +798,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
finally {
threadProcessingMessage(false);
- if (msgC != null)
- msgC.run();
+ msgC.run();
}
}
@@ -915,46 +921,46 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* Remove listener if it matches expected value.
*
* @param topic Topic.
- * @param expected Listener.
+ * @param exp Listener.
* @return Result.
*/
- private boolean listenerRemove0(Object topic, GridMessageListener expected) {
+ private boolean listenerRemove0(Object topic, GridMessageListener exp) {
if (topic instanceof GridTopic) {
synchronized (sysLsnrsMux) {
- return systemListenerChange(topic, expected, null);
+ return systemListenerChange(topic, exp, null);
}
}
else
- return lsnrMap.remove(topic, expected);
+ return lsnrMap.remove(topic, exp);
}
/**
* Replace listener.
*
* @param topic Topic.
- * @param expected Old value.
+ * @param exp Old value.
* @param newVal New value.
* @return Result.
*/
- private boolean listenerReplace0(Object topic, GridMessageListener expected, GridMessageListener newVal) {
+ private boolean listenerReplace0(Object topic, GridMessageListener exp, GridMessageListener newVal) {
if (topic instanceof GridTopic) {
synchronized (sysLsnrsMux) {
- return systemListenerChange(topic, expected, newVal);
+ return systemListenerChange(topic, exp, newVal);
}
}
else
- return lsnrMap.replace(topic, expected, newVal);
+ return lsnrMap.replace(topic, exp, newVal);
}
/**
* Change system listener.
*
* @param topic Topic.
- * @param expected Expected value.
+ * @param exp Expected value.
* @param newVal New value.
* @return Result.
*/
- private boolean systemListenerChange(Object topic, GridMessageListener expected, GridMessageListener newVal) {
+ private boolean systemListenerChange(Object topic, GridMessageListener exp, GridMessageListener newVal) {
assert Thread.holdsLock(sysLsnrsMux);
assert topic instanceof GridTopic;
@@ -962,7 +968,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
GridMessageListener old = sysLsnrs[idx];
- if (old != null && old.equals(expected)) {
+ if (old != null && old.equals(exp)) {
changeSystemListener(idx, newVal);
return true;
@@ -1263,6 +1269,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
assert node != null;
assert topic != null;
assert msg != null;
+ assert !async || msg instanceof GridIoUserMessage : msg; // Async execution was added only for IgniteMessaging.
+ assert topicOrd >= 0 || !(topic instanceof GridTopic) : msg;
GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
@@ -1276,11 +1284,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (ordered)
processOrderedMessage(locNodeId, ioMsg, plc, null);
- else if (async) {
- assert msg instanceof GridIoUserMessage : ioMsg; // Async execution was added only for IgniteMessaging.
-
- processRegularMessage(locNodeId, ioMsg, plc, null);
- }
+ else if (async)
+ processRegularMessage(locNodeId, ioMsg, plc, NOOP);
else
processRegularMessage0(ioMsg, locNodeId);
@@ -1313,14 +1318,14 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param plc Type of processing.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void send(UUID nodeId, Object topic, Message msg, byte plc)
+ public void sendToCustomTopic(UUID nodeId, Object topic, Message msg, byte plc)
throws IgniteCheckedException {
ClusterNode node = ctx.discovery().node(nodeId);
if (node == null)
throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
- send(node, topic, msg, plc);
+ sendToCustomTopic(node, topic, msg, plc);
}
/**
@@ -1331,7 +1336,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @throws IgniteCheckedException Thrown in case of any errors.
*/
@SuppressWarnings("TypeMayBeWeakened")
- public void send(UUID nodeId, GridTopic topic, Message msg, byte plc)
+ public void sendToGridTopic(UUID nodeId, GridTopic topic, Message msg, byte plc)
throws IgniteCheckedException {
ClusterNode node = ctx.discovery().node(nodeId);
@@ -1348,7 +1353,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param plc Type of processing.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void send(ClusterNode node, Object topic, Message msg, byte plc)
+ public void sendToCustomTopic(ClusterNode node, Object topic, Message msg, byte plc)
throws IgniteCheckedException {
send(node, topic, -1, msg, plc, false, 0, false, null, false);
}
@@ -1358,12 +1363,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param topic Topic to send the message to.
* @param msg Message to send.
* @param plc Type of processing.
- * @param async Async flag.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, boolean async)
+ public void sendToGridTopic(ClusterNode node, GridTopic topic, Message msg, byte plc)
throws IgniteCheckedException {
- send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, async);
+ send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
}
/**
@@ -1374,7 +1378,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param plc Type of processing.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void send(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc)
+ public void sendGeneric(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc)
throws IgniteCheckedException {
send(node, topic, topicOrd, msg, plc, false, 0, false, null, false);
}
@@ -1402,33 +1406,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
/**
- * @param nodeId Destination node.
- * @param topic Topic to send the message to.
- * @param msg Message to send.
- * @param plc Type of processing.
- * @param timeout Timeout to keep a message on receiving queue.
- * @param skipOnTimeout Whether message can be skipped on timeout.
- * @throws IgniteCheckedException Thrown in case of any errors.
- */
- public void sendOrderedMessage(
- UUID nodeId,
- Object topic,
- Message msg,
- byte plc,
- long timeout,
- boolean skipOnTimeout
- ) throws IgniteCheckedException {
- assert timeout > 0 || skipOnTimeout;
-
- ClusterNode node = ctx.discovery().node(nodeId);
-
- if (node == null)
- throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
-
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null, false);
- }
-
- /**
* @param node Destination nodes.
* @param topic Topic to send the message to.
* @param msg Message to send.
@@ -1436,8 +1413,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param ackC Ack closure.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void send(ClusterNode node, GridTopic topic, Message msg, byte plc,
- IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
+ public void sendToGridTopic(ClusterNode node,
+ GridTopic topic,
+ Message msg,
+ byte plc,
+ IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException
+ {
send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC, false);
}
@@ -1450,9 +1431,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param skipOnTimeout Whether message can be skipped on timeout.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void sendOrderedMessage(
+ void sendOrderedMessageToGridTopic(
Collection<? extends ClusterNode> nodes,
- Object topic,
+ GridTopic topic,
Message msg,
byte plc,
long timeout,
@@ -1461,36 +1442,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
throws IgniteCheckedException {
assert timeout > 0 || skipOnTimeout;
- send(nodes, topic, -1, msg, plc, true, timeout, skipOnTimeout);
- }
-
- /**
- * @param node Destination nodes.
- * @param topic Topic to send the message to.
- * @param msg Message to send.
- * @param plc Type of processing.
- * @param ackC Ack closure.
- * @throws IgniteCheckedException Thrown in case of any errors.
- */
- public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC)
- throws IgniteCheckedException {
- send(node, topic, -1, msg, plc, false, 0, false, ackC, false);
- }
-
- /**
- * @param nodes Destination node.
- * @param topic Topic to send the message to.
- * @param msg Message to send.
- * @param plc Type of processing.
- * @throws IgniteCheckedException Thrown in case of any errors.
- */
- public void send(
- Collection<? extends ClusterNode> nodes,
- Object topic,
- Message msg,
- byte plc
- ) throws IgniteCheckedException {
- send(nodes, topic, -1, msg, plc, false, 0, false);
+ send(nodes, topic, topic.ordinal(), msg, plc, true, timeout, skipOnTimeout);
}
/**
@@ -1500,7 +1452,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param plc Type of processing.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void send(
+ public void sendToGridTopic(
Collection<? extends ClusterNode> nodes,
GridTopic topic,
Message msg,
@@ -1540,7 +1492,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param msg Message to send.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg) throws IgniteCheckedException {
+ void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg) throws IgniteCheckedException {
sendUserMessage(nodes, msg, null, false, 0, false);
}
@@ -1556,8 +1508,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @throws IgniteCheckedException Thrown in case of any errors.
*/
@SuppressWarnings("ConstantConditions")
- public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg,
- @Nullable Object topic, boolean ordered, long timeout, boolean async) throws IgniteCheckedException {
+ public void sendUserMessage(Collection<? extends ClusterNode> nodes,
+ Object msg,
+ @Nullable Object topic,
+ boolean ordered,
+ long timeout,
+ boolean async) throws IgniteCheckedException
+ {
boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(locNodeId);
byte[] serMsg = null;
@@ -1600,22 +1557,42 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
dep != null ? dep.participants() : null);
if (ordered)
- sendOrderedMessage(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, timeout, true);
- else if (loc)
- send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, async);
+ sendOrderedMessageToGridTopic(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, timeout, true);
+ else if (loc) {
+ send(F.first(nodes),
+ TOPIC_COMM_USER,
+ TOPIC_COMM_USER.ordinal(),
+ ioMsg,
+ PUBLIC_POOL,
+ false,
+ 0,
+ false,
+ null,
+ async);
+ }
else {
ClusterNode locNode = F.find(nodes, null, F.localNode(locNodeId));
Collection<? extends ClusterNode> rmtNodes = F.view(nodes, F.remoteNodes(locNodeId));
if (!rmtNodes.isEmpty())
- send(rmtNodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
+ sendToGridTopic(rmtNodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
// Will call local listeners in current thread synchronously or through pool,
// depending async flag, so must go the last
// to allow remote nodes execute the requested operation in parallel.
- if (locNode != null)
- send(locNode, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, async);
+ if (locNode != null) {
+ send(locNode,
+ TOPIC_COMM_USER,
+ TOPIC_COMM_USER.ordinal(),
+ ioMsg,
+ PUBLIC_POOL,
+ false,
+ 0,
+ false,
+ null,
+ async);
+ }
}
}
@@ -1657,35 +1634,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
/**
- * @param nodeId Destination node.
- * @param topic Topic to send the message to.
- * @param msg Message to send.
- * @param plc Type of processing.
- * @param timeout Timeout to keep a message on receiving queue.
- * @param skipOnTimeout Whether message can be skipped on timeout.
- * @param ackC Ack closure.
- * @throws IgniteCheckedException Thrown in case of any errors.
- */
- public void sendOrderedMessage(
- UUID nodeId,
- Object topic,
- Message msg,
- byte plc,
- long timeout,
- boolean skipOnTimeout,
- IgniteInClosure<IgniteException> ackC
- ) throws IgniteCheckedException {
- assert timeout > 0 || skipOnTimeout;
-
- ClusterNode node = ctx.discovery().node(nodeId);
-
- if (node == null)
- throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
-
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC, false);
- }
-
- /**
* @param nodes Destination nodes.
* @param topic Topic to send the message to.
* @param topicOrd Topic ordinal value.
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
index a571ae4..ffbde37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
@@ -294,7 +294,7 @@ class GridDeploymentCommunication {
if (node != null) {
try {
- ctx.io().send(node, topic, res, GridIoPolicy.P2P_POOL);
+ ctx.io().sendToCustomTopic(node, topic, res, GridIoPolicy.P2P_POOL);
if (log.isDebugEnabled())
log.debug("Sent peer class loading response [node=" + node.id() + ", res=" + res + ']');
@@ -324,7 +324,7 @@ class GridDeploymentCommunication {
Message req = new GridDeploymentRequest(null, null, rsrcName, true);
if (!rmtNodes.isEmpty()) {
- ctx.io().send(
+ ctx.io().sendToGridTopic(
rmtNodes,
TOPIC_CLASSLOAD,
req,
@@ -445,7 +445,7 @@ class GridDeploymentCommunication {
if (req.responseTopic() != null && !ctx.localNodeId().equals(dstNode.id()))
req.responseTopicBytes(U.marshal(marsh, req.responseTopic()));
- ctx.io().send(dstNode, TOPIC_CLASSLOAD, req, GridIoPolicy.P2P_POOL);
+ ctx.io().sendToGridTopic(dstNode, TOPIC_CLASSLOAD, req, GridIoPolicy.P2P_POOL);
if (log.isDebugEnabled())
log.debug("Sent peer class loading request [node=" + dstNode.id() + ", req=" + req + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index b5d5ee2..656c739 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -1040,12 +1040,12 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
Collection<? extends ClusterNode> rmtNodes = F.view(nodes, F.remoteNodes(ctx.localNodeId()));
if (locNode != null)
- ctx.io().send(locNode, topic, msg, plc);
+ ctx.io().sendToGridTopic(locNode, topic, msg, plc);
if (!rmtNodes.isEmpty()) {
msg.responseTopicBytes(U.marshal(marsh, msg.responseTopic()));
- ctx.io().send(rmtNodes, topic, msg, plc);
+ ctx.io().sendToGridTopic(rmtNodes, topic, msg, plc);
}
}
@@ -1164,7 +1164,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
res.exceptionBytes(U.marshal(marsh, res.exception()));
}
- ctx.io().send(node, req.responseTopic(), res, PUBLIC_POOL);
+ ctx.io().sendToCustomTopic(node, req.responseTopic(), res, PUBLIC_POOL);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send event query response to node [node=" + nodeId + ", res=" +
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index d20310b..50f58cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -908,7 +908,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
try {
cnt++;
- cctx.gridIO().send(node, TOPIC_CACHE, msg, plc);
+ cctx.gridIO().sendToGridTopic(node, TOPIC_CACHE, msg, plc);
return;
}
@@ -969,7 +969,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
}
});
- cctx.gridIO().send(nodesView, TOPIC_CACHE, msg, plc);
+ cctx.gridIO().sendToGridTopic(nodesView, TOPIC_CACHE, msg, plc);
boolean added = false;
@@ -1116,7 +1116,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
* @param plc IO policy.
* @throws IgniteCheckedException If send failed.
*/
- public void sendNoRetry(ClusterNode node,
+ void sendNoRetry(ClusterNode node,
GridCacheMessage msg,
byte plc)
throws IgniteCheckedException {
@@ -1127,7 +1127,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
return;
try {
- cctx.gridIO().send(node, TOPIC_CACHE, msg, plc);
+ cctx.gridIO().sendToGridTopic(node, TOPIC_CACHE, msg, plc);
if (log.isDebugEnabled())
log.debug("Sent cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index a1a18fe..f4a5629 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -2065,7 +2065,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (!cctx.localNodeId().equals(nodeId))
req.prepareMarshal(cctx);
- cctx.gridIO().send(node, TOPIC_TX, req, SYSTEM_POOL);
+ cctx.gridIO().sendToGridTopic(node, TOPIC_TX, req, SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
if (e instanceof ClusterTopologyCheckedException) {
@@ -2508,7 +2508,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (!cctx.localNodeId().equals(nodeId))
res.prepareMarshal(cctx);
- cctx.gridIO().send(nodeId, TOPIC_TX, res, SYSTEM_POOL);
+ cctx.gridIO().sendToGridTopic(nodeId, TOPIC_TX, res, SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send response to node [node=" + nodeId + ", res=" + res + ']', e);
@@ -2545,7 +2545,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
res.futureId(req.futureId());
try {
- cctx.gridIO().send(nodeId, TOPIC_TX, res, SYSTEM_POOL);
+ cctx.gridIO().sendToGridTopic(nodeId, TOPIC_TX, res, SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send response to node (is node still alive?) [nodeId=" + nodeId +
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
index 257d0d9..d644261 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
@@ -320,7 +320,7 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
snapshot.version(), snapshot.deltas());
try {
- ctx.io().send(n, TOPIC_TIME_SYNC, msg, SYSTEM_POOL);
+ ctx.io().sendToGridTopic(n, TOPIC_TIME_SYNC, msg, SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
if (ctx.discovery().pingNodeNoError(n.id()))
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 575bc69..55f65c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -1397,7 +1397,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
ackC);
}
else
- ctx.io().send(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL, ackC);
+ ctx.io().sendToGridTopic(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL, ackC);
break;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index b6400e8..74d5f4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -420,7 +420,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep);
try {
- ctx.io().send(nodeId, resTopic, res, threadIoPolicy());
+ ctx.io().sendToCustomTopic(nodeId, resTopic, res, threadIoPolicy());
}
catch (IgniteCheckedException e) {
if (ctx.discovery().alive(nodeId))
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index f97fc14..4c1de2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1693,7 +1693,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
topVer);
try {
- ctx.io().send(node, TOPIC_DATASTREAM, req, plc);
+ ctx.io().sendToGridTopic(node, TOPIC_DATASTREAM, req, plc);
if (log.isDebugEnabled())
log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
index 4c037b7..0b2558a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
@@ -27,6 +27,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
import org.jetbrains.annotations.Nullable;
/**
@@ -169,7 +170,10 @@ public class IgfsContext {
if (!kernalContext().localNodeId().equals(nodeId))
msg.prepareMarshal(kernalContext().config().getMarshaller());
- kernalContext().io().send(nodeId, topic, msg, plc);
+ if (topic instanceof GridTopic)
+ kernalContext().io().sendToGridTopic(nodeId, (GridTopic)topic, msg, plc);
+ else
+ kernalContext().io().sendToCustomTopic(nodeId, topic, msg, plc);
}
/**
@@ -184,7 +188,7 @@ public class IgfsContext {
if (!kernalContext().localNodeId().equals(node.id()))
msg.prepareMarshal(kernalContext().config().getMarshaller());
- kernalContext().io().send(node, topic, msg, plc);
+ kernalContext().io().sendToCustomTopic(node, topic, msg, plc);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 2b6699d..9ed6ff3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -518,7 +518,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
ctx.io().addMessageListener(topic, msgLsnr);
// 3. Send message.
- ctx.io().send(taskNode, TOPIC_JOB_SIBLINGS,
+ ctx.io().sendToGridTopic(taskNode, TOPIC_JOB_SIBLINGS,
new GridJobSiblingsRequest(ses.getId(),
loc ? topic : null,
loc ? null : U.marshal(marsh, topic)),
@@ -1379,7 +1379,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
ctx.task().processJobExecuteResponse(ctx.localNodeId(), jobRes);
else
// Send response to common topic as unordered message.
- ctx.io().send(sndNode, TOPIC_TASK, jobRes, req.isInternal() ? MANAGEMENT_POOL : SYSTEM_POOL);
+ ctx.io().sendToGridTopic(sndNode, TOPIC_TASK, jobRes, req.isInternal() ? MANAGEMENT_POOL : SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
// The only option here is to log, as we must assume that resending will fail too.
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index acefde7..9b7615f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -912,7 +912,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
ctx.task().processJobExecuteResponse(ctx.localNodeId(), jobRes);
else
// Send response to common topic as unordered message.
- ctx.io().send(sndNode, TOPIC_TASK, jobRes, internal ? MANAGEMENT_POOL : SYSTEM_POOL);
+ ctx.io().sendToGridTopic(sndNode, TOPIC_TASK, jobRes, internal ? MANAGEMENT_POOL : SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
// Log and invoke the master-leave callback.
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
index 773dabe..0be4e09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
@@ -96,7 +96,7 @@ final class ClientRequestFuture extends GridFutureAdapter<MappingExchangeResult>
ClusterNode srvNode = aliveSrvNodes.poll();
try {
- ioMgr.send(
+ ioMgr.sendToGridTopic(
srvNode,
GridTopic.TOPIC_MAPPING_MARSH,
new MissingMappingRequestMessage(
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
index fdea869..66c19a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@ -178,7 +178,7 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
String resolvedClsName = marshallerCtx.resolveMissedMapping(platformId, typeId);
try {
- ioMgr.send(
+ ioMgr.sendToGridTopic(
nodeId,
TOPIC_MAPPING_MARSH,
new MissingMappingResponseMessage(platformId, typeId, resolvedClsName),
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
index 947435c..99ba335 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
@@ -143,7 +143,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
Object topic = U.unmarshal(ctx, req.topicBytes(), U.resolveClassLoader(ctx.config()));
- ctx.io().send(nodeId, topic, res, SYSTEM_POOL);
+ ctx.io().sendToCustomTopic(nodeId, topic, res, SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send job task result response.", e);
@@ -494,7 +494,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
try {
byte[] topicBytes = U.marshal(ctx, topic);
- ctx.io().send(taskNode, TOPIC_REST, new GridTaskResultRequest(taskId, topic, topicBytes), SYSTEM_POOL);
+ ctx.io().sendToGridTopic(taskNode, TOPIC_REST, new GridTaskResultRequest(taskId, topic, topicBytes), SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
String errMsg = "Failed to send task result request [resHolderId=" + resHolderId +
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index d32b51c..ec5d4c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -1311,7 +1311,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
boolean loc = ctx.localNodeId().equals(nodeId);
- ctx.io().send(nodeId, topic,
+ ctx.io().sendToCustomTopic(nodeId, topic,
new GridJobSiblingsResponse(
loc ? siblings : null,
loc ? null : U.marshal(marsh, siblings)),
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index d18ea5f..02ef0fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -1281,7 +1281,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
ClusterNode node = ctx.discovery().node(nodeId);
if (node != null)
- ctx.io().send(node,
+ ctx.io().sendToGridTopic(node,
TOPIC_JOB_CANCEL,
new GridJobCancelRequest(ses.getId(), res.getJobContext().getJobId(), /*courtesy*/true),
PUBLIC_POOL);
@@ -1382,7 +1382,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
ctx.job().processJobExecuteRequest(ctx.discovery().localNode(), req);
else {
// Send job execution request.
- ctx.io().send(node, TOPIC_JOB, req, internal ? MANAGEMENT_POOL : PUBLIC_POOL);
+ ctx.io().sendToGridTopic(node, TOPIC_JOB, req, internal ? MANAGEMENT_POOL : PUBLIC_POOL);
if (log.isDebugEnabled())
log.debug("Sent job request [req=" + req + ", node=" + node + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
index 8503b48..f58be87 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
@@ -129,7 +129,7 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
long time = System.nanoTime();
for (int i = 1; i <= SAMPLE_CNT; i++) {
- mgr0.send(grid(1).localNode(), topic, new TestMessage(), GridIoPolicy.PUBLIC_POOL);
+ mgr0.sendToCustomTopic(grid(1).localNode(), topic, new TestMessage(), GridIoPolicy.PUBLIC_POOL);
if (i % 500 == 0)
info("Sent messages count: " + i);
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
index f5499d3..f4257a0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
@@ -90,21 +90,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
public void testSendIfOneOfNodesIsLocalAndTopicIsEnum() throws Exception {
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
- new GridIoManager(ctx).send(F.asList(locNode, rmtNode), GridTopic.TOPIC_CACHE, new TestMessage(),
- GridIoPolicy.P2P_POOL);
-
- return null;
- }
- }, AssertionError.class, "Internal Ignite code should never call the method with local node in a node list.");
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testSendIfOneOfNodesIsLocalAndTopicIsObject() throws Exception {
- GridTestUtils.assertThrows(log, new Callable<Object>() {
- @Override public Object call() throws Exception {
- new GridIoManager(ctx).send(F.asList(locNode, rmtNode), new Object(), new TestMessage(),
+ new GridIoManager(ctx).sendToGridTopic(F.asList(locNode, rmtNode), GridTopic.TOPIC_CACHE, new TestMessage(),
GridIoPolicy.P2P_POOL);
return null;
@@ -127,12 +113,12 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
// No-op. We are using mocks so real sending is impossible.
}
- verify(ioMgr).send(eq(locNode), eq(GridTopic.TOPIC_COMM_USER), any(GridIoUserMessage.class),
+ verify(ioMgr).sendToGridTopic(eq(locNode), eq(GridTopic.TOPIC_COMM_USER), any(GridIoUserMessage.class),
eq(GridIoPolicy.PUBLIC_POOL));
Collection<? extends ClusterNode> rmtNodes = F.view(F.asList(rmtNode), F.remoteNodes(locNode.id()));
- verify(ioMgr).send(argThat(new IsEqualCollection(rmtNodes)), eq(GridTopic.TOPIC_COMM_USER),
+ verify(ioMgr).sendToGridTopic(argThat(new IsEqualCollection(rmtNodes)), eq(GridTopic.TOPIC_COMM_USER),
any(GridIoUserMessage.class), eq(GridIoPolicy.PUBLIC_POOL));
}
@@ -151,12 +137,12 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
// No-op. We are using mocks so real sending is impossible.
}
- verify(ioMgr).send(eq(locNode), eq(GridTopic.TOPIC_COMM_USER), any(GridIoUserMessage.class),
+ verify(ioMgr).sendToGridTopic(eq(locNode), eq(GridTopic.TOPIC_COMM_USER), any(GridIoUserMessage.class),
eq(GridIoPolicy.PUBLIC_POOL));
Collection<? extends ClusterNode> rmtNodes = F.view(F.asList(rmtNode), F.remoteNodes(locNode.id()));
- verify(ioMgr).send(argThat(new IsEqualCollection(rmtNodes)), eq(GridTopic.TOPIC_COMM_USER),
+ verify(ioMgr).sendToGridTopic(argThat(new IsEqualCollection(rmtNodes)), eq(GridTopic.TOPIC_COMM_USER),
any(GridIoUserMessage.class), eq(GridIoPolicy.PUBLIC_POOL));
}
@@ -175,7 +161,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
// No-op. We are using mocks so real sending is impossible.
}
- verify(ioMgr).sendOrderedMessage(
+ verify(ioMgr).sendOrderedMessageToGridTopic(
argThat(new IsEqualCollection(F.asList(locNode, rmtNode))),
eq(GridTopic.TOPIC_COMM_USER),
any(GridIoUserMessage.class),
@@ -196,7 +182,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, boolean async)
+ @Override public void sendToGridTopic(ClusterNode node, GridTopic topic, Message msg, byte plc)
throws IgniteCheckedException {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/internal/util/nio/IgniteExceptionInNioWorkerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/IgniteExceptionInNioWorkerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/IgniteExceptionInNioWorkerSelfTest.java
index 9961833..8ac6e6a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/IgniteExceptionInNioWorkerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/IgniteExceptionInNioWorkerSelfTest.java
@@ -74,7 +74,7 @@ public class IgniteExceptionInNioWorkerSelfTest extends GridCommonAbstractTest {
UUID nodeId = ignite(1).cluster().localNode().id();
// This should trigger a failure in a NIO thread.
- kernal.context().io().send(nodeId, GridTopic.TOPIC_CACHE.topic("cache"), new BrokenMessage(), (byte)0);
+ kernal.context().io().sendToCustomTopic(nodeId, GridTopic.TOPIC_CACHE.topic("cache"), new BrokenMessage(), (byte)0);
for (int i = 0; i < 100; i++)
ignite(0).cache("cache").put(i, i);
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
index 723495c..03bbb00 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
@@ -249,7 +249,7 @@ public class GridIoManagerBenchmark {
testMsg.bytes(null);
try {
- io.send(node, TEST_TOPIC, testMsg, PUBLIC_POOL);
+ io.sendToCustomTopic(node, TEST_TOPIC, testMsg, PUBLIC_POOL);
}
catch (IgniteCheckedException e) {
e.printStackTrace();
@@ -293,7 +293,7 @@ public class GridIoManagerBenchmark {
else
sem.acquire();
- io.send(
+ io.sendToCustomTopic(
dst,
TEST_TOPIC,
new GridTestMessage(msgId, testHeavyMsgs ? arrs[rnd.nextInt(arrs.length)] : null),
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
index f2c6255..92b29e4 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
@@ -132,7 +132,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
new GridMessageListener() {
@Override public void onMessage(UUID nodeId, Object msg) {
try {
- rcv.send(sndNode, topic, (Message)msg, PUBLIC_POOL);
+ rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
}
catch (IgniteCheckedException e) {
error("Failed to send message.", e);
@@ -176,7 +176,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
while (!finish.get()) {
sem.acquire();
- snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
+ snd.sendToCustomTopic(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
}
}
catch (IgniteCheckedException e) {
@@ -226,7 +226,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
new GridMessageListener() {
@Override public void onMessage(UUID nodeId, Object msg) {
try {
- rcv.send(sndNode, topic, (Message)msg, PUBLIC_POOL);
+ rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
}
catch (IgniteCheckedException e) {
error("Failed to send message.", e);
@@ -270,7 +270,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
map.put(msgId, latch);
- snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
+ snd.sendToCustomTopic(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
latch.await();
@@ -326,7 +326,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
new GridMessageListener() {
@Override public void onMessage(UUID nodeId, Object msg) {
try {
- rcv.send(sndNode, topic, (Message)msg, PUBLIC_POOL);
+ rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
}
catch (IgniteCheckedException e) {
error("Failed to send message.", e);
@@ -362,7 +362,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
sem.acquire();
- snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
+ snd.sendToCustomTopic(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
}
return null;
@@ -432,7 +432,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
latches.put(msgId, latch);
- snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
+ snd.sendToCustomTopic(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
long start = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
index 9c97542..c0ea662 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
@@ -194,7 +194,7 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
msg.add(mes2);
}
- mgr0.send(grid(1).localNode(), topic, msg, GridIoPolicy.PUBLIC_POOL);
+ mgr0.sendToCustomTopic(grid(1).localNode(), topic, msg, GridIoPolicy.PUBLIC_POOL);
assert latch.await(3, SECONDS);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index e67a26a..7575ff4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -145,7 +145,7 @@ public class HadoopShuffle extends HadoopComponent {
ClusterNode node = ctx.kernalContext().discovery().node(nodeId);
if (msg instanceof Message)
- ctx.kernalContext().io().send(node, GridTopic.TOPIC_HADOOP_MSG, (Message)msg, GridIoPolicy.PUBLIC_POOL);
+ ctx.kernalContext().io().sendToGridTopic(node, GridTopic.TOPIC_HADOOP_MSG, (Message)msg, GridIoPolicy.PUBLIC_POOL);
else
ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0, false);
}
@@ -153,6 +153,7 @@ public class HadoopShuffle extends HadoopComponent {
/**
* @param jobId Task info.
* @return Shuffle job.
+ * @throws IgniteCheckedException If failed.
*/
private HadoopShuffleJob<UUID> job(HadoopJobId jobId) throws IgniteCheckedException {
HadoopShuffleJob<UUID> res = jobs.get(jobId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index e4b0c1f..88cd89b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1890,7 +1890,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
((GridCacheQueryMarshallable)msg).marshall(marshaller);
}
- ctx.io().send(node, topic, topicOrd, msg, plc);
+ ctx.io().sendGeneric(node, topic, topicOrd, msg, plc);
}
catch (IgniteCheckedException e) {
ok = false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 2802da5..33a6778 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -655,7 +655,7 @@ public class GridMapQueryExecutor {
h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
}
else
- ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
+ ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
}
catch (Exception e) {
e.addSuppressed(err);
@@ -729,7 +729,7 @@ public class GridMapQueryExecutor {
if (loc)
h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
else
- ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
+ ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
}
catch (IgniteCheckedException e) {
log.error("Failed to send message.", e);
@@ -756,7 +756,7 @@ public class GridMapQueryExecutor {
if (loc)
h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
else
- ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
+ ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
}
catch (Exception e) {
U.warn(log, "Failed to send retry message: " + e.getMessage());
http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 61ca11d..604e522 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -328,7 +328,7 @@ public class GridReduceQueryExecutor {
if (node.isLocal())
h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0);
else
- ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, GridIoPolicy.QUERY_POOL);
+ ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg0, GridIoPolicy.QUERY_POOL);
}
catch (IgniteCheckedException e) {
throw new CacheException("Failed to fetch data from node: " + node.id(), e);
[3/3] ignite git commit: ignite-4705
Posted by sb...@apache.org.
ignite-4705
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/46c71ae5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/46c71ae5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/46c71ae5
Branch: refs/heads/ignite-4705-2
Commit: 46c71ae512edc39a364b0389b9057e9935517130
Parents: ff55b4d
Author: sboikov <sb...@gridgain.com>
Authored: Sat Mar 4 12:46:53 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Sat Mar 4 19:33:17 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheIoManager.java | 11 +-
.../GridCachePartitionExchangeManager.java | 85 +++++------
.../GridDhtAtomicAbstractUpdateFuture.java | 15 +-
.../dht/atomic/GridDhtAtomicCache.java | 140 ++++++++++---------
.../atomic/GridDhtAtomicSingleUpdateFuture.java | 5 -
.../GridDhtAtomicSingleUpdateRequest.java | 6 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 5 -
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 6 +-
.../GridNearAtomicAbstractUpdateFuture.java | 61 ++++++--
.../GridNearAtomicAbstractUpdateRequest.java | 14 +-
.../GridNearAtomicCheckUpdateRequest.java | 9 +-
.../GridNearAtomicSingleUpdateFuture.java | 34 ++---
.../dht/atomic/GridNearAtomicUpdateFuture.java | 76 +++++++---
.../GridDhtPartitionsExchangeFuture.java | 4 +-
...niteCacheClientNodeChangingTopologyTest.java | 5 +-
15 files changed, 287 insertions(+), 189 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 9aec10d..ba2f86e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -216,13 +216,13 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (fut != null && !fut.isDone()) {
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> t) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ Runnable c = new Runnable() {
@Override public void run() {
IgniteLogger log = cacheMsg.messageLogger(cctx);
if (log.isDebugEnabled()) {
StringBuilder msg0 = new StringBuilder("Process cache message after wait for " +
- "affinity topology version [");
+ "affinity topology version [");
appendMessageInfo(cacheMsg, nodeId, msg0).append(']');
@@ -231,7 +231,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
handleMessage(nodeId, cacheMsg);
}
- });
+ };
+
+ if (((GridCacheMessage)msg).partition() >= 0)
+ cctx.kernalContext().getStripedExecutorService().execute(((GridCacheMessage) msg).partition(), c);
+ else
+ cctx.kernalContext().closure().runLocalSafe(c);
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index e44f4a8..5a64758 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -62,6 +62,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPar
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
@@ -79,6 +81,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridListSet;
+import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -1477,68 +1480,70 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param exchTopVer Exchange topology version.
*/
private void dumpPendingObjects(@Nullable AffinityTopologyVersion exchTopVer) {
- IgniteTxManager tm = cctx.tm();
+ synchronized (getClass()) {
+ IgniteTxManager tm = cctx.tm();
- if (tm != null) {
- U.warn(log, "Pending transactions:");
+ if (tm != null) {
+ U.warn(log, "Pending transactions:");
- for (IgniteInternalTx tx : tm.activeTransactions()) {
- if (exchTopVer != null) {
- U.warn(log, ">>> [txVer=" + tx.topologyVersionSnapshot() +
- ", exchWait=" + tm.needWaitTransaction(tx, exchTopVer) +
- ", tx=" + tx + ']');
+ for (IgniteInternalTx tx : tm.activeTransactions()) {
+ if (exchTopVer != null) {
+ U.warn(log, ">>> [txVer=" + tx.topologyVersionSnapshot() +
+ ", exchWait=" + tm.needWaitTransaction(tx, exchTopVer) +
+ ", tx=" + tx + ']');
+ }
+ else
+ U.warn(log, ">>> [txVer=" + tx.topologyVersionSnapshot() + ", tx=" + tx + ']');
}
- else
- U.warn(log, ">>> [txVer=" + tx.topologyVersionSnapshot() + ", tx=" + tx + ']');
}
- }
- GridCacheMvccManager mvcc = cctx.mvcc();
+ GridCacheMvccManager mvcc = cctx.mvcc();
- if (mvcc != null) {
- U.warn(log, "Pending explicit locks:");
+ if (mvcc != null) {
+ U.warn(log, "Pending explicit locks:");
- for (GridCacheExplicitLockSpan lockSpan : mvcc.activeExplicitLocks())
- U.warn(log, ">>> " + lockSpan);
+ for (GridCacheExplicitLockSpan lockSpan : mvcc.activeExplicitLocks())
+ U.warn(log, ">>> " + lockSpan);
- U.warn(log, "Pending cache futures:");
+ U.warn(log, "Pending cache futures:");
- for (GridCacheFuture<?> fut : mvcc.activeFutures())
- U.warn(log, ">>> " + fut);
+ for (GridCacheFuture<?> fut : mvcc.activeFutures())
+ U.warn(log, ">>> " + fut);
- U.warn(log, "Pending atomic cache futures:");
+ U.warn(log, "Pending atomic cache futures:");
- for (GridCacheFuture<?> fut : mvcc.atomicFutures())
- U.warn(log, ">>> " + fut);
+ for (GridCacheFuture<?> fut : mvcc.atomicFutures())
+ U.warn(log, ">>> " + fut);
- U.warn(log, "Pending data streamer futures:");
+ U.warn(log, "Pending data streamer futures:");
- for (IgniteInternalFuture<?> fut : mvcc.dataStreamerFutures())
- U.warn(log, ">>> " + fut);
+ for (IgniteInternalFuture<?> fut : mvcc.dataStreamerFutures())
+ U.warn(log, ">>> " + fut);
- if (tm != null) {
- U.warn(log, "Pending transaction deadlock detection futures:");
+ if (tm != null) {
+ U.warn(log, "Pending transaction deadlock detection futures:");
- for (IgniteInternalFuture<?> fut : tm.deadlockDetectionFutures())
- U.warn(log, ">>> " + fut);
+ for (IgniteInternalFuture<?> fut : tm.deadlockDetectionFutures())
+ U.warn(log, ">>> " + fut);
+ }
}
- }
- for (GridCacheContext ctx : cctx.cacheContexts()) {
- if (ctx.isLocal())
- continue;
+ for (GridCacheContext ctx : cctx.cacheContexts()) {
+ if (ctx.isLocal())
+ continue;
- GridCacheContext ctx0 = ctx.isNear() ? ctx.near().dht().context() : ctx;
+ GridCacheContext ctx0 = ctx.isNear() ? ctx.near().dht().context() : ctx;
- GridCachePreloader preloader = ctx0.preloader();
+ GridCachePreloader preloader = ctx0.preloader();
- if (preloader != null)
- preloader.dumpDebugInfo();
+ if (preloader != null)
+ preloader.dumpDebugInfo();
- GridCacheAffinityManager affMgr = ctx0.affinity();
+ GridCacheAffinityManager affMgr = ctx0.affinity();
- if (affMgr != null)
- affMgr.dumpDebugInfo();
+ if (affMgr != null)
+ affMgr.dumpDebugInfo();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 5e01726..dcd4a18 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheE
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -78,6 +79,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
protected final GridCacheContext cctx;
/** Future version. */
+ @GridToStringInclude
protected final Long futId;
/** Update request. */
@@ -94,9 +96,11 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
private volatile int resCnt;
/** */
+ @GridToStringExclude
private final GridNearAtomicUpdateResponse updateRes;
/** */
+ @GridToStringExclude
private final GridDhtAtomicCache.UpdateReplyClosure completionCb;
/**
@@ -368,9 +372,10 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
boolean needReplyToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC ||
!ret.emptyResult() ||
- updateRes.nearVersion() != null;
+ updateRes.nearVersion() != null ||
+ cctx.localNodeId().equals(nearNode.id());
- boolean needMapping = updateReq.fullSync() && (!updateReq.mappingKnown() || !allUpdated());
+ boolean needMapping = updateReq.fullSync() && (!updateReq.needPrimaryResponse() || !allUpdated());
if (needMapping) {
initMapping(updateRes);
@@ -470,12 +475,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
}
/**
- * @param updateRes Response.
- * @param err Error.
- */
- protected abstract void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err);
-
- /**
* @param nodeId Node ID.
* @param futId Future ID.
* @param writeVer Update version.
http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index fc0e16c..5cd07b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -3078,7 +3078,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param checkReq Request.
*/
private void processCheckUpdateRequest(UUID nodeId, GridNearAtomicCheckUpdateRequest checkReq) {
- /**
+ /*
* Message is processed in the same stripe, so primary already processed update request. It is possible
* response was not sent if operation result was empty. Near node will get original response or this one.
*/
@@ -3104,6 +3104,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param req Dht atomic update request.
*/
private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicAbstractUpdateRequest req) {
+ assert Thread.currentThread().getName().startsWith("sys-stripe-") : Thread.currentThread().getName();
+
if (msgLog.isDebugEnabled()) {
msgLog.debug("Received DHT atomic update request [futId=" + req.futureId() +
", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
@@ -3272,66 +3274,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/**
- *
- */
- private class DeferredUpdateTimeout implements GridTimeoutObject, Runnable {
- /** */
- private final int part;
-
- /** */
- private final UUID primaryId;
-
- /** */
- private final IgniteUuid id;
-
- /** */
- private final long endTime;
-
- /**
- * @param part Partition.
- * @param primaryId Primary ID.
- */
- DeferredUpdateTimeout(int part, UUID primaryId) {
- this.part = part;
- this.primaryId = primaryId;
-
- endTime = U.currentTimeMillis() + DEFERRED_UPDATE_RESPONSE_TIMEOUT;
-
- id = IgniteUuid.fromUuid(primaryId);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid timeoutId() {
- return id;
- }
-
- /** {@inheritDoc} */
- @Override public long endTime() {
- return endTime;
- }
-
- /** {@inheritDoc} */
- @Override public void run() {
- Map<UUID, GridDhtAtomicDeferredUpdateResponse> resMap = defRes.get();
-
- GridDhtAtomicDeferredUpdateResponse msg = resMap.get(primaryId);
-
- if (msg != null && msg.timeoutSender() == this) {
- msg.timeoutSender(null);
-
- resMap.remove(primaryId);
-
- sendDeferredUpdateResponse(primaryId, msg);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onTimeout() {
- ctx.kernalContext().getStripedExecutorService().execute(part, this);
- }
- }
-
- /**
* @param part Partition.
* @param primaryId Primary ID.
* @param futId Future ID.
@@ -3362,7 +3304,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
futIds.add(futId);
- if (futIds.size() == DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE) {
+ if (futIds.size() >= DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE) {
resMap.remove(primaryId);
sendDeferredUpdateResponse(primaryId, msg);
@@ -3375,14 +3317,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
private void sendDeferredUpdateResponse(UUID primaryId, GridDhtAtomicDeferredUpdateResponse msg) {
try {
- ctx.kernalContext().gateway().readLock();
+ //ctx.kernalContext().gateway().readLock();
- GridTimeoutObject timeoutSnd = msg.timeoutSender();
+ try {
+ GridTimeoutObject timeoutSnd = msg.timeoutSender();
- if (timeoutSnd != null)
- ctx.time().removeTimeoutObject(timeoutSnd);
+ if (timeoutSnd != null)
+ ctx.time().removeTimeoutObject(timeoutSnd);
- try {
ctx.io().send(primaryId, msg, ctx.ioPolicy());
if (msgLog.isDebugEnabled()) {
@@ -3391,7 +3333,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
finally {
- ctx.kernalContext().gateway().readUnlock();
+ // ctx.kernalContext().gateway().readUnlock();
}
}
catch (IllegalStateException ignored) {
@@ -3706,7 +3648,67 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/**
*
*/
- static interface UpdateReplyClosure extends CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> {
+ interface UpdateReplyClosure extends CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> {
// No-op.
}
+
+ /**
+ *
+ */
+ private class DeferredUpdateTimeout implements GridTimeoutObject, Runnable {
+ /** */
+ private final int part;
+
+ /** */
+ private final UUID primaryId;
+
+ /** */
+ private final IgniteUuid id;
+
+ /** */
+ private final long endTime;
+
+ /**
+ * @param part Partition.
+ * @param primaryId Primary ID.
+ */
+ DeferredUpdateTimeout(int part, UUID primaryId) {
+ this.part = part;
+ this.primaryId = primaryId;
+
+ endTime = U.currentTimeMillis() + DEFERRED_UPDATE_RESPONSE_TIMEOUT;
+
+ id = IgniteUuid.fromUuid(primaryId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid timeoutId() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long endTime() {
+ return endTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ Map<UUID, GridDhtAtomicDeferredUpdateResponse> resMap = defRes.get();
+
+ GridDhtAtomicDeferredUpdateResponse msg = resMap.get(primaryId);
+
+ if (msg != null && msg.timeoutSender() == this) {
+ msg.timeoutSender(null);
+
+ resMap.remove(primaryId);
+
+ sendDeferredUpdateResponse(primaryId, msg);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTimeout() {
+ ctx.kernalContext().getStripedExecutorService().execute(part, this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 0566ce4..879dd40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -120,11 +120,6 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
}
}
- /** {@inheritDoc} */
- @Override protected void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err) {
-
- }
-
/**
* @param ttl TTL.
* @param conflictExpireTime Conflict expire time.
http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
index 10dc77c..092bccb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -197,7 +197,11 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
/** {@inheritDoc} */
@Override public int partition() {
- return key.partition();
+ int p = key.partition();
+
+ assert p >= 0;
+
+ return p;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 49e168a..32df24d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -103,11 +103,6 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
}
/** {@inheritDoc} */
- @Override protected void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err) {
-
- }
-
- /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtAtomicUpdateFuture.class, this, "super", super.toString());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 78368fb..6b8af8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -427,7 +427,11 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
@Override public int partition() {
assert !F.isEmpty(keys) || !F.isEmpty(nearKeys);
- return keys.size() > 0 ? keys.get(0).partition() : nearKeys.get(0).partition();
+ int p = keys.size() > 0 ? keys.get(0).partition() : nearKeys.get(0).partition();
+
+ assert p >= 0;
+
+ return p;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 5369d53..bb1e224 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -31,6 +31,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -205,8 +206,21 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
this.remapCnt = remapCnt;
}
- void sendCheckUpdateRequest(GridNearAtomicCheckUpdateRequest req) {
+ /** {@inheritDoc} */
+ @Override public final IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+ return null;
+ }
+ void sendCheckUpdateRequest(GridNearAtomicCheckUpdateRequest req) {
+ try {
+ cctx.io().send(req.updateRequest().nodeId(), req, cctx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException e) {
+ onSendError(req, e);
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
}
/**
@@ -370,6 +384,17 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
onPrimaryResponse(req.nodeId(), res, true);
}
+ final void onSendError(GridNearAtomicCheckUpdateRequest req, IgniteCheckedException e) {
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+ req.updateRequest().nodeId(),
+ req.futureId(),
+ cctx.deploymentEnabled());
+
+ res.addFailedKeys(req.updateRequest().keys(), e);
+
+ onPrimaryResponse(req.updateRequest().nodeId(), res, true);
+ }
+
/**
*
*/
@@ -381,7 +406,11 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
@GridToStringInclude
private Set<UUID> dhtNodes;
+ @GridToStringInclude
+ private List<UUID> dhtNodes0;
+
/** */
+ @GridToStringInclude
private Set<UUID> rcvd;
/** */
@@ -396,6 +425,11 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
this.req = req;
if (req.initMappingLocally()) {
+ dhtNodes0 = new ArrayList<>();
+
+ for (ClusterNode n : nodes)
+ dhtNodes0.add(n.id());
+
if (single) {
if (nodes.size() > 1) {
dhtNodes = U.newHashSet(nodes.size() - 1);
@@ -424,13 +458,16 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
for (int i = 1; i < nodes.size(); i++)
dhtNodes.add(nodes.get(i).id());
+
+ for (int i = 1; i < nodes.size(); i++)
+ dhtNodes0.add(nodes.get(i).id());
}
- boolean checkDhtNodes(GridCacheContext cctx) {
+ DhtLeftResult checkDhtNodes(GridCacheContext cctx) {
assert req.initMappingLocally() : req;
if (finished())
- return false;
+ return DhtLeftResult.NOT_DONE;
boolean finished = false;
@@ -448,7 +485,13 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
}
}
- return finished;
+ if (finished)
+ return DhtLeftResult.DONE;
+
+ if (dhtNodes.isEmpty())
+ return req.needPrimaryResponse() ? DhtLeftResult.ALL_RCVD_CHECK_PRIMARY : DhtLeftResult.NOT_DONE;
+
+ return DhtLeftResult.NOT_DONE;
}
/**
@@ -487,7 +530,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
if (hasRes)
return DhtLeftResult.DONE;
else
- return req.mappingKnown() ? DhtLeftResult.ALL_RCVD_CHECK_UPDATE : DhtLeftResult.NOT_DONE;
+ return req.needPrimaryResponse() ? DhtLeftResult.ALL_RCVD_CHECK_PRIMARY : DhtLeftResult.NOT_DONE;
}
return DhtLeftResult.NOT_DONE;
@@ -572,8 +615,10 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(PrimaryRequestState.class, this,
- "node", req.nodeId(),
- "rcvdRes", req.response() != null);
+ "primary", primaryId(),
+ "mapppingKnown", req.needPrimaryResponse(),
+ "primaryRes", req.response() != null,
+ "done", finished());
}
}
@@ -588,7 +633,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
NOT_DONE,
/** */
- ALL_RCVD_CHECK_UPDATE
+ ALL_RCVD_CHECK_PRIMARY
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index a833588..34a3c0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -47,7 +47,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
public static final int CACHE_MSG_IDX = nextIndexId();
/** . */
- private static final int MAPPING_KNOWN_FLAG_MASK = 0x01;
+ private static final int NEED_PRIMARY_RES_FLAG_MASK = 0x01;
/** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
private static final int TOP_LOCKED_FLAG_MASK = 0x02;
@@ -141,7 +141,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
this.addDepInfo = addDepInfo;
if (mappingKnown)
- mappingKnown(true);
+ needPrimaryResponse(true);
if (topLocked)
topologyLocked(true);
@@ -177,11 +177,11 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
}
boolean initMappingLocally() {
- return mappingKnown() && fullSync();
+ return needPrimaryResponse() && fullSync();
}
- boolean mappingKnown() {
- return isFlag(MAPPING_KNOWN_FLAG_MASK);
+ boolean needPrimaryResponse() {
+ return isFlag(NEED_PRIMARY_RES_FLAG_MASK);
}
boolean fullSync() {
@@ -190,8 +190,8 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
return syncMode == CacheWriteSynchronizationMode.FULL_SYNC;
}
- void mappingKnown(boolean stableTop) {
- setFlag(stableTop, MAPPING_KNOWN_FLAG_MASK);
+ void needPrimaryResponse(boolean stableTop) {
+ setFlag(stableTop, NEED_PRIMARY_RES_FLAG_MASK);
}
public int taskNameHash() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
index 030abdf..a30269b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
@@ -51,14 +51,13 @@ public class GridNearAtomicCheckUpdateRequest extends GridCacheMessage {
// No-op.
}
- GridNearAtomicCheckUpdateRequest(int cacheId, GridNearAtomicAbstractUpdateRequest updateReq, int partId, long futId) {
+ GridNearAtomicCheckUpdateRequest(GridNearAtomicAbstractUpdateRequest updateReq) {
assert updateReq != null;
- assert partId >= 0 : partId;
- this.cacheId = cacheId;
this.updateReq = updateReq;
- this.partId = partId;
- this.futId = futId;
+ this.cacheId = updateReq.cacheId();
+ this.partId = updateReq.partition();
+ this.futId = updateReq.futureId();
}
long futureId() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 82d397d..f96de31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -164,11 +162,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
if (res == DhtLeftResult.DONE)
rcvAll = true;
- else if (res == DhtLeftResult.ALL_RCVD_CHECK_UPDATE) {
- checkReq = new GridNearAtomicCheckUpdateRequest(cctx.cacheId(),
- reqState.req,
- reqState.req.partition(),
- futId);
+ else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY) {
+ checkReq = new GridNearAtomicCheckUpdateRequest(reqState.req);
}
else
return false;
@@ -190,11 +185,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
- return null;
- }
-
- /** {@inheritDoc} */
@SuppressWarnings("ConstantConditions")
@Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
assert res == null || res instanceof GridCacheReturn;
@@ -529,9 +519,11 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
* @return
*/
private boolean checkDhtNodes(Long futId) {
- GridCacheReturn opRes0;
- CachePartialUpdateCheckedException err0;
- AffinityTopologyVersion remapTopVer0;
+ GridCacheReturn opRes0 = null;
+ CachePartialUpdateCheckedException err0 = null;
+ AffinityTopologyVersion remapTopVer0 = null;
+
+ GridNearAtomicCheckUpdateRequest checkReq = null;
synchronized (mux) {
if (this.futId == null || !this.futId.equals(futId))
@@ -539,16 +531,24 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
assert reqState != null;
- if (reqState.checkDhtNodes(cctx)) {
+ DhtLeftResult res = reqState.checkDhtNodes(cctx);
+
+ if (res == DhtLeftResult.DONE) {
opRes0 = opRes;
err0 = err;
remapTopVer0 = onAllReceived();
}
+ else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY){
+ checkReq = new GridNearAtomicCheckUpdateRequest(reqState.req);
+ }
else
return true;
}
- finishUpdateFuture(opRes0, err0, remapTopVer0);
+ if (checkReq != null)
+ sendCheckUpdateRequest(checkReq);
+ else
+ finishUpdateFuture(opRes0, err0, remapTopVer0);
return false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 7b1c530..77ba579 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -164,6 +164,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
boolean rcvAll = false;
+ List<GridNearAtomicCheckUpdateRequest> checkReqs = null;
+
synchronized (mux) {
if (futId == null)
return false;
@@ -173,6 +175,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
GridNearAtomicAbstractUpdateRequest req = singleReq.processPrimaryResponse(nodeId);
if (req != null) {
+ rcvAll = true;
+
GridNearAtomicUpdateResponse res = primaryFailedResponse(req);
singleReq.onPrimaryResponse(res, cctx);
@@ -181,7 +185,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
}
else {
- singleReq.onDhtNodeLeft(nodeId);
+ DhtLeftResult res = singleReq.onDhtNodeLeft(nodeId);
+
+ if (res == DhtLeftResult.DONE)
+ rcvAll = true;
+ else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY)
+ checkReqs = Collections.singletonList(new GridNearAtomicCheckUpdateRequest(singleReq.req));
}
if (rcvAll) {
@@ -215,7 +224,16 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
}
else {
- reqState.onDhtNodeLeft(nodeId);
+ DhtLeftResult res = reqState.onDhtNodeLeft(nodeId);
+
+ if (res == DhtLeftResult.DONE)
+ reqDone = true;
+ else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY) {
+ if (checkReqs == null)
+ checkReqs = new ArrayList<>();
+
+ checkReqs.add(new GridNearAtomicCheckUpdateRequest(reqState.req));
+ }
}
if (reqDone) {
@@ -237,18 +255,19 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
}
- if (rcvAll)
+ if (checkReqs != null) {
+ assert !rcvAll;
+
+ for (int i = 0; i < checkReqs.size(); i++)
+ sendCheckUpdateRequest(checkReqs.get(i));
+ }
+ else if (rcvAll)
finishUpdateFuture(opRes0, err0, remapTopVer0);
return false;
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
- return null;
- }
-
- /** {@inheritDoc} */
@SuppressWarnings("ConstantConditions")
@Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
assert res == null || res instanceof GridCacheReturn;
@@ -824,25 +843,33 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
CachePartialUpdateCheckedException err0 = null;
AffinityTopologyVersion remapTopVer0 = null;
+ List<GridNearAtomicCheckUpdateRequest> checkReqs = null;
+
+ boolean rcvAll = false;
+
synchronized (mux) {
if (this.futId == null || !this.futId.equals(futId))
return false;
if (singleReq != null) {
- if (singleReq.checkDhtNodes(cctx)) {
+ DhtLeftResult res = singleReq.checkDhtNodes(cctx);
+
+ if (res == DhtLeftResult.DONE) {
opRes0 = opRes;
err0 = err;
remapTopVer0 = onAllReceived();
}
+ else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY)
+ checkReqs = Collections.singletonList(new GridNearAtomicCheckUpdateRequest(singleReq.req));
else
return true;
}
else {
if (mappings != null) {
- boolean rcvAll = false;
-
for (PrimaryRequestState reqState : mappings.values()) {
- if (reqState.checkDhtNodes(cctx)) {
+ DhtLeftResult res = reqState.checkDhtNodes(cctx);
+
+ if (res == DhtLeftResult.DONE) {
assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
resCnt++;
@@ -857,19 +884,34 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
break;
}
}
- }
+ else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY) {
+ if (checkReqs == null)
+ checkReqs = new ArrayList<>(mappings.size());
- if (!rcvAll)
- return true;
+ checkReqs.add(new GridNearAtomicCheckUpdateRequest(reqState.req));
+ }
+ }
}
else
return true;
}
}
- finishUpdateFuture(opRes0, err0, remapTopVer0);
+ if (checkReqs != null) {
+ assert !rcvAll;
- return false;
+ for (int i = 0; i < checkReqs.size(); i++)
+ sendCheckUpdateRequest(checkReqs.get(i));
+
+ return false;
+ }
+ else if (rcvAll) {
+ finishUpdateFuture(opRes0, err0, remapTopVer0);
+
+ return false;
+ }
+
+ return true;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a334fd5..34b8540 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -783,7 +783,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
catch (IgniteFutureTimeoutCheckedException ignored) {
// Print pending transactions and locks that might have led to hang.
if (dumpedObjects < DUMP_PENDING_OBJECTS_THRESHOLD) {
- dumpPendingObjects();
+ synchronized (getClass()) {
+ dumpPendingObjects();
+ }
dumpedObjects++;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46c71ae5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index 081e49f..222620a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -1750,7 +1750,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
for (int i = 0; i < SRV_CNT; i++)
startGrid(i);
- final int CLIENT_CNT = 4;
+ final int CLIENT_CNT = 1;
final List<Ignite> clients = new ArrayList<>();
@@ -1768,7 +1768,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
final AtomicInteger threadIdx = new AtomicInteger(0);
- final int THREADS = CLIENT_CNT * 3;
+ final int THREADS = CLIENT_CNT * 1;
final ConcurrentHashSet<Integer> putKeys = new ConcurrentHashSet<>();
@@ -1834,6 +1834,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
}
}
else
+ //cache.put(map.keySet().iterator().next(), map.values().iterator().next());
cache.putAll(map);
putKeys.addAll(map.keySet());
[2/3] ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-2.0' into ignite-4705-2
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-2.0' into ignite-4705-2
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ff55b4d2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ff55b4d2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ff55b4d2
Branch: refs/heads/ignite-4705-2
Commit: ff55b4d265f9af2eb85f61f245b0c4c3e4598c18
Parents: 4ab159d 93e1996
Author: sboikov <sb...@gridgain.com>
Authored: Sat Mar 4 12:05:07 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Sat Mar 4 12:05:07 2017 +0300
----------------------------------------------------------------------
.../ignite/internal/GridJobSiblingImpl.java | 4 +-
.../internal/managers/GridManagerAdapter.java | 2 +-
.../checkpoint/GridCheckpointManager.java | 2 +-
.../managers/communication/GridIoManager.java | 206 +++++++------------
.../deployment/GridDeploymentCommunication.java | 6 +-
.../eventstorage/GridEventStorageManager.java | 6 +-
.../processors/cache/GridCacheIoManager.java | 8 +-
.../cache/transactions/IgniteTxManager.java | 6 +-
.../clock/GridClockSyncProcessor.java | 2 +-
.../continuous/GridContinuousProcessor.java | 2 +-
.../datastreamer/DataStreamProcessor.java | 2 +-
.../datastreamer/DataStreamerImpl.java | 2 +-
.../internal/processors/igfs/IgfsContext.java | 8 +-
.../processors/job/GridJobProcessor.java | 4 +-
.../internal/processors/job/GridJobWorker.java | 2 +-
.../marshaller/ClientRequestFuture.java | 2 +-
.../GridMarshallerMappingProcessor.java | 2 +-
.../handlers/task/GridTaskCommandHandler.java | 4 +-
.../processors/task/GridTaskProcessor.java | 2 +-
.../processors/task/GridTaskWorker.java | 4 +-
.../GridCommunicationSendMessageSelfTest.java | 2 +-
.../communication/GridIoManagerSelfTest.java | 28 +--
.../nio/IgniteExceptionInNioWorkerSelfTest.java | 2 +-
.../communication/GridIoManagerBenchmark.java | 4 +-
.../communication/GridIoManagerBenchmark0.java | 14 +-
.../communication/GridCacheMessageSelfTest.java | 2 +-
.../hadoop/shuffle/HadoopShuffle.java | 3 +-
.../processors/query/h2/IgniteH2Indexing.java | 2 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 6 +-
.../h2/twostep/GridReduceQueryExecutor.java | 2 +-
30 files changed, 140 insertions(+), 201 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ff55b4d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ff55b4d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ff55b4d2/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
----------------------------------------------------------------------