You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/11/14 22:41:09 UTC
svn commit: r1542092 - in /qpid/dispatch/trunk: CMakeLists.txt
include/qpid/dispatch/container.h src/agent.c src/conditionals.h.in
src/container.c src/router_node.c
Author: tross
Date: Thu Nov 14 21:41:08 2013
New Revision: 1542092
URL: http://svn.apache.org/r1542092
Log:
QPID-5343 - Fixed the drain protocol on senders.
- Added a check for absent correlation IDson agent requests.
Added:
qpid/dispatch/trunk/src/conditionals.h.in
Modified:
qpid/dispatch/trunk/CMakeLists.txt
qpid/dispatch/trunk/include/qpid/dispatch/container.h
qpid/dispatch/trunk/src/agent.c
qpid/dispatch/trunk/src/container.c
qpid/dispatch/trunk/src/router_node.c
Modified: qpid/dispatch/trunk/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/CMakeLists.txt?rev=1542092&r1=1542091&r2=1542092&view=diff
==============================================================================
--- qpid/dispatch/trunk/CMakeLists.txt (original)
+++ qpid/dispatch/trunk/CMakeLists.txt Thu Nov 14 21:41:08 2013
@@ -30,6 +30,8 @@ include(FindPythonLibs)
enable_testing()
include (CTest)
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/conditionals.h.in ${CMAKE_CURRENT_BINARY_DIR}/conditionals.h)
+
if (NOT PYTHONLIBS_FOUND)
message(FATAL_ERROR "Python Development Libraries are needed.")
endif (NOT PYTHONLIBS_FOUND)
@@ -73,6 +75,7 @@ find_path(proton_include proton/driver.h
include_directories(
${CMAKE_CURRENT_SOURCE_DIR}/include
${CMAKE_CURRENT_SOURCE_DIR}/src
+ ${CMAKE_CURRENT_BINARY_DIR}
${proton_include}
${PYTHON_INCLUDE_PATH}
)
@@ -80,6 +83,11 @@ include_directories(
set(CMAKE_C_FLAGS "-pthread -Wall -Werror -std=gnu99")
set(CATCH_UNDEFINED "-Wl,--no-undefined")
+set (CMAKE_REQUIRED_FLAGS "-lqpid-proton")
+set (CMAKE_REQUIRED_INCLUDES ${proton_include})
+set (CMAKE_REQUIRED_LIBRARIES ${proton_lib})
+check_function_exists(pn_link_get_drain HAVE_LINK_GET_DRAIN)
+
##
## Build the Multi-Threaded Server Library
##
Modified: qpid/dispatch/trunk/include/qpid/dispatch/container.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/container.h?rev=1542092&r1=1542091&r2=1542092&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/container.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/container.h Thu Nov 14 21:41:08 2013
@@ -174,6 +174,7 @@ pn_terminus_t *dx_link_remote_source(dx_
pn_terminus_t *dx_link_remote_target(dx_link_t *link);
void dx_link_activate(dx_link_t *link);
void dx_link_close(dx_link_t *link);
+bool dx_link_drain_changed(dx_link_t *link, bool *mode);
/**
* Important: dx_delivery must never be called twice in a row without an intervening pn_link_advance.
Modified: qpid/dispatch/trunk/src/agent.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/agent.c?rev=1542092&r1=1542091&r2=1542092&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/agent.c (original)
+++ qpid/dispatch/trunk/src/agent.c Thu Nov 14 21:41:08 2013
@@ -88,7 +88,8 @@ static dx_composed_field_t *dx_agent_set
dx_compose_insert_string_iterator(field, reply_to); // to
dx_compose_insert_null(field); // subject
dx_compose_insert_null(field); // reply-to
- dx_compose_insert_typed_iterator(field, cid); // correlation-id
+ if (cid)
+ dx_compose_insert_typed_iterator(field, cid); // correlation-id
dx_compose_end_list(field);
//
Added: qpid/dispatch/trunk/src/conditionals.h.in
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/conditionals.h.in?rev=1542092&view=auto
==============================================================================
--- qpid/dispatch/trunk/src/conditionals.h.in (added)
+++ qpid/dispatch/trunk/src/conditionals.h.in Thu Nov 14 21:41:08 2013
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#cmakedefine HAVE_LINK_GET_DRAIN "${HAVE_LINK_GET_DRAIN}"
+
Modified: qpid/dispatch/trunk/src/container.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/container.c?rev=1542092&r1=1542091&r2=1542092&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/container.c (original)
+++ qpid/dispatch/trunk/src/container.c Thu Nov 14 21:41:08 2013
@@ -31,6 +31,14 @@
#include <qpid/dispatch/iterator.h>
#include <qpid/dispatch/log.h>
#include <qpid/dispatch/agent.h>
+#include "conditionals.h"
+
+#ifndef HAVE_LINK_GET_DRAIN
+static bool pn_link_get_drain(pn_link_t *link)
+{
+ return false;
+}
+#endif
static char *module="CONTAINER";
@@ -52,6 +60,7 @@ struct dx_link_t {
pn_link_t *pn_link;
void *context;
dx_node_t *node;
+ bool drain_mode;
};
ALLOC_DECLARE(dx_link_t);
@@ -130,9 +139,10 @@ static void setup_outgoing_link(dx_conta
return;
}
- link->pn_link = pn_link;
- link->context = 0;
- link->node = node;
+ link->pn_link = pn_link;
+ link->context = 0;
+ link->node = node;
+ link->drain_mode = pn_link_get_drain(pn_link);
pn_link_set_context(pn_link, link);
node->ntype->outgoing_handler(node->context, link);
@@ -171,9 +181,10 @@ static void setup_incoming_link(dx_conta
return;
}
- link->pn_link = pn_link;
- link->context = 0;
- link->node = node;
+ link->pn_link = pn_link;
+ link->context = 0;
+ link->node = node;
+ link->drain_mode = pn_link_get_drain(pn_link);
pn_link_set_context(pn_link, link);
node->ntype->incoming_handler(node->context, link);
@@ -628,8 +639,10 @@ dx_link_t *dx_link(dx_node_t *node, dx_c
link->pn_link = pn_sender(sess, name);
else
link->pn_link = pn_receiver(sess, name);
- link->context = node->context;
- link->node = node;
+
+ link->context = node->context;
+ link->node = node;
+ link->drain_mode = pn_link_get_drain(link->pn_link);
pn_link_set_context(link->pn_link, link);
@@ -759,6 +772,18 @@ void dx_link_close(dx_link_t *link)
}
+bool dx_link_drain_changed(dx_link_t *link, bool *mode)
+{
+ bool pn_mode = pn_link_get_drain(link->pn_link);
+ bool changed = pn_mode != link->drain_mode;
+
+ *mode = pn_mode;
+ if (changed)
+ link->drain_mode = pn_mode;
+ return changed;
+}
+
+
dx_delivery_t *dx_delivery(dx_link_t *link, pn_delivery_tag_t tag)
{
pn_link_t *pnl = dx_link_pn(link);
Modified: qpid/dispatch/trunk/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1542092&r1=1542091&r2=1542092&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Thu Nov 14 21:41:08 2013
@@ -258,6 +258,8 @@ static int router_writable_link_handler(
dx_routed_event_t *re;
size_t offer;
int event_count = 0;
+ bool drain_mode;
+ bool drain_changed = dx_link_drain_changed(link, &drain_mode);
DEQ_INIT(to_send);
DEQ_INIT(events);
@@ -357,7 +359,20 @@ static int router_writable_link_handler(
//
// Set the offer to the number of messages remaining to be sent.
//
- pn_link_offered(pn_link, offer);
+ if (offer > 0)
+ pn_link_offered(pn_link, offer);
+ else {
+ pn_link_drained(pn_link);
+
+ //
+ // If this link is in drain mode and it wasn't last time we came through here, we need to
+ // count this operation as a work event. This will allow the container to process the
+ // connector and send out the flow(drain=true) response to the receiver.
+ //
+ if (drain_changed && drain_mode)
+ event_count++;
+ }
+
return event_count;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org