You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/08/11 09:27:31 UTC

[22/50] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates

IGNITE-104 - Ordered ATOMIC updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/25a109b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/25a109b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/25a109b3

Branch: refs/heads/ignite-426
Commit: 25a109b31e80807790d1b5e61fb3412f142d84dc
Parents: 6720885
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Mon Aug 3 21:31:35 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Mon Aug 3 21:31:35 2015 -0700

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java        | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25a109b3/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 1e609e3..a129cbe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1000,6 +1000,16 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         SequentialMessageSet msgSet = seqMsgs.get(msg.topic());
 
         if (msgSet == null) {
+            if (closedTopics.contains(msg.topic())) {
+                if (log.isDebugEnabled())
+                    log.debug("Ignoring message because the topic is already closed: " + msg);
+
+                if (msgC != null)
+                    msgC.run();
+
+                return;
+            }
+
             SequentialMessageSet old = seqMsgs.putIfAbsent(msg.topic(), msgSet = new SequentialMessageSet());
 
             if (old != null)
@@ -1793,6 +1803,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
             if (map != null)
                 msgSets = map.values();
+
+            seqMsgs.remove(topic);
         }
         else {
             for (;;) {
@@ -1859,8 +1871,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (rmv && log.isDebugEnabled())
             log.debug("Removed message listener [topic=" + topic + ", lsnr=" + lsnr + ']');
 
-        if (lsnr instanceof ArrayListener)
-        {
+        if (lsnr instanceof ArrayListener) {
             for (GridMessageListener childLsnr : ((ArrayListener)lsnr).arr)
                 closeListener(childLsnr);
         }