You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/07/06 16:59:17 UTC

[10/50] incubator-ignite git commit: IGNITE-1068: Fixing ordered messages back pressure control.

IGNITE-1068: Fixing ordered messages back pressure control.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c7e74874
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c7e74874
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c7e74874

Branch: refs/heads/master
Commit: c7e7487458b72ba8b666fba7b9e24004d425719f
Parents: bccba30
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Jul 1 11:50:08 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jul 1 11:50:08 2015 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 49 +++++++++++---------
 .../util/nio/GridNioMessageTracker.java         | 23 +++++++--
 2 files changed, 48 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7e74874/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 4382731..d8dcc2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -718,7 +719,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             map = msgSetMap.get(msg.topic());
 
             if (map == null) {
-                set = new GridCommunicationMessageSet(plc, msg.topic(), nodeId, timeout, skipOnTimeout, msg);
+                set = new GridCommunicationMessageSet(plc, msg.topic(), nodeId, timeout, skipOnTimeout, msg, msgC);
 
                 map = new ConcurrentHashMap0<>();
 
@@ -748,7 +749,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                     if (set == null) {
                         GridCommunicationMessageSet old = map.putIfAbsent(nodeId,
                             set = new GridCommunicationMessageSet(plc, msg.topic(),
-                                nodeId, timeout, skipOnTimeout, msg));
+                                nodeId, timeout, skipOnTimeout, msg, msgC));
 
                         assert old == null;
 
@@ -766,7 +767,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 assert set != null;
                 assert !isNew;
 
-                set.add(msg);
+                set.add(msg, msgC);
 
                 break;
             }
@@ -795,14 +796,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (isNew && set.endTime() != Long.MAX_VALUE)
             ctx.timeout().addTimeoutObject(set);
 
-        if (set.reserved()) {
-            // Set is reserved which means that it is currently processed by worker thread.
-            if (msgC != null)
-                msgC.run();
-
-            return;
-        }
-
         final GridMessageListener lsnr = lsnrMap.get(msg.topic());
 
         if (lsnr == null) {
@@ -821,7 +814,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                     "listener is registered): " + msg);
             }
 
-            // Mark the message as processed.
+            // Mark the message as processed, otherwise reading from the connection
+            // may stop.
             if (msgC != null)
                 msgC.run();
 
@@ -848,8 +842,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 }
                 finally {
                     threadProcessingMessage(false);
-
-                    msgC.run();
                 }
             }
         };
@@ -1852,7 +1844,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
         /** */
         @GridToStringInclude
-        private final Queue<IgniteBiTuple<GridIoMessage, Long>> msgs = new ConcurrentLinkedDeque<>();
+        private final Queue<GridTuple3<GridIoMessage, Long, IgniteRunnable>> msgs = new ConcurrentLinkedDeque<>();
 
         /** */
         private final AtomicBoolean reserved = new AtomicBoolean();
@@ -1873,6 +1865,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
          * @param timeout Timeout.
          * @param skipOnTimeout Whether message can be skipped on timeout.
          * @param msg Message to add immediately.
+         * @param msgC Message closure (may be {@code null}).
          */
         GridCommunicationMessageSet(
             GridIoPolicy plc,
@@ -1880,7 +1873,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             UUID nodeId,
             long timeout,
             boolean skipOnTimeout,
-            GridIoMessage msg
+            GridIoMessage msg,
+            @Nullable IgniteRunnable msgC
         ) {
             assert nodeId != null;
             assert topic != null;
@@ -1899,7 +1893,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
             lastTs = U.currentTimeMillis();
 
-            msgs.add(F.t(msg, lastTs));
+            msgs.add(F.t(msg, lastTs, msgC));
         }
 
         /** {@inheritDoc} */
@@ -2017,15 +2011,28 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         void unwind(GridMessageListener lsnr) {
             assert reserved.get();
 
-            for (IgniteBiTuple<GridIoMessage, Long> t = msgs.poll(); t != null; t = msgs.poll())
-                lsnr.onMessage(nodeId, t.get1().message());
+            for (GridTuple3<GridIoMessage, Long, IgniteRunnable> t = msgs.poll(); t != null; t = msgs.poll()) {
+                try {
+                    lsnr.onMessage(
+                        nodeId,
+                        t.get1().message());
+                }
+                finally {
+                    if (t.get3() != null)
+                        t.get3().run();
+                }
+            }
         }
 
         /**
          * @param msg Message to add.
+         * @param msgC Message closure (may be {@code null}).
          */
-        void add(GridIoMessage msg) {
-            msgs.add(F.t(msg, U.currentTimeMillis()));
+        void add(
+            GridIoMessage msg,
+            @Nullable IgniteRunnable msgC
+        ) {
+            msgs.add(F.t(msg, U.currentTimeMillis(), msgC));
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7e74874/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java
index 52b7fed..c9ed1a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java
@@ -56,9 +56,26 @@ public class GridNioMessageTracker implements IgniteRunnable {
 
     /** {@inheritDoc} */
     @Override public void run() {
-        int cnt = msgCnt.decrementAndGet();
+        // In case of ordered messages this may be called twice for one message.
+        // Example: a message arrives, but the listener has not been installed yet.
+        // The message set is created, but the message is not actually processed.
+        // If this is not called, the connection may be paused, which causes a hang.
+        // To account for such repeated calls, the counter is never decremented below zero.
+        int cnt = 0;
 
-        assert cnt >= 0 : "Invalid count: " + cnt;
+        for (;;) {
+            int cur = msgCnt.get();
+
+            if (cur == 0)
+                break;
+
+            cnt = cur - 1;
+
+            if (msgCnt.compareAndSet(cur, cnt))
+                break;
+        }
+
+        assert cnt >= 0 : "Invalid count [cnt=" + cnt + ", this=" + this + ']';
 
         if (cnt < msgQueueLimit && paused && lock.tryLock()) {
             try {
@@ -116,6 +133,6 @@ public class GridNioMessageTracker implements IgniteRunnable {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(GridNioMessageTracker.class, this, super.toString());
+        return S.toString(GridNioMessageTracker.class, this, "hash", System.identityHashCode(this));
     }
 }