You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by GitBox <gi...@apache.org> on 2020/03/09 17:48:01 UTC

[GitHub] [qpid-dispatch] kgiusti commented on a change in pull request #693: DISPATCH-1579: open dedicated inter-router sessions for data and control links

kgiusti commented on a change in pull request #693: DISPATCH-1579: open dedicated inter-router sessions for data and control links
URL: https://github.com/apache/qpid-dispatch/pull/693#discussion_r389857365
 
 

 ##########
 File path: src/router_node.c
 ##########
 @@ -786,18 +786,47 @@ static int AMQP_link_attach_handler(void* context, qd_link_t *link)
 
 
 /**
- * Handler for flow events on links
+ * Handler for flow events on links.  Flow updates include session window
+ * state, which needs to be checked for unblocking Q3.
  */
 static int AMQP_link_flow_handler(void* context, qd_link_t *link)
 {
-    qd_router_t *router = (qd_router_t*) context;
-    qdr_link_t  *rlink  = (qdr_link_t*) qd_link_get_context(link);
-    pn_link_t   *pnlink = qd_link_pn(link);
-
-    if (!rlink)
-        return 0;
+    qd_router_t *router  = (qd_router_t*) context;
+    pn_link_t   *pnlink  = qd_link_pn(link);
+    pn_session_t *pn_ssn = pn_link_session(pnlink);
+
+    bool link_flowed = false;
+    if (pn_ssn) {
+        qd_session_t *qd_ssn = pn_session_get_context(pn_ssn);
+        if (qd_ssn && qd_session_is_q3_blocked(qd_ssn)) {
+            // Q3 blocked - have we drained enough outgoing bytes?
+            const size_t q3_lower = BUFFER_SIZE * QD_QLIMIT_Q3_LOWER;
+            if (pn_session_outgoing_bytes(pn_ssn) < q3_lower) {
+                // yes.  We must now unblock all links that have been blocked by Q3
+                qd_link_list_t *blinks = qd_session_q3_blocked_links(qd_ssn);
+                qd_link_t *blink = DEQ_HEAD(*blinks);
+                while (blink) {
+                    qd_link_q3_unblock(blink);  // removes from blinks list!
+                    qdr_link_t *qdr_link = (qdr_link_t *) qd_link_get_context(blink);
+                    if (qdr_link) {
+                        pn_link_t *pn_link = qd_link_pn(blink);
+                        // signalling flow to the core causes the link to be activated
+                        qdr_link_flow(router->router_core, qdr_link, pn_link_remote_credit(pn_link), pn_link_get_drain(pn_link));
+                        if (blink == link)
+                            link_flowed = true;  // original link flowed - do not flow again
+                    }
+                    blink = DEQ_HEAD(*blinks);
+                }
+            }
+        }
+    }
 
-    qdr_link_flow(router->router_core, rlink, pn_link_remote_credit(pnlink), pn_link_get_drain(pnlink));
+    if (!link_flowed) {
 
 Review comment:
   Will do.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org