You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/08/11 09:27:33 UTC

[24/50] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates

IGNITE-104 - Ordered ATOMIC updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2421fee9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2421fee9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2421fee9

Branch: refs/heads/ignite-426
Commit: 2421fee936c1cfefa072c00da8ebce6897c2156f
Parents: d26184a
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Tue Aug 4 10:28:24 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Tue Aug 4 10:28:24 2015 -0700

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 75 +++++++++++---------
 1 file changed, 43 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2421fee9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index a129cbe..b38106e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -980,10 +980,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param msgC Closure to call when message processing finished.
      */
     private void processSequentialMessage(
-        final UUID nodeId,
-        final GridIoMessage msg,
+        UUID nodeId,
+        GridIoMessage msg,
         byte plc,
-        final IgniteRunnable msgC
+        IgniteRunnable msgC
     ) throws IgniteCheckedException {
         final GridMessageListener lsnr = lsnrMap.get(msg.topic());
 
@@ -1018,39 +1018,41 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
         msgSet.add(nodeId, msg, msgC);
 
-        if (msgC == null) {
-            assert locNodeId.equals(nodeId);
+        if (!msgSet.reserved()) {
+            if (msgC == null) {
+                assert locNodeId.equals(nodeId);
 
-            msgSet.unwind(lsnr);
-        }
-        else {
-            assert !locNodeId.equals(nodeId);
+                msgSet.unwind(lsnr);
+            }
+            else {
+                assert !locNodeId.equals(nodeId);
 
-            final SequentialMessageSet msgSet0 = msgSet;
+                final SequentialMessageSet msgSet0 = msgSet;
 
-            Runnable c = new Runnable() {
-                @Override public void run() {
-                    try {
-                        threadProcessingMessage(true);
+                Runnable c = new Runnable() {
+                    @Override public void run() {
+                        try {
+                            threadProcessingMessage(true);
 
-                        msgSet0.unwind(lsnr);
-                    }
-                    finally {
-                        threadProcessingMessage(false);
+                            msgSet0.unwind(lsnr);
+                        }
+                        finally {
+                            threadProcessingMessage(false);
+                        }
                     }
-                }
-            };
+                };
 
-            try {
-                pool(plc).execute(c);
-            }
-            catch (RejectedExecutionException e) {
-                U.error(log, "Failed to process sequential message due to execution rejection. " +
-                    "Increase the upper bound on executor service provided by corresponding " +
-                    "configuration property. Will attempt to process message in the listener " +
-                    "thread instead [msgPlc=" + plc + ']', e);
+                try {
+                    pool(plc).execute(c);
+                }
+                catch (RejectedExecutionException e) {
+                    U.error(log, "Failed to process sequential message due to execution rejection. " +
+                        "Increase the upper bound on executor service provided by corresponding " +
+                        "configuration property. Will attempt to process message in the listener " +
+                        "thread instead [msgPlc=" + plc + ']', e);
 
-                c.run();
+                    c.run();
+                }
             }
         }
     }
@@ -1108,10 +1110,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 ioMsg.topicBytes(marsh.marshal(topic));
 
             try {
-                if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi)
-                    ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackClosure);
+                CommunicationSpi spi = getSpi();
+
+                if (spi instanceof TcpCommunicationSpi)
+                    ((TcpCommunicationSpi)spi).sendMessage(node, ioMsg, ackClosure);
                 else
-                    getSpi().sendMessage(node, ioMsg);
+                    spi.sendMessage(node, ioMsg);
             }
             catch (IgniteSpiException e) {
                 throw new IgniteCheckedException("Failed to send message (node may have left the grid or " +
@@ -2535,6 +2539,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         private final AtomicBoolean reserve = new AtomicBoolean();
 
         /**
+         * @return {@code True} if currently reserved.
+         */
+        boolean reserved() {
+            return reserve.get();
+        }
+
+        /**
          * @param nodeId Node ID.
          * @param msg Message.
          * @param msgC Closure to call when message processing finished.