You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/11/14 22:41:09 UTC

svn commit: r1542092 - in /qpid/dispatch/trunk: CMakeLists.txt include/qpid/dispatch/container.h src/agent.c src/conditionals.h.in src/container.c src/router_node.c

Author: tross
Date: Thu Nov 14 21:41:08 2013
New Revision: 1542092

URL: http://svn.apache.org/r1542092
Log:
QPID-5343 - Fixed the drain protocol on senders.
          - Added a check for absent correlation IDs on agent requests.

Added:
    qpid/dispatch/trunk/src/conditionals.h.in
Modified:
    qpid/dispatch/trunk/CMakeLists.txt
    qpid/dispatch/trunk/include/qpid/dispatch/container.h
    qpid/dispatch/trunk/src/agent.c
    qpid/dispatch/trunk/src/container.c
    qpid/dispatch/trunk/src/router_node.c

Modified: qpid/dispatch/trunk/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/CMakeLists.txt?rev=1542092&r1=1542091&r2=1542092&view=diff
==============================================================================
--- qpid/dispatch/trunk/CMakeLists.txt (original)
+++ qpid/dispatch/trunk/CMakeLists.txt Thu Nov 14 21:41:08 2013
@@ -30,6 +30,8 @@ include(FindPythonLibs)
 enable_testing()
 include (CTest)
 
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/conditionals.h.in ${CMAKE_CURRENT_BINARY_DIR}/conditionals.h)
+
 if (NOT PYTHONLIBS_FOUND)
      message(FATAL_ERROR "Python Development Libraries are needed.")
 endif (NOT PYTHONLIBS_FOUND)
@@ -73,6 +75,7 @@ find_path(proton_include proton/driver.h
 include_directories(
     ${CMAKE_CURRENT_SOURCE_DIR}/include
     ${CMAKE_CURRENT_SOURCE_DIR}/src
+    ${CMAKE_CURRENT_BINARY_DIR}
     ${proton_include}
     ${PYTHON_INCLUDE_PATH}
     )
@@ -80,6 +83,11 @@ include_directories(
 set(CMAKE_C_FLAGS "-pthread -Wall -Werror -std=gnu99")
 set(CATCH_UNDEFINED "-Wl,--no-undefined")
 
+set (CMAKE_REQUIRED_FLAGS "-lqpid-proton")
+set (CMAKE_REQUIRED_INCLUDES ${proton_include})
+set (CMAKE_REQUIRED_LIBRARIES ${proton_lib})
+check_function_exists(pn_link_get_drain HAVE_LINK_GET_DRAIN)
+
 ##
 ## Build the Multi-Threaded Server Library
 ##

Modified: qpid/dispatch/trunk/include/qpid/dispatch/container.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/container.h?rev=1542092&r1=1542091&r2=1542092&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/container.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/container.h Thu Nov 14 21:41:08 2013
@@ -174,6 +174,7 @@ pn_terminus_t *dx_link_remote_source(dx_
 pn_terminus_t *dx_link_remote_target(dx_link_t *link);
 void dx_link_activate(dx_link_t *link);
 void dx_link_close(dx_link_t *link);
+bool dx_link_drain_changed(dx_link_t *link, bool *mode);
 
 /**
  * Important: dx_delivery must never be called twice in a row without an intervening pn_link_advance.

Modified: qpid/dispatch/trunk/src/agent.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/agent.c?rev=1542092&r1=1542091&r2=1542092&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/agent.c (original)
+++ qpid/dispatch/trunk/src/agent.c Thu Nov 14 21:41:08 2013
@@ -88,7 +88,8 @@ static dx_composed_field_t *dx_agent_set
     dx_compose_insert_string_iterator(field, reply_to);  // to
     dx_compose_insert_null(field);                       // subject
     dx_compose_insert_null(field);                       // reply-to
-    dx_compose_insert_typed_iterator(field, cid);        // correlation-id
+    if (cid)
+        dx_compose_insert_typed_iterator(field, cid);    // correlation-id
     dx_compose_end_list(field);
 
     //

Added: qpid/dispatch/trunk/src/conditionals.h.in
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/conditionals.h.in?rev=1542092&view=auto
==============================================================================
--- qpid/dispatch/trunk/src/conditionals.h.in (added)
+++ qpid/dispatch/trunk/src/conditionals.h.in Thu Nov 14 21:41:08 2013
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#cmakedefine HAVE_LINK_GET_DRAIN "${HAVE_LINK_GET_DRAIN}"
+

Modified: qpid/dispatch/trunk/src/container.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/container.c?rev=1542092&r1=1542091&r2=1542092&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/container.c (original)
+++ qpid/dispatch/trunk/src/container.c Thu Nov 14 21:41:08 2013
@@ -31,6 +31,14 @@
 #include <qpid/dispatch/iterator.h>
 #include <qpid/dispatch/log.h>
 #include <qpid/dispatch/agent.h>
+#include "conditionals.h"
+
+#ifndef HAVE_LINK_GET_DRAIN
+static bool pn_link_get_drain(pn_link_t *link)
+{
+    return false;
+}
+#endif
 
 static char *module="CONTAINER";
 
@@ -52,6 +60,7 @@ struct dx_link_t {
     pn_link_t *pn_link;
     void      *context;
     dx_node_t *node;
+    bool       drain_mode;
 };
 
 ALLOC_DECLARE(dx_link_t);
@@ -130,9 +139,10 @@ static void setup_outgoing_link(dx_conta
         return;
     }
 
-    link->pn_link = pn_link;
-    link->context = 0;
-    link->node    = node;
+    link->pn_link    = pn_link;
+    link->context    = 0;
+    link->node       = node;
+    link->drain_mode = pn_link_get_drain(pn_link);
 
     pn_link_set_context(pn_link, link);
     node->ntype->outgoing_handler(node->context, link);
@@ -171,9 +181,10 @@ static void setup_incoming_link(dx_conta
         return;
     }
 
-    link->pn_link = pn_link;
-    link->context = 0;
-    link->node    = node;
+    link->pn_link    = pn_link;
+    link->context    = 0;
+    link->node       = node;
+    link->drain_mode = pn_link_get_drain(pn_link);
 
     pn_link_set_context(pn_link, link);
     node->ntype->incoming_handler(node->context, link);
@@ -628,8 +639,10 @@ dx_link_t *dx_link(dx_node_t *node, dx_c
         link->pn_link = pn_sender(sess, name);
     else
         link->pn_link = pn_receiver(sess, name);
-    link->context = node->context;
-    link->node    = node;
+
+    link->context    = node->context;
+    link->node       = node;
+    link->drain_mode = pn_link_get_drain(link->pn_link);
 
     pn_link_set_context(link->pn_link, link);
 
@@ -759,6 +772,18 @@ void dx_link_close(dx_link_t *link)
 }
 
 
+bool dx_link_drain_changed(dx_link_t *link, bool *mode)
+{
+    bool pn_mode = pn_link_get_drain(link->pn_link);
+    bool changed = pn_mode != link->drain_mode;
+
+    *mode = pn_mode;
+    if (changed)
+        link->drain_mode = pn_mode;
+    return changed;
+}
+
+
 dx_delivery_t *dx_delivery(dx_link_t *link, pn_delivery_tag_t tag)
 {
     pn_link_t *pnl = dx_link_pn(link);

Modified: qpid/dispatch/trunk/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1542092&r1=1542091&r2=1542092&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Thu Nov 14 21:41:08 2013
@@ -258,6 +258,8 @@ static int router_writable_link_handler(
     dx_routed_event_t      *re;
     size_t                  offer;
     int                     event_count = 0;
+    bool                    drain_mode;
+    bool                    drain_changed = dx_link_drain_changed(link, &drain_mode);
 
     DEQ_INIT(to_send);
     DEQ_INIT(events);
@@ -357,7 +359,20 @@ static int router_writable_link_handler(
     //
     // Set the offer to the number of messages remaining to be sent.
     //
-    pn_link_offered(pn_link, offer);
+    if (offer > 0)
+        pn_link_offered(pn_link, offer);
+    else {
+        pn_link_drained(pn_link);
+
+        //
+        // If this link is in drain mode and it wasn't last time we came through here, we need to
+        // count this operation as a work event.  This will allow the container to process the
+        // connector and send out the flow(drain=true) response to the receiver.
+        //
+        if (drain_changed && drain_mode)
+            event_count++;
+    }
+
     return event_count;
 }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org