You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/08/11 09:27:33 UTC
[24/50] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates
IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2421fee9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2421fee9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2421fee9
Branch: refs/heads/ignite-426
Commit: 2421fee936c1cfefa072c00da8ebce6897c2156f
Parents: d26184a
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Tue Aug 4 10:28:24 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Tue Aug 4 10:28:24 2015 -0700
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 75 +++++++++++---------
1 file changed, 43 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2421fee9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index a129cbe..b38106e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -980,10 +980,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param msgC Closure to call when message processing finished.
*/
private void processSequentialMessage(
- final UUID nodeId,
- final GridIoMessage msg,
+ UUID nodeId,
+ GridIoMessage msg,
byte plc,
- final IgniteRunnable msgC
+ IgniteRunnable msgC
) throws IgniteCheckedException {
final GridMessageListener lsnr = lsnrMap.get(msg.topic());
@@ -1018,39 +1018,41 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
msgSet.add(nodeId, msg, msgC);
- if (msgC == null) {
- assert locNodeId.equals(nodeId);
+ if (!msgSet.reserved()) {
+ if (msgC == null) {
+ assert locNodeId.equals(nodeId);
- msgSet.unwind(lsnr);
- }
- else {
- assert !locNodeId.equals(nodeId);
+ msgSet.unwind(lsnr);
+ }
+ else {
+ assert !locNodeId.equals(nodeId);
- final SequentialMessageSet msgSet0 = msgSet;
+ final SequentialMessageSet msgSet0 = msgSet;
- Runnable c = new Runnable() {
- @Override public void run() {
- try {
- threadProcessingMessage(true);
+ Runnable c = new Runnable() {
+ @Override public void run() {
+ try {
+ threadProcessingMessage(true);
- msgSet0.unwind(lsnr);
- }
- finally {
- threadProcessingMessage(false);
+ msgSet0.unwind(lsnr);
+ }
+ finally {
+ threadProcessingMessage(false);
+ }
}
- }
- };
+ };
- try {
- pool(plc).execute(c);
- }
- catch (RejectedExecutionException e) {
- U.error(log, "Failed to process sequential message due to execution rejection. " +
- "Increase the upper bound on executor service provided by corresponding " +
- "configuration property. Will attempt to process message in the listener " +
- "thread instead [msgPlc=" + plc + ']', e);
+ try {
+ pool(plc).execute(c);
+ }
+ catch (RejectedExecutionException e) {
+ U.error(log, "Failed to process sequential message due to execution rejection. " +
+ "Increase the upper bound on executor service provided by corresponding " +
+ "configuration property. Will attempt to process message in the listener " +
+ "thread instead [msgPlc=" + plc + ']', e);
- c.run();
+ c.run();
+ }
}
}
}
@@ -1108,10 +1110,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
ioMsg.topicBytes(marsh.marshal(topic));
try {
- if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi)
- ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackClosure);
+ CommunicationSpi spi = getSpi();
+
+ if (spi instanceof TcpCommunicationSpi)
+ ((TcpCommunicationSpi)spi).sendMessage(node, ioMsg, ackClosure);
else
- getSpi().sendMessage(node, ioMsg);
+ spi.sendMessage(node, ioMsg);
}
catch (IgniteSpiException e) {
throw new IgniteCheckedException("Failed to send message (node may have left the grid or " +
@@ -2535,6 +2539,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
private final AtomicBoolean reserve = new AtomicBoolean();
/**
+ * @return {@code True} if currently reserved.
+ */
+ boolean reserved() {
+ return reserve.get();
+ }
+
+ /**
* @param nodeId Node ID.
* @param msg Message.
* @param msgC Closure to call when message processing finished.