You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/06/26 15:44:52 UTC
incubator-ignite git commit: # on message processed notification
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-gg-10443 [created] 10655b2a0
# on message processed notification
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/10655b2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/10655b2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/10655b2a
Branch: refs/heads/ignite-gg-10443
Commit: 10655b2a0390317eb3a88cde6cd09575b87eae78
Parents: 01d842a
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri Jun 26 16:44:24 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri Jun 26 16:44:24 2015 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 49 +++++++++++---------
.../util/nio/GridNioMessageTracker.java | 23 +++++++--
2 files changed, 48 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/10655b2a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 4382731..d8dcc2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.managers.deployment.*;
import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.processors.timeout.*;
import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -718,7 +719,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
map = msgSetMap.get(msg.topic());
if (map == null) {
- set = new GridCommunicationMessageSet(plc, msg.topic(), nodeId, timeout, skipOnTimeout, msg);
+ set = new GridCommunicationMessageSet(plc, msg.topic(), nodeId, timeout, skipOnTimeout, msg, msgC);
map = new ConcurrentHashMap0<>();
@@ -748,7 +749,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (set == null) {
GridCommunicationMessageSet old = map.putIfAbsent(nodeId,
set = new GridCommunicationMessageSet(plc, msg.topic(),
- nodeId, timeout, skipOnTimeout, msg));
+ nodeId, timeout, skipOnTimeout, msg, msgC));
assert old == null;
@@ -766,7 +767,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
assert set != null;
assert !isNew;
- set.add(msg);
+ set.add(msg, msgC);
break;
}
@@ -795,14 +796,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (isNew && set.endTime() != Long.MAX_VALUE)
ctx.timeout().addTimeoutObject(set);
- if (set.reserved()) {
- // Set is reserved which means that it is currently processed by worker thread.
- if (msgC != null)
- msgC.run();
-
- return;
- }
-
final GridMessageListener lsnr = lsnrMap.get(msg.topic());
if (lsnr == null) {
@@ -821,7 +814,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
"listener is registered): " + msg);
}
- // Mark the message as processed.
+ // Mark the message as processed, otherwise reading from the connection
+ // may stop.
if (msgC != null)
msgC.run();
@@ -848,8 +842,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
finally {
threadProcessingMessage(false);
-
- msgC.run();
}
}
};
@@ -1852,7 +1844,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
/** */
@GridToStringInclude
- private final Queue<IgniteBiTuple<GridIoMessage, Long>> msgs = new ConcurrentLinkedDeque<>();
+ private final Queue<GridTuple3<GridIoMessage, Long, IgniteRunnable>> msgs = new ConcurrentLinkedDeque<>();
/** */
private final AtomicBoolean reserved = new AtomicBoolean();
@@ -1873,6 +1865,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param timeout Timeout.
* @param skipOnTimeout Whether message can be skipped on timeout.
* @param msg Message to add immediately.
+ * @param msgC Message closure (may be {@code null}).
*/
GridCommunicationMessageSet(
GridIoPolicy plc,
@@ -1880,7 +1873,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
UUID nodeId,
long timeout,
boolean skipOnTimeout,
- GridIoMessage msg
+ GridIoMessage msg,
+ @Nullable IgniteRunnable msgC
) {
assert nodeId != null;
assert topic != null;
@@ -1899,7 +1893,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
lastTs = U.currentTimeMillis();
- msgs.add(F.t(msg, lastTs));
+ msgs.add(F.t(msg, lastTs, msgC));
}
/** {@inheritDoc} */
@@ -2017,15 +2011,28 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
void unwind(GridMessageListener lsnr) {
assert reserved.get();
- for (IgniteBiTuple<GridIoMessage, Long> t = msgs.poll(); t != null; t = msgs.poll())
- lsnr.onMessage(nodeId, t.get1().message());
+ for (GridTuple3<GridIoMessage, Long, IgniteRunnable> t = msgs.poll(); t != null; t = msgs.poll()) {
+ try {
+ lsnr.onMessage(
+ nodeId,
+ t.get1().message());
+ }
+ finally {
+ if (t.get3() != null)
+ t.get3().run();
+ }
+ }
}
/**
* @param msg Message to add.
+ * @param msgC Message closure (may be {@code null}).
*/
- void add(GridIoMessage msg) {
- msgs.add(F.t(msg, U.currentTimeMillis()));
+ void add(
+ GridIoMessage msg,
+ @Nullable IgniteRunnable msgC
+ ) {
+ msgs.add(F.t(msg, U.currentTimeMillis(), msgC));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/10655b2a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java
index 52b7fed..c9ed1a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java
@@ -56,9 +56,26 @@ public class GridNioMessageTracker implements IgniteRunnable {
/** {@inheritDoc} */
@Override public void run() {
- int cnt = msgCnt.decrementAndGet();
+ // In case of ordered messages this may be called twice for 1 message.
+ // Example: message arrives, but listener has not been installed yet.
+ // Message set is created, but message does not get actually processed.
+ // If this is not called, connection may be paused which causes hang.
+ // It seems acceptable to have the following logic accounting the aforementioned.
+ int cnt = 0;
- assert cnt >= 0 : "Invalid count: " + cnt;
+ for (;;) {
+ int cur = msgCnt.get();
+
+ if (cur == 0)
+ break;
+
+ cnt = cur - 1;
+
+ if (msgCnt.compareAndSet(cur, cnt))
+ break;
+ }
+
+ assert cnt >= 0 : "Invalid count [cnt=" + cnt + ", this=" + this + ']';
if (cnt < msgQueueLimit && paused && lock.tryLock()) {
try {
@@ -116,6 +133,6 @@ public class GridNioMessageTracker implements IgniteRunnable {
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridNioMessageTracker.class, this, super.toString());
+ return S.toString(GridNioMessageTracker.class, this, "hash", System.identityHashCode(this));
}
}