You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/08/11 09:27:31 UTC
[22/50] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates
IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/25a109b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/25a109b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/25a109b3
Branch: refs/heads/ignite-426
Commit: 25a109b31e80807790d1b5e61fb3412f142d84dc
Parents: 6720885
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Mon Aug 3 21:31:35 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Mon Aug 3 21:31:35 2015 -0700
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 15 +++++++++++++--
1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25a109b3/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 1e609e3..a129cbe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1000,6 +1000,16 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
SequentialMessageSet msgSet = seqMsgs.get(msg.topic());
if (msgSet == null) {
+ if (closedTopics.contains(msg.topic())) {
+ if (log.isDebugEnabled())
+ log.debug("Ignoring message because the topic is already closed: " + msg);
+
+ if (msgC != null)
+ msgC.run();
+
+ return;
+ }
+
SequentialMessageSet old = seqMsgs.putIfAbsent(msg.topic(), msgSet = new SequentialMessageSet());
if (old != null)
@@ -1793,6 +1803,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (map != null)
msgSets = map.values();
+
+ seqMsgs.remove(topic);
}
else {
for (;;) {
@@ -1859,8 +1871,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (rmv && log.isDebugEnabled())
log.debug("Removed message listener [topic=" + topic + ", lsnr=" + lsnr + ']');
- if (lsnr instanceof ArrayListener)
- {
+ if (lsnr instanceof ArrayListener) {
for (GridMessageListener childLsnr : ((ArrayListener)lsnr).arr)
closeListener(childLsnr);
}