You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2019/04/17 16:45:13 UTC
[qpid-dispatch] branch master updated: DISPATCH-1311: move
qdr_delivery_t API to its own source files
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new 771c9c9 DISPATCH-1311: move qdr_delivery_t API to its own source files
771c9c9 is described below
commit 771c9c9641a6e3b8180c2628d1192dca1ba92339
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Tue Apr 2 15:39:58 2019 -0400
DISPATCH-1311: move qdr_delivery_t API to its own source files
This patch merely moves code and does some renaming of functions. No
changes to actual functionality have occurred - it is simply a
reorganization of code layout.
This closes #489
---
include/qpid/dispatch/router_core.h | 30 --
src/CMakeLists.txt | 1 +
src/router_core/connections.c | 9 +-
src/router_core/core_client_api.c | 1 +
src/router_core/core_link_endpoint.c | 1 +
src/router_core/delivery.c | 830 ++++++++++++++++++++++++++++++++++
src/router_core/delivery.h | 147 ++++++
src/router_core/exchange_bindings.c | 1 +
src/router_core/forwarder.c | 1 +
src/router_core/router_core.c | 2 +-
src/router_core/router_core_private.h | 78 ----
src/router_core/transfer.c | 805 +--------------------------------
src/router_node.c | 1 +
13 files changed, 994 insertions(+), 913 deletions(-)
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 1e48c20..e36cc38 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -660,8 +660,6 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg, bool settled,
const uint8_t *tag, int tag_length,
uint64_t disposition, pn_data_t* disposition_state);
-qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core, qdr_delivery_t *delivery);
-
int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit);
void qdr_link_flow(qdr_core_t *core, qdr_link_t *link, int credit, bool drain_mode);
@@ -697,34 +695,6 @@ void qdr_connection_handlers(qdr_core_t *core,
/**
******************************************************************************
- * Delivery functions
- ******************************************************************************
- */
-void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disp,
- bool settled, qdr_error_t *error, pn_data_t *ext_state, bool ref_given);
-
-void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context);
-void *qdr_delivery_get_context(qdr_delivery_t *delivery);
-qdr_link_t *qdr_delivery_link(const qdr_delivery_t *delivery);
-void qdr_delivery_incref(qdr_delivery_t *delivery, const char *label);
-void qdr_delivery_decref(qdr_core_t *core, qdr_delivery_t *delivery, const char *label);
-void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int *length);
-qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery);
-qdr_error_t *qdr_delivery_error(const qdr_delivery_t *delivery);
-bool qdr_delivery_presettled(const qdr_delivery_t *delivery);
-void qdr_delivery_write_extension_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv, bool update_disposition);
-bool qdr_delivery_send_complete(const qdr_delivery_t *delivery);
-bool qdr_delivery_tag_sent(const qdr_delivery_t *delivery);
-void qdr_delivery_set_tag_sent(const qdr_delivery_t *delivery, bool tag_sent);
-bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery);
-void qdr_delivery_set_disposition(qdr_delivery_t *delivery, uint64_t disposition);
-uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery);
-void qdr_delivery_set_aborted(const qdr_delivery_t *delivery, bool aborted);
-bool qdr_delivery_is_aborted(const qdr_delivery_t *delivery);
-void qdr_delivery_add_num_closed_receivers(qdr_delivery_t *delivery);
-
-/**
- ******************************************************************************
* Management functions
******************************************************************************
*/
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 07ae846..67983fb 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -77,6 +77,7 @@ set(qpid_dispatch_SOURCES
router_core/core_events.c
router_core/core_link_endpoint.c
router_core/core_client_api.c
+ router_core/delivery.c
router_core/error.c
router_core/exchange_bindings.c
router_core/forwarder.c
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 4956c7e..1fcff7c 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -26,6 +26,7 @@
#include <inttypes.h>
#include "router_core_private.h"
#include "core_link_endpoint.h"
+#include "delivery.h"
static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
@@ -705,7 +706,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
//
// Updates global and link level delivery counters like presettled_deliveries, accepted_deliveries, released_deliveries etc
//
- qdr_increment_delivery_counters_CT(core, ref->dlv);
+ qdr_delivery_increment_counters_CT(core, ref->dlv);
qd_nullify_safe_ptr(&ref->dlv->link_sp);
//
// Now our reference
@@ -746,7 +747,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
//
// Updates global and link level delivery counters like presettled_deliveries, accepted_deliveries, released_deliveries etc
//
- qdr_increment_delivery_counters_CT(core, dlv);
+ qdr_delivery_increment_counters_CT(core, dlv);
qd_nullify_safe_ptr(&dlv->link_sp);
//
@@ -790,7 +791,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
//
// Updates global and link level delivery counters like presettled_deliveries, accepted_deliveries, released_deliveries etc
//
- qdr_increment_delivery_counters_CT(core, dlv);
+ qdr_delivery_increment_counters_CT(core, dlv);
qd_nullify_safe_ptr(&dlv->link_sp);
//
@@ -822,7 +823,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
//
// Updates global and link level delivery counters like presettled_deliveries, accepted_deliveries, released_deliveries etc
//
- qdr_increment_delivery_counters_CT(core, dlv);
+ qdr_delivery_increment_counters_CT(core, dlv);
qd_nullify_safe_ptr(&dlv->link_sp);
// This decref is for the removing the delivery from the settled list
diff --git a/src/router_core/core_client_api.c b/src/router_core/core_client_api.c
index 5810207..4b18591 100644
--- a/src/router_core/core_client_api.c
+++ b/src/router_core/core_client_api.c
@@ -19,6 +19,7 @@
#include "core_client_api.h"
#include "core_link_endpoint.h"
+#include "delivery.h"
#include <inttypes.h>
#include <errno.h>
diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c
index 92260b3..351ee28 100644
--- a/src/router_core/core_link_endpoint.c
+++ b/src/router_core/core_link_endpoint.c
@@ -19,6 +19,7 @@
#include "core_link_endpoint.h"
#include "qpid/dispatch/alloc.h"
+#include "delivery.h"
#include <stdio.h>
struct qdrc_endpoint_t {
diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
new file mode 100644
index 0000000..afcefa6
--- /dev/null
+++ b/src/router_core/delivery.c
@@ -0,0 +1,830 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "delivery.h"
+#include <inttypes.h>
+
+ALLOC_DEFINE(qdr_delivery_t);
+
+
+static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static bool qdr_delivery_has_peer_CT(qdr_delivery_t *dlv);
+static pn_data_t *qdr_delivery_extension_state(qdr_delivery_t *delivery);
+static void qdr_delivery_free_extension_state(qdr_delivery_t *delivery);
+static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *delivery);
+
+
+void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context)
+{
+ delivery->context = context;
+}
+
+
+void *qdr_delivery_get_context(const qdr_delivery_t *delivery)
+{
+ return delivery->context;
+}
+
+qdr_link_t *qdr_delivery_link(const qdr_delivery_t *delivery)
+{
+ return delivery ? safe_deref_qdr_link_t(delivery->link_sp) : 0;
+}
+
+
+bool qdr_delivery_send_complete(const qdr_delivery_t *delivery)
+{
+ if (!delivery)
+ return false;
+ return qd_message_send_complete(delivery->msg);
+}
+
+
+bool qdr_delivery_tag_sent(const qdr_delivery_t *delivery)
+{
+ if (!delivery)
+ return false;
+ return qd_message_tag_sent(delivery->msg);
+}
+
+
+void qdr_delivery_set_tag_sent(const qdr_delivery_t *delivery, bool tag_sent)
+{
+ if (!delivery)
+ return;
+
+ qd_message_set_tag_sent(delivery->msg, tag_sent);
+}
+
+
+bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery)
+{
+ if (!delivery)
+ return false;
+ return qd_message_receive_complete(delivery->msg);
+}
+
+
+void qdr_delivery_set_disposition(qdr_delivery_t *delivery, uint64_t disposition)
+{
+ if (delivery)
+ delivery->disposition = disposition;
+}
+
+
+uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery)
+{
+ if (!delivery)
+ return 0;
+ return delivery->disposition;
+}
+
+
+void qdr_delivery_incref(qdr_delivery_t *delivery, const char *label)
+{
+ uint32_t rc = sys_atomic_inc(&delivery->ref_count);
+ assert(rc > 0 || !delivery->ref_counted);
+ delivery->ref_counted = true;
+ qdr_link_t *link = qdr_delivery_link(delivery);
+ if (link)
+ qd_log(link->core->log, QD_LOG_DEBUG,
+ "Delivery incref: dlv:%lx rc:%"PRIu32" %s", (long) delivery, rc + 1, label);
+}
+
+
+void qdr_delivery_set_aborted(const qdr_delivery_t *delivery, bool aborted)
+{
+ assert(delivery);
+ qd_message_set_aborted(delivery->msg, aborted);
+}
+
+bool qdr_delivery_is_aborted(const qdr_delivery_t *delivery)
+{
+ if (!delivery)
+ return false;
+ return qd_message_aborted(delivery->msg);
+}
+
+
+void qdr_delivery_decref(qdr_core_t *core, qdr_delivery_t *delivery, const char *label)
+{
+ uint32_t ref_count = sys_atomic_dec(&delivery->ref_count);
+ assert(ref_count > 0);
+ qd_log(core->log, QD_LOG_DEBUG, "Delivery decref: dlv:%lx rc:%"PRIu32" %s", (long) delivery, ref_count - 1, label);
+
+ if (ref_count == 1) {
+ //
+ // The delivery deletion must occur inside the core thread.
+ // Queue up an action to do the work.
+ //
+ qdr_action_t *action = qdr_action(qdr_delete_delivery_CT, "delete_delivery");
+ action->args.delivery.delivery = delivery;
+ action->label = label;
+ qdr_action_enqueue(core, action);
+ }
+}
+
+
+void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int *length)
+{
+ *tag = (const char*) delivery->tag;
+ *length = delivery->tag_length;
+}
+
+
+qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery)
+{
+ if (!delivery)
+ return 0;
+ return delivery->msg;
+}
+
+
+qdr_error_t *qdr_delivery_error(const qdr_delivery_t *delivery)
+{
+ return delivery->error;
+}
+
+
+bool qdr_delivery_presettled(const qdr_delivery_t *delivery)
+{
+ return delivery->presettled;
+}
+
+
+void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disposition,
+ bool settled, qdr_error_t *error, pn_data_t *ext_state, bool ref_given)
+{
+ qdr_action_t *action = qdr_action(qdr_update_delivery_CT, "update_delivery");
+ action->args.delivery.delivery = delivery;
+ action->args.delivery.disposition = disposition;
+ action->args.delivery.settled = settled;
+ action->args.delivery.error = error;
+
+ // handle delivery-state extensions e.g. declared, transactional-state
+ qdr_delivery_read_extension_state(delivery, disposition, ext_state, false);
+
+ //
+ // The delivery's ref_count must be incremented to protect its travels into the
+ // core thread. If the caller has given its reference to us, we can simply use
+ // the given ref rather than increment a new one.
+ //
+ if (!ref_given)
+ qdr_delivery_incref(delivery, "qdr_delivery_update_disposition - add to action list");
+
+ qdr_action_enqueue(core, action);
+}
+
+
+qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core,qdr_delivery_t *in_dlv)
+{
+ qdr_action_t *action = qdr_action(qdr_deliver_continue_CT, "deliver_continue");
+ action->args.connection.delivery = in_dlv;
+
+ qd_message_t *msg = qdr_delivery_message(in_dlv);
+ action->args.connection.more = !qd_message_receive_complete(msg);
+
+ // This incref is for the action reference
+ qdr_delivery_incref(in_dlv, "qdr_deliver_continue - add to action list");
+ qdr_action_enqueue(core, action);
+ return in_dlv;
+}
+
+
+void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *dlv)
+{
+ bool push = false;
+ bool moved = false;
+
+ if (dlv->presettled) {
+ //
+ // The delivery is presettled. We simply want to call CORE_delivery_update which in turn will
+ // restart stalled links if the q2_holdoff has been hit.
+ // For single frame presettled deliveries, calling CORE_delivery_update does not do anything.
+ //
+ push = true;
+ }
+ else {
+ push = dlv->disposition != PN_RELEASED;
+ dlv->disposition = PN_RELEASED;
+ dlv->settled = true;
+ moved = qdr_delivery_settled_CT(core, dlv);
+
+ }
+
+ if (push || moved)
+ qdr_delivery_push_CT(core, dlv);
+
+ //
+ // Remove the unsettled reference
+ //
+ if (moved)
+ qdr_delivery_decref_CT(core, dlv, "qdr_delivery_release_CT - remove from unsettled list");
+}
+
+
+void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *dlv)
+{
+ bool push = dlv->disposition != PN_MODIFIED;
+
+ dlv->disposition = PN_MODIFIED;
+ dlv->settled = true;
+ bool moved = qdr_delivery_settled_CT(core, dlv);
+
+ if (push || moved)
+ qdr_delivery_push_CT(core, dlv);
+
+ //
+ // Remove the unsettled reference
+ //
+ if (moved)
+ qdr_delivery_decref_CT(core, dlv, "qdr_delivery_failed_CT - remove from unsettled list");
+}
+
+
+bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv)
+{
+ //
+ // Remove a delivery from its unsettled list. Side effects include issuing
+ // replacement credit and visiting the link-quiescence algorithm
+ //
+ qdr_link_t *link = qdr_delivery_link(dlv);
+ qdr_connection_t *conn = link ? link->conn : 0;
+ bool moved = false;
+
+ if (!link || !conn)
+ return false;
+
+ //
+ // The lock needs to be acquired only for outgoing links
+ //
+ if (link->link_direction == QD_OUTGOING)
+ sys_mutex_lock(conn->work_lock);
+
+ if (dlv->where == QDR_DELIVERY_IN_UNSETTLED) {
+ DEQ_REMOVE(link->unsettled, dlv);
+ dlv->where = QDR_DELIVERY_NOWHERE;
+ moved = true;
+ }
+
+ if (link->link_direction == QD_OUTGOING)
+ sys_mutex_unlock(conn->work_lock);
+
+ if (dlv->tracking_addr) {
+ dlv->tracking_addr->outstanding_deliveries[dlv->tracking_addr_bit]--;
+ dlv->tracking_addr->tracked_deliveries--;
+
+ if (dlv->tracking_addr->tracked_deliveries == 0)
+ qdr_check_addr_CT(core, dlv->tracking_addr);
+
+ dlv->tracking_addr = 0;
+ }
+
+ //
+ // If this is an incoming link and it is not link-routed or inter-router, issue
+ // one replacement credit on the link. Note that credit on inter-router links is
+ // issued immediately even for unsettled deliveries.
+ //
+ if (moved && link->link_direction == QD_INCOMING &&
+ link->link_type != QD_LINK_ROUTER && !link->connected_link)
+ qdr_link_issue_credit_CT(core, link, 1, false);
+
+ return moved;
+}
+
+void qdr_delivery_increment_counters_CT(qdr_core_t *core, qdr_delivery_t *delivery)
+{
+ qdr_link_t *link = qdr_delivery_link(delivery);
+ if (link) {
+ bool do_rate = false;
+
+ if (delivery->presettled) {
+ do_rate = delivery->disposition != PN_RELEASED;
+ link->presettled_deliveries++;
+ if (link->link_direction == QD_INCOMING && link->link_type == QD_LINK_ENDPOINT)
+ core->presettled_deliveries++;
+ }
+ else if (delivery->disposition == PN_ACCEPTED) {
+ do_rate = true;
+ link->accepted_deliveries++;
+ if (link->link_direction == QD_INCOMING)
+ core->accepted_deliveries++;
+ }
+ else if (delivery->disposition == PN_REJECTED) {
+ do_rate = true;
+ link->rejected_deliveries++;
+ if (link->link_direction == QD_INCOMING)
+ core->rejected_deliveries++;
+ }
+ else if (delivery->disposition == PN_RELEASED && !delivery->presettled) {
+ link->released_deliveries++;
+ if (link->link_direction == QD_INCOMING)
+ core->released_deliveries++;
+ }
+ else if (delivery->disposition == PN_MODIFIED) {
+ link->modified_deliveries++;
+ if (link->link_direction == QD_INCOMING)
+ core->modified_deliveries++;
+ }
+
+ uint32_t delay = core->uptime_ticks - delivery->ingress_time;
+ if (delay > 10) {
+ link->deliveries_delayed_10sec++;
+ if (link->link_direction == QD_INCOMING)
+ core->deliveries_delayed_10sec++;
+ } else if (delay > 1) {
+ link->deliveries_delayed_1sec++;
+ if (link->link_direction == QD_INCOMING)
+ core->deliveries_delayed_1sec++;
+ }
+
+ if (qd_bitmask_valid_bit_value(delivery->ingress_index) && link->ingress_histogram)
+ link->ingress_histogram[delivery->ingress_index]++;
+
+ //
+ // Compute the settlement rate
+ //
+ if (do_rate) {
+ uint32_t delta_time = core->uptime_ticks - link->core_ticks;
+ if (delta_time > 0) {
+ if (delta_time > QDR_LINK_RATE_DEPTH)
+ delta_time = QDR_LINK_RATE_DEPTH;
+ for (uint8_t delta_slots = 0; delta_slots < delta_time; delta_slots++) {
+ link->rate_cursor = (link->rate_cursor + 1) % QDR_LINK_RATE_DEPTH;
+ link->settled_deliveries[link->rate_cursor] = 0;
+ }
+ link->core_ticks = core->uptime_ticks;
+ }
+ link->settled_deliveries[link->rate_cursor]++;
+ }
+ }
+}
+
+
+static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *delivery)
+{
+ assert(sys_atomic_get(&delivery->ref_count) == 0);
+
+ if (delivery->msg || delivery->to_addr) {
+ qdr_delivery_cleanup_t *cleanup = new_qdr_delivery_cleanup_t();
+
+ DEQ_ITEM_INIT(cleanup);
+ cleanup->msg = delivery->msg;
+ cleanup->iter = delivery->to_addr;
+
+ DEQ_INSERT_TAIL(core->delivery_cleanup_list, cleanup);
+ }
+
+ if (delivery->tracking_addr) {
+ delivery->tracking_addr->outstanding_deliveries[delivery->tracking_addr_bit]--;
+ delivery->tracking_addr->tracked_deliveries--;
+
+ if (delivery->tracking_addr->tracked_deliveries == 0)
+ qdr_check_addr_CT(core, delivery->tracking_addr);
+
+ delivery->tracking_addr = 0;
+ }
+
+ qdr_delivery_increment_counters_CT(core, delivery);
+
+ //
+ // Free all the peer qdr_delivery_ref_t references
+ //
+ qdr_delivery_ref_t *ref = DEQ_HEAD(delivery->peers);
+ while (ref) {
+ qdr_del_delivery_ref(&delivery->peers, ref);
+ ref = DEQ_HEAD(delivery->peers);
+ }
+
+ qd_bitmask_free(delivery->link_exclusion);
+ qdr_error_free(delivery->error);
+
+ free_qdr_delivery_t(delivery);
+
+}
+
+static bool qdr_delivery_has_peer_CT(qdr_delivery_t *dlv)
+{
+ return dlv->peer || DEQ_SIZE(dlv->peers) > 0;
+}
+
+void qdr_delivery_link_peers_CT(qdr_delivery_t *in_dlv, qdr_delivery_t *out_dlv)
+{
+ // If there is no delivery or a peer, we cannot link each other.
+ if (!in_dlv || !out_dlv)
+ return;
+
+ if (!qdr_delivery_has_peer_CT(in_dlv)) {
+ // This is the very first peer. Link them up.
+ assert(!out_dlv->peer);
+ in_dlv->peer = out_dlv;
+ }
+ else {
+ if (in_dlv->peer) {
+ // This is the first time we know that in_dlv is going to have more than one peer.
+ // There is already a peer in the in_dlv->peer pointer, move it into a list and zero it out.
+ qdr_add_delivery_ref_CT(&in_dlv->peers, in_dlv->peer);
+
+ // Zero out the peer pointer. Since there is more than one peer, this peer has been moved to the "peers" linked list.
+ // All peers will now reside in the peers linked list. No need to decref/incref here because you are transferring ownership.
+ in_dlv->peer = 0;
+ }
+
+ qdr_add_delivery_ref_CT(&in_dlv->peers, out_dlv);
+ }
+
+ out_dlv->peer = in_dlv;
+
+ qdr_delivery_incref(out_dlv, "qdr_delivery_link_peers_CT - linked to peer (out delivery)");
+ qdr_delivery_incref(in_dlv, "qdr_delivery_link_peers_CT - linked to peer (in delivery)");
+}
+
+
+void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_delivery_t *peer)
+{
+ // If there is no delivery or a peer, we cannot proceed.
+ if (!dlv || !peer)
+ return;
+
+ // first, drop dlv's reference to its peer
+ //
+ if (dlv->peer) {
+ //
+ // This is the easy case. One delivery has only one peer. we can simply
+ // zero them out and directly decref.
+ //
+ assert(dlv->peer == peer);
+ dlv->peer = 0;
+ } else {
+ //
+ // This is the not so easy case
+ //
+ // dlv has more than one peer, so we have to search for our target peer
+ // in the list of peers
+ //
+ qdr_delivery_ref_t *peer_ref = DEQ_HEAD(dlv->peers);
+ while (peer_ref && peer_ref->dlv != peer) {
+ peer_ref = DEQ_NEXT(peer_ref);
+ }
+ assert(peer_ref != 0);
+ qdr_del_delivery_ref(&dlv->peers, peer_ref);
+ }
+
+ // now drop the peer's reference to dlv
+ //
+ if (peer->peer) {
+ assert(peer->peer == dlv);
+ peer->peer = 0;
+ } else {
+ qdr_delivery_ref_t *peer_ref = DEQ_HEAD(peer->peers);
+ while (peer_ref && peer_ref->dlv != dlv) {
+ peer_ref = DEQ_NEXT(peer_ref);
+ }
+ assert(peer_ref != 0);
+ qdr_del_delivery_ref(&peer->peers, peer_ref);
+ }
+
+ qdr_delivery_decref_CT(core, dlv, "qdr_delivery_unlink_peers_CT - unlinked from peer (delivery)");
+ qdr_delivery_decref_CT(core, peer, "qdr_delivery_unlink_peers_CT - unlinked from delivery (peer)");
+}
+
+
+qdr_delivery_t *qdr_delivery_first_peer_CT(qdr_delivery_t *dlv)
+{
+ // What if there are no peers for this delivery?
+ if (!qdr_delivery_has_peer_CT(dlv))
+ return 0;
+
+ if (dlv->peer) {
+ // If there is a dlv->peer, it is the one and only peer.
+ return dlv->peer;
+ }
+ else {
+ // The delivery has more than one peer.
+ qdr_delivery_ref_t *peer_ref = DEQ_HEAD(dlv->peers);
+
+ // Save the next peer to dlv->next_peer_ref so we can use it when somebody calls qdr_delivery_next_peer_CT
+ dlv->next_peer_ref = DEQ_NEXT(peer_ref);
+
+ // Return the first peer.
+ return peer_ref->dlv;
+ }
+}
+
+qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t *dlv)
+{
+ if (dlv->peer) {
+ // There is no next_peer if there is only one peer. If there is a non-zero dlv->peer, it is the only peer
+ return 0;
+ }
+ else {
+ // There is more than one peer to this delivery.
+ qdr_delivery_ref_t *next_peer_ref = dlv->next_peer_ref;
+ if (next_peer_ref) {
+ // Save the next peer to dlv->next_peer_ref so we can use it when somebody calls qdr_delivery_next_peer_CT
+ dlv->next_peer_ref = DEQ_NEXT(dlv->next_peer_ref);
+ return next_peer_ref->dlv;
+ }
+ return 0;
+ }
+}
+
+
+void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *dlv, const char *label)
+{
+ uint32_t ref_count = sys_atomic_dec(&dlv->ref_count);
+ qd_log(core->log, QD_LOG_DEBUG, "Delivery decref_CT: dlv:%lx rc:%"PRIu32" %s", (long) dlv, ref_count - 1, label);
+ assert(ref_count > 0);
+
+ if (ref_count == 1)
+ qdr_delete_delivery_internal_CT(core, dlv);
+}
+
+
+static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+ qdr_delivery_t *dlv = action->args.delivery.delivery;
+ qdr_delivery_t *peer = qdr_delivery_first_peer_CT(dlv);
+ bool push = false;
+ bool peer_moved = false;
+ bool dlv_moved = false;
+ uint64_t disp = action->args.delivery.disposition;
+ bool settled = action->args.delivery.settled;
+ qdr_error_t *error = action->args.delivery.error;
+ bool error_unassigned = true;
+
+ qdr_link_t *dlv_link = qdr_delivery_link(dlv);
+ qdr_link_t *peer_link = qdr_delivery_link(peer);
+
+ //
+ // Logic:
+ //
+ // If disposition has changed and there is a peer link, set the disposition of the peer
+ // If settled, the delivery must be unlinked and freed.
+ // If settled and there is a peer, the peer shall be settled and unlinked. It shall not
+ // be freed until the connection-side thread settles the PN delivery.
+ //
+ if (disp != dlv->disposition) {
+ //
+ // Disposition has changed, propagate the change to the peer delivery.
+ //
+ dlv->disposition = disp;
+ if (peer) {
+ peer->disposition = disp;
+ peer->error = error;
+ push = true;
+ error_unassigned = false;
+ qdr_delivery_copy_extension_state(dlv, peer, false);
+ }
+ }
+
+ if (settled) {
+ if (peer) {
+ peer->settled = true;
+ if (peer_link) {
+ peer_moved = qdr_delivery_settled_CT(core, peer);
+ if (peer_moved)
+ push = true;
+ }
+ qdr_delivery_unlink_peers_CT(core, dlv, peer);
+ }
+
+ if (dlv_link)
+ dlv_moved = qdr_delivery_settled_CT(core, dlv);
+ }
+
+ //
+ // If the delivery's link has a core endpoint, notify the endpoint of the update
+ //
+ if (dlv_link && dlv_link->core_endpoint)
+ qdrc_endpoint_do_update_CT(core, dlv_link->core_endpoint, dlv, settled);
+
+ if (push)
+ qdr_delivery_push_CT(core, peer);
+
+ //
+ // Release the action reference, possibly freeing the delivery
+ //
+ qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - remove from action");
+
+ //
+ // Release the unsettled references if the deliveries were moved
+ //
+ if (dlv_moved)
+ qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - removed from unsettled (1)");
+ if (peer_moved)
+ qdr_delivery_decref_CT(core, peer, "qdr_update_delivery_CT - removed from unsettled (2)");
+ if (error_unassigned)
+ qdr_error_free(error);
+}
+
+
+static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+ if (!discard)
+ qdr_delete_delivery_internal_CT(core, action->args.delivery.delivery);
+}
+
+
+void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv)
+{
+ qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv);
+
+ while (peer) {
+ qdr_link_work_t *work = peer->link_work;
+ qdr_link_t *peer_link = qdr_delivery_link(peer);
+
+ //
+ // Determines if the peer connection can be activated.
+ // For a large message, the peer delivery's link_work MUST be at the head of the peer link's work list. This link work is only removed
+ // after the streaming message has been sent.
+ //
+ if (!!work && !!peer_link) {
+ sys_mutex_lock(peer_link->conn->work_lock);
+ if (work->processing || work == DEQ_HEAD(peer_link->work_list)) {
+ // Adding this work at priority 0.
+ qdr_add_link_ref(peer_link->conn->links_with_work, peer_link, QDR_LINK_LIST_CLASS_WORK);
+ sys_mutex_unlock(peer_link->conn->work_lock);
+
+ //
+ // Activate the outgoing connection for later processing.
+ //
+ qdr_connection_activate_CT(core, peer_link->conn);
+ }
+ else
+ sys_mutex_unlock(peer_link->conn->work_lock);
+ }
+
+ peer = qdr_delivery_next_peer_CT(in_dlv);
+ }
+}
+
+
+static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+ if (discard)
+ return;
+
+ qdr_delivery_t *in_dlv = action->args.connection.delivery;
+ bool more = action->args.connection.more;
+ qdr_link_t *link = qdr_delivery_link(in_dlv);
+
+ //
+ // If it is already in the undelivered list, don't try to deliver this again.
+ //
+ if (!!link && in_dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
+ qdr_deliver_continue_peers_CT(core, in_dlv);
+
+ qd_message_t *msg = qdr_delivery_message(in_dlv);
+
+ if (!more && !qd_message_is_discard(msg)) {
+ //
+ // The entire message has now been received. Check to see if there are in process subscriptions that need to
+ // receive this message. in process subscriptions, at this time, can deal only with full messages.
+ //
+ qdr_subscription_t *sub = DEQ_HEAD(in_dlv->subscriptions);
+ while (sub) {
+ DEQ_REMOVE_HEAD(in_dlv->subscriptions);
+ qdr_forward_on_message_CT(core, sub, link, in_dlv->msg);
+ sub = DEQ_HEAD(in_dlv->subscriptions);
+ }
+
+ // This is a multicast delivery or if this is a presettled multi-frame unicast delivery.
+ if (in_dlv->multicast || in_dlv->settled) {
+
+ //
+ // If a delivery is settled but did not go into one of the lists, that means that it is going nowhere.
+ // We dont want to deal with such deliveries.
+ //
+ if (in_dlv->settled && in_dlv->where == QDR_DELIVERY_NOWHERE) {
+ qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from action 1");
+ return;
+ }
+
+ assert(in_dlv->where == QDR_DELIVERY_IN_SETTLED);
+ //
+ // The router will settle on behalf of the receiver in the case of multicast and send out settled
+ // deliveries to the receivers.
+ //
+ in_dlv->disposition = PN_ACCEPTED;
+ qdr_delivery_push_CT(core, in_dlv);
+
+ //
+ // The in_dlv has one or more peers. These peers will have to be unlinked.
+ //
+ qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv);
+ qdr_delivery_t *next_peer = 0;
+ while (peer) {
+ next_peer = qdr_delivery_next_peer_CT(in_dlv);
+ qdr_delivery_unlink_peers_CT(core, in_dlv, peer);
+ peer = next_peer;
+ }
+
+ // Remove the delivery from the settled list and decref the in_dlv.
+ in_dlv->where = QDR_DELIVERY_NOWHERE;
+ DEQ_REMOVE(link->settled, in_dlv);
+ qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from settled list");
+ }
+ }
+ }
+
+ // This decref is for the action reference
+ qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from action 2");
+}
+
+
+void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv)
+{
+ qdr_link_t *link = qdr_delivery_link(dlv);
+ if (!link)
+ return;
+
+ bool activate = false;
+
+ sys_mutex_lock(link->conn->work_lock);
+ if (dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
+ qdr_delivery_incref(dlv, "qdr_delivery_push_CT - add to updated list");
+ qdr_add_delivery_ref_CT(&link->updated_deliveries, dlv);
+ // Adding this work at priority 0.
+ qdr_add_link_ref(link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+ activate = true;
+ }
+ sys_mutex_unlock(link->conn->work_lock);
+
+ //
+ // Activate the connection
+ //
+ if (activate)
+ qdr_connection_activate_CT(core, link->conn);
+}
+
+pn_data_t* qdr_delivery_extension_state(qdr_delivery_t *delivery)
+{
+ if (!delivery->extension_state) {
+ delivery->extension_state = pn_data(0);
+ }
+ pn_data_rewind(delivery->extension_state);
+ return delivery->extension_state;
+}
+
+void qdr_delivery_free_extension_state(qdr_delivery_t *delivery)
+{
+ if (delivery->extension_state) {
+ pn_data_free(delivery->extension_state);
+ delivery->extension_state = 0;
+ }
+}
+
+void qdr_delivery_write_extension_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv, bool update_disposition)
+{
+ if (dlv->disposition > PN_MODIFIED) {
+ pn_data_copy(pn_disposition_data(pn_delivery_local(pdlv)), qdr_delivery_extension_state(dlv));
+ if (update_disposition) pn_delivery_update(pdlv, dlv->disposition);
+ qdr_delivery_free_extension_state(dlv);
+ }
+}
+
+void qdr_delivery_export_transfer_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv)
+{
+ qdr_delivery_write_extension_state(dlv, pdlv, true);
+}
+
+void qdr_delivery_export_disposition_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv)
+{
+ qdr_delivery_write_extension_state(dlv, pdlv, false);
+}
+
+void qdr_delivery_copy_extension_state(qdr_delivery_t *src, qdr_delivery_t *dest, bool update_diposition)
+{
+ if (src->disposition > PN_MODIFIED) {
+ pn_data_copy(qdr_delivery_extension_state(dest), qdr_delivery_extension_state(src));
+ if (update_diposition) dest->disposition = src->disposition;
+ qdr_delivery_free_extension_state(src);
+ }
+}
+
+void qdr_delivery_read_extension_state(qdr_delivery_t *dlv, uint64_t disposition, pn_data_t* disposition_data, bool update_disposition)
+{
+ if (disposition > PN_MODIFIED) {
+ pn_data_rewind(disposition_data);
+ pn_data_copy(qdr_delivery_extension_state(dlv), disposition_data);
+ if (update_disposition) dlv->disposition = disposition;
+ }
+}
diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h
new file mode 100644
index 0000000..daeabb1
--- /dev/null
+++ b/src/router_core/delivery.h
@@ -0,0 +1,147 @@
+#ifndef __delivery_h__
+#define __delivery_h__ 1
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "router_core_private.h"
+
+
+typedef enum {
+ QDR_DELIVERY_NOWHERE = 0,
+ QDR_DELIVERY_IN_UNDELIVERED,
+ QDR_DELIVERY_IN_UNSETTLED,
+ QDR_DELIVERY_IN_SETTLED
+} qdr_delivery_where_t;
+
+
+struct qdr_delivery_t {
+ DEQ_LINKS(qdr_delivery_t);
+ void *context;
+ sys_atomic_t ref_count;
+ bool ref_counted; /// Used to protect against ref count going 1 -> 0 -> 1
+ qdr_link_t_sp link_sp; /// Safe pointer to the link
+ qdr_delivery_t *peer; /// Use this peer if the delivery has one and only one peer.
+ qdr_delivery_ref_t *next_peer_ref;
+ qd_message_t *msg;
+ qd_iterator_t *to_addr;
+ qd_iterator_t *origin;
+ uint64_t disposition;
+ uint32_t ingress_time;
+ pn_data_t *extension_state;
+ qdr_error_t *error;
+ bool settled;
+ bool presettled;
+ qdr_delivery_where_t where;
+ uint8_t tag[32];
+ int tag_length;
+ qd_bitmask_t *link_exclusion;
+ qdr_address_t *tracking_addr;
+ int tracking_addr_bit;
+ int ingress_index;
+ qdr_link_work_t *link_work; ///< Delivery work item for this delivery
+ qdr_subscription_list_t subscriptions;
+ qdr_delivery_ref_list_t peers; /// Use this list if there if the delivery has more than one peer.
+ bool multicast; /// True if this delivery is targeted for a multicast address.
+ bool via_edge; /// True if this delivery arrived via an edge-connection.
+};
+
+ALLOC_DECLARE(qdr_delivery_t);
+
+
+///////////////////////////////////////////////////////////////////////////////
+// Delivery API
+///////////////////////////////////////////////////////////////////////////////
+
+
+bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery);
+bool qdr_delivery_send_complete(const qdr_delivery_t *delivery);
+
+void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context);
+void *qdr_delivery_get_context(const qdr_delivery_t *delivery);
+
+void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int *length);
+bool qdr_delivery_tag_sent(const qdr_delivery_t *delivery);
+void qdr_delivery_set_tag_sent(const qdr_delivery_t *delivery, bool tag_sent);
+
+uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery);
+void qdr_delivery_set_disposition(qdr_delivery_t *delivery, uint64_t disposition);
+
+void qdr_delivery_set_aborted(const qdr_delivery_t *delivery, bool aborted);
+
+qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery);
+qdr_error_t *qdr_delivery_error(const qdr_delivery_t *delivery);
+qdr_link_t *qdr_delivery_link(const qdr_delivery_t *delivery);
+bool qdr_delivery_presettled(const qdr_delivery_t *delivery);
+
+void qdr_delivery_incref(qdr_delivery_t *delivery, const char *label);
+
+void qdr_delivery_read_extension_state(qdr_delivery_t *dlv, uint64_t disposition, pn_data_t* disposition_data, bool update_disposition);
+void qdr_delivery_write_extension_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv, bool update_disposition);
+void qdr_delivery_copy_extension_state(qdr_delivery_t *src, qdr_delivery_t *dest, bool update_diposition);
+
+
+//
+// I/O thread only functions
+//
+
+
+/* release dlv and possibly schedule its deletion on the core thread */
+void qdr_delivery_decref(qdr_core_t *core, qdr_delivery_t *delivery, const char *label);
+
+/* handles disposition/settlement changes from remote delivery and schedules Core thread */
+void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disp,
+ bool settled, qdr_error_t *error, pn_data_t *ext_state, bool ref_given);
+
+/* invoked when incoming message data arrives - schedule core thread */
+qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core, qdr_delivery_t *delivery);
+
+
+//
+// CORE thread only functions
+//
+
+
+/* update settlement and/or disposition and schedule I/O processing */
+void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery);
+void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *delivery);
+bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery);
+
+/* add dlv to links list of updated deliveries and schedule I/O thread processing */
+void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv);
+
+/* optimized decref for core thread */
+void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *delivery, const char *label);
+
+/* peer delivery list management*/
+void qdr_delivery_link_peers_CT(qdr_delivery_t *in_dlv, qdr_delivery_t *out_dlv);
+void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_delivery_t *peer);
+
+/* peer iterator - warning: not reentrant! */
+qdr_delivery_t *qdr_delivery_first_peer_CT(qdr_delivery_t *dlv);
+qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t *dlv);
+
+/* schedules all peer deliveries with work for I/O processing */
+void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv);
+
+/* update the links counters with respect to its delivery */
+void qdr_delivery_increment_counters_CT(qdr_core_t *core, qdr_delivery_t *delivery);
+
+
+#endif // __delivery_h__
diff --git a/src/router_core/exchange_bindings.c b/src/router_core/exchange_bindings.c
index 23a58d6..0f5908c 100644
--- a/src/router_core/exchange_bindings.c
+++ b/src/router_core/exchange_bindings.c
@@ -22,6 +22,7 @@
#include "router_core_private.h"
#include "forwarder.h"
#include "exchange_bindings.h"
+#include "delivery.h"
// next_hop_t
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index b7f2ad9..8e7a8bb 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -22,6 +22,7 @@
#include <stdio.h>
#include <strings.h>
#include "forwarder.h"
+#include "delivery.h"
typedef struct qdr_forward_deliver_info_t {
DEQ_LINKS(struct qdr_forward_deliver_info_t);
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 281b260..fcc7421 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -21,13 +21,13 @@
#include "route_control.h"
#include "exchange_bindings.h"
#include "core_events.h"
+#include "delivery.h"
#include <stdio.h>
#include <strings.h>
ALLOC_DEFINE(qdr_address_t);
ALLOC_DEFINE(qdr_address_config_t);
ALLOC_DEFINE(qdr_node_t);
-ALLOC_DEFINE(qdr_delivery_t);
ALLOC_DEFINE(qdr_delivery_ref_t);
ALLOC_DEFINE(qdr_link_t);
ALLOC_DEFINE(qdr_router_ref_t);
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index e4666a5..e7161e5 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -366,13 +366,6 @@ struct qdr_router_ref_t {
DEQ_DECLARE(qdr_router_ref_t, qdr_router_ref_list_t);
-typedef enum {
- QDR_DELIVERY_NOWHERE = 0,
- QDR_DELIVERY_IN_UNDELIVERED,
- QDR_DELIVERY_IN_UNSETTLED,
- QDR_DELIVERY_IN_SETTLED
-} qdr_delivery_where_t;
-
typedef struct qdr_delivery_ref_t {
DEQ_LINKS(struct qdr_delivery_ref_t);
qdr_delivery_t *dlv;
@@ -391,42 +384,8 @@ struct qdr_subscription_t {
DEQ_DECLARE(qdr_subscription_t, qdr_subscription_list_t);
-
-struct qdr_delivery_t {
- DEQ_LINKS(qdr_delivery_t);
- void *context;
- sys_atomic_t ref_count;
- bool ref_counted; /// Used to protect against ref count going 1 -> 0 -> 1
- qdr_link_t_sp link_sp; /// Safe pointer to the link
- qdr_delivery_t *peer; /// Use this peer if the delivery has one and only one peer.
- qdr_delivery_ref_t *next_peer_ref;
- qd_message_t *msg;
- qd_iterator_t *to_addr;
- qd_iterator_t *origin;
- uint64_t disposition;
- uint32_t ingress_time;
- pn_data_t *extension_state;
- qdr_error_t *error;
- bool settled;
- bool presettled;
- qdr_delivery_where_t where;
- uint8_t tag[32];
- int tag_length;
- qd_bitmask_t *link_exclusion;
- qdr_address_t *tracking_addr;
- int tracking_addr_bit;
- int ingress_index;
- qdr_link_work_t *link_work; ///< Delivery work item for this delivery
- qdr_subscription_list_t subscriptions;
- qdr_delivery_ref_list_t peers; /// Use this list if there if the delivery has more than one peer.
- bool multicast; /// True if this delivery is targeted for a multicast address.
- bool via_edge; /// True if this delivery arrived via an edge-connection.
-};
-
-ALLOC_DECLARE(qdr_delivery_t);
DEQ_DECLARE(qdr_delivery_t, qdr_delivery_list_t);
-
void qdr_add_delivery_ref_CT(qdr_delivery_ref_list_t *list, qdr_delivery_t *dlv);
void qdr_del_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_ref_t *ref);
@@ -927,45 +886,8 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr);
*/
bool qdr_address_is_mobile_CT(qdr_address_t *addr);
-void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv);
-void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery);
-void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *delivery);
-bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery);
-void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *delivery, const char *label);
void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_link_t *link, qd_message_t *msg);
void qdr_in_process_send_to_CT(qdr_core_t *core, qd_iterator_t *address, qd_message_t *msg, bool exclude_inprocess, bool control);
-/**
- * Links the in_dlv to the out_dlv and increments ref counts of both deliveries
- */
-void qdr_delivery_link_peers_CT(qdr_delivery_t *in_dlv, qdr_delivery_t *out_dlv);
-
-/**
- * Zeroes out peer references from both peers and decrefs ref counts.
- */
-void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_delivery_t *peer);
-
-/**
- *
- */
-void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv);
-
-/**
- * Returns the first peer of the delivery.
- * @see qdr_delivery_next_peer_CT
- */
-qdr_delivery_t *qdr_delivery_first_peer_CT(qdr_delivery_t *dlv);
-
-/**
- * Returns the next peer of the passed in delivery.
- */
-qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t *dlv);
-
-
-/**
- * Updates global and link level delivery counters like presettled_deliveries, accepted_deliveries, released_deliveries etc.
-*/
-void qdr_increment_delivery_counters_CT(qdr_core_t *core, qdr_delivery_t *delivery);
-
void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);
void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash, qd_address_treatment_t treatment);
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 3e6fe73..99b1db9 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -19,23 +19,19 @@
#include "router_core_private.h"
#include "exchange_bindings.h"
+#include "delivery.h"
#include <qpid/dispatch/amqp.h>
#include <stdio.h>
#include <inttypes.h>
-static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
//==================================================================================
// Internal Functions
//==================================================================================
-void qdr_delivery_read_extension_state(qdr_delivery_t *dlv, uint64_t disposition, pn_data_t* disposition_date, bool update_disposition);
-void qdr_delivery_copy_extension_state(qdr_delivery_t *src, qdr_delivery_t *dest, bool update_disposition);
+static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
//==================================================================================
@@ -130,21 +126,6 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *
}
-qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core, qdr_delivery_t *in_dlv)
-{
- qdr_action_t *action = qdr_action(qdr_deliver_continue_CT, "deliver_continue");
- action->args.connection.delivery = in_dlv;
-
- qd_message_t *msg = qdr_delivery_message(in_dlv);
- action->args.connection.more = !qd_message_receive_complete(msg);
-
- // This incref is for the action reference
- qdr_delivery_incref(in_dlv, "qdr_deliver_continue - add to action list");
- qdr_action_enqueue(core, action);
- return in_dlv;
-}
-
-
int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
{
qdr_connection_t *conn = link->conn;
@@ -249,6 +230,7 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
return num_deliveries_completed;
}
+
void qdr_link_flow(qdr_core_t *core, qdr_link_t *link, int credit, bool drain_mode)
{
qdr_action_t *action = qdr_action(qdr_link_flow_CT, "link_flow");
@@ -299,515 +281,10 @@ void qdr_send_to2(qdr_core_t *core, qd_message_t *msg, const char *addr, bool ex
}
-void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disposition,
- bool settled, qdr_error_t *error, pn_data_t *ext_state, bool ref_given)
-{
- qdr_action_t *action = qdr_action(qdr_update_delivery_CT, "update_delivery");
- action->args.delivery.delivery = delivery;
- action->args.delivery.disposition = disposition;
- action->args.delivery.settled = settled;
- action->args.delivery.error = error;
-
- // handle delivery-state extensions e.g. declared, transactional-state
- qdr_delivery_read_extension_state(delivery, disposition, ext_state, false);
-
- //
- // The delivery's ref_count must be incremented to protect its travels into the
- // core thread. If the caller has given its reference to us, we can simply use
- // the given ref rather than increment a new one.
- //
- if (!ref_given)
- qdr_delivery_incref(delivery, "qdr_delivery_update_disposition - add to action list");
-
- qdr_action_enqueue(core, action);
-}
-
-
-void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context)
-{
- delivery->context = context;
-}
-
-
-void *qdr_delivery_get_context(qdr_delivery_t *delivery)
-{
- return delivery->context;
-}
-
-
-qdr_link_t *qdr_delivery_link(const qdr_delivery_t *delivery)
-{
- return delivery ? safe_deref_qdr_link_t(delivery->link_sp) : 0;
-}
-
-
-bool qdr_delivery_send_complete(const qdr_delivery_t *delivery)
-{
- if (!delivery)
- return false;
- return qd_message_send_complete(delivery->msg);
-}
-
-bool qdr_delivery_tag_sent(const qdr_delivery_t *delivery)
-{
- if (!delivery)
- return false;
- return qd_message_tag_sent(delivery->msg);
-}
-
-void qdr_delivery_set_tag_sent(const qdr_delivery_t *delivery, bool tag_sent)
-{
- if (!delivery)
- return;
-
- qd_message_set_tag_sent(delivery->msg, tag_sent);
-}
-
-
-bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery)
-{
- if (!delivery)
- return false;
- return qd_message_receive_complete(delivery->msg);
-}
-
-void qdr_delivery_set_disposition(qdr_delivery_t *delivery, uint64_t disposition)
-{
- if (delivery)
- delivery->disposition = disposition;
-}
-
-
-uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery)
-{
- if (!delivery)
- return 0;
- return delivery->disposition;
-}
-
-
-void qdr_delivery_incref(qdr_delivery_t *delivery, const char *label)
-{
- uint32_t rc = sys_atomic_inc(&delivery->ref_count);
- assert(rc > 0 || !delivery->ref_counted);
- delivery->ref_counted = true;
- qdr_link_t *link = qdr_delivery_link(delivery);
- if (link)
- qd_log(link->core->log, QD_LOG_DEBUG,
- "Delivery incref: dlv:%lx rc:%"PRIu32" %s", (long) delivery, rc + 1, label);
-}
-
-void qdr_delivery_set_aborted(const qdr_delivery_t *delivery, bool aborted)
-{
- assert(delivery);
- qd_message_set_aborted(delivery->msg, aborted);
-}
-
-bool qdr_delivery_is_aborted(const qdr_delivery_t *delivery)
-{
- if (!delivery)
- return false;
- return qd_message_aborted(delivery->msg);
-}
-
-
-void qdr_delivery_decref(qdr_core_t *core, qdr_delivery_t *delivery, const char *label)
-{
- uint32_t ref_count = sys_atomic_dec(&delivery->ref_count);
- assert(ref_count > 0);
- qd_log(core->log, QD_LOG_DEBUG, "Delivery decref: dlv:%lx rc:%"PRIu32" %s", (long) delivery, ref_count - 1, label);
-
- if (ref_count == 1) {
- //
- // The delivery deletion must occur inside the core thread.
- // Queue up an action to do the work.
- //
- qdr_action_t *action = qdr_action(qdr_delete_delivery_CT, "delete_delivery");
- action->args.delivery.delivery = delivery;
- action->label = label;
- qdr_action_enqueue(core, action);
- }
-}
-
-
-void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int *length)
-{
- *tag = (const char*) delivery->tag;
- *length = delivery->tag_length;
-}
-
-
-qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery)
-{
- if (!delivery)
- return 0;
- return delivery->msg;
-}
-
-qdr_error_t *qdr_delivery_error(const qdr_delivery_t *delivery)
-{
- return delivery->error;
-}
-
-bool qdr_delivery_presettled(const qdr_delivery_t *delivery)
-{
- return delivery->presettled;
-}
-
-
//==================================================================================
// In-Thread Functions
//==================================================================================
-void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *dlv)
-{
- bool push = false;
- bool moved = false;
-
- if (dlv->presettled) {
- //
- // The delivery is presettled. We simply want to call CORE_delivery_update which in turn will
- // restart stalled links if the q2_holdoff has been hit.
- // For single frame presettled deliveries, calling CORE_delivery_update does not do anything.
- //
- push = true;
- }
- else {
- push = dlv->disposition != PN_RELEASED;
- dlv->disposition = PN_RELEASED;
- dlv->settled = true;
- moved = qdr_delivery_settled_CT(core, dlv);
-
- }
-
- if (push || moved)
- qdr_delivery_push_CT(core, dlv);
-
- //
- // Remove the unsettled reference
- //
- if (moved)
- qdr_delivery_decref_CT(core, dlv, "qdr_delivery_release_CT - remove from unsettled list");
-}
-
-
-void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *dlv)
-{
- bool push = dlv->disposition != PN_MODIFIED;
-
- dlv->disposition = PN_MODIFIED;
- dlv->settled = true;
- bool moved = qdr_delivery_settled_CT(core, dlv);
-
- if (push || moved)
- qdr_delivery_push_CT(core, dlv);
-
- //
- // Remove the unsettled reference
- //
- if (moved)
- qdr_delivery_decref_CT(core, dlv, "qdr_delivery_failed_CT - remove from unsettled list");
-}
-
-
-bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv)
-{
- //
- // Remove a delivery from its unsettled list. Side effects include issuing
- // replacement credit and visiting the link-quiescence algorithm
- //
- qdr_link_t *link = qdr_delivery_link(dlv);
- qdr_connection_t *conn = link ? link->conn : 0;
- bool moved = false;
-
- if (!link || !conn)
- return false;
-
- //
- // The lock needs to be acquired only for outgoing links
- //
- if (link->link_direction == QD_OUTGOING)
- sys_mutex_lock(conn->work_lock);
-
- if (dlv->where == QDR_DELIVERY_IN_UNSETTLED) {
- DEQ_REMOVE(link->unsettled, dlv);
- dlv->where = QDR_DELIVERY_NOWHERE;
- moved = true;
- }
-
- if (link->link_direction == QD_OUTGOING)
- sys_mutex_unlock(conn->work_lock);
-
- if (dlv->tracking_addr) {
- dlv->tracking_addr->outstanding_deliveries[dlv->tracking_addr_bit]--;
- dlv->tracking_addr->tracked_deliveries--;
-
- if (dlv->tracking_addr->tracked_deliveries == 0)
- qdr_check_addr_CT(core, dlv->tracking_addr);
-
- dlv->tracking_addr = 0;
- }
-
- //
- // If this is an incoming link and it is not link-routed or inter-router, issue
- // one replacement credit on the link. Note that credit on inter-router links is
- // issued immediately even for unsettled deliveries.
- //
- if (moved && link->link_direction == QD_INCOMING &&
- link->link_type != QD_LINK_ROUTER && !link->connected_link)
- qdr_link_issue_credit_CT(core, link, 1, false);
-
- return moved;
-}
-
-void qdr_increment_delivery_counters_CT(qdr_core_t *core, qdr_delivery_t *delivery)
-{
- qdr_link_t *link = qdr_delivery_link(delivery);
- if (link) {
- bool do_rate = false;
-
- if (delivery->presettled) {
- do_rate = delivery->disposition != PN_RELEASED;
- link->presettled_deliveries++;
- if (link->link_direction == QD_INCOMING && link->link_type == QD_LINK_ENDPOINT)
- core->presettled_deliveries++;
- }
- else if (delivery->disposition == PN_ACCEPTED) {
- do_rate = true;
- link->accepted_deliveries++;
- if (link->link_direction == QD_INCOMING)
- core->accepted_deliveries++;
- }
- else if (delivery->disposition == PN_REJECTED) {
- do_rate = true;
- link->rejected_deliveries++;
- if (link->link_direction == QD_INCOMING)
- core->rejected_deliveries++;
- }
- else if (delivery->disposition == PN_RELEASED && !delivery->presettled) {
- link->released_deliveries++;
- if (link->link_direction == QD_INCOMING)
- core->released_deliveries++;
- }
- else if (delivery->disposition == PN_MODIFIED) {
- link->modified_deliveries++;
- if (link->link_direction == QD_INCOMING)
- core->modified_deliveries++;
- }
-
- uint32_t delay = core->uptime_ticks - delivery->ingress_time;
- if (delay > 10) {
- link->deliveries_delayed_10sec++;
- if (link->link_direction == QD_INCOMING)
- core->deliveries_delayed_10sec++;
- } else if (delay > 1) {
- link->deliveries_delayed_1sec++;
- if (link->link_direction == QD_INCOMING)
- core->deliveries_delayed_1sec++;
- }
-
- if (qd_bitmask_valid_bit_value(delivery->ingress_index) && link->ingress_histogram)
- link->ingress_histogram[delivery->ingress_index]++;
-
- //
- // Compute the settlement rate
- //
- if (do_rate) {
- uint32_t delta_time = core->uptime_ticks - link->core_ticks;
- if (delta_time > 0) {
- if (delta_time > QDR_LINK_RATE_DEPTH)
- delta_time = QDR_LINK_RATE_DEPTH;
- for (uint8_t delta_slots = 0; delta_slots < delta_time; delta_slots++) {
- link->rate_cursor = (link->rate_cursor + 1) % QDR_LINK_RATE_DEPTH;
- link->settled_deliveries[link->rate_cursor] = 0;
- }
- link->core_ticks = core->uptime_ticks;
- }
- link->settled_deliveries[link->rate_cursor]++;
- }
- }
-}
-
-
-static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *delivery)
-{
- assert(sys_atomic_get(&delivery->ref_count) == 0);
-
- if (delivery->msg || delivery->to_addr) {
- qdr_delivery_cleanup_t *cleanup = new_qdr_delivery_cleanup_t();
-
- DEQ_ITEM_INIT(cleanup);
- cleanup->msg = delivery->msg;
- cleanup->iter = delivery->to_addr;
-
- DEQ_INSERT_TAIL(core->delivery_cleanup_list, cleanup);
- }
-
- if (delivery->tracking_addr) {
- delivery->tracking_addr->outstanding_deliveries[delivery->tracking_addr_bit]--;
- delivery->tracking_addr->tracked_deliveries--;
-
- if (delivery->tracking_addr->tracked_deliveries == 0)
- qdr_check_addr_CT(core, delivery->tracking_addr);
-
- delivery->tracking_addr = 0;
- }
-
- qdr_increment_delivery_counters_CT(core, delivery);
-
- //
- // Free all the peer qdr_delivery_ref_t references
- //
- qdr_delivery_ref_t *ref = DEQ_HEAD(delivery->peers);
- while (ref) {
- qdr_del_delivery_ref(&delivery->peers, ref);
- ref = DEQ_HEAD(delivery->peers);
- }
-
- qd_bitmask_free(delivery->link_exclusion);
- qdr_error_free(delivery->error);
-
- free_qdr_delivery_t(delivery);
-
-}
-
-static bool qdr_delivery_has_peer_CT(qdr_delivery_t *dlv)
-{
- return dlv->peer || DEQ_SIZE(dlv->peers) > 0;
-}
-
-void qdr_delivery_link_peers_CT(qdr_delivery_t *in_dlv, qdr_delivery_t *out_dlv)
-{
- // If there is no delivery or a peer, we cannot link each other.
- if (!in_dlv || !out_dlv)
- return;
-
- if (!qdr_delivery_has_peer_CT(in_dlv)) {
- // This is the very first peer. Link them up.
- assert(!out_dlv->peer);
- in_dlv->peer = out_dlv;
- }
- else {
- if (in_dlv->peer) {
- // This is the first time we know that in_dlv is going to have more than one peer.
- // There is already a peer in the in_dlv->peer pointer, move it into a list and zero it out.
- qdr_add_delivery_ref_CT(&in_dlv->peers, in_dlv->peer);
-
- // Zero out the peer pointer. Since there is more than one peer, this peer has been moved to the "peers" linked list.
- // All peers will now reside in the peers linked list. No need to decref/incref here because you are transferring ownership.
- in_dlv->peer = 0;
- }
-
- qdr_add_delivery_ref_CT(&in_dlv->peers, out_dlv);
- }
-
- out_dlv->peer = in_dlv;
-
- qdr_delivery_incref(out_dlv, "qdr_delivery_link_peers_CT - linked to peer (out delivery)");
- qdr_delivery_incref(in_dlv, "qdr_delivery_link_peers_CT - linked to peer (in delivery)");
-}
-
-
-void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_delivery_t *peer)
-{
- // If there is no delivery or a peer, we cannot proceed.
- if (!dlv || !peer)
- return;
-
- // first, drop dlv's reference to its peer
- //
- if (dlv->peer) {
- //
- // This is the easy case. One delivery has only one peer. we can simply
- // zero them out and directly decref.
- //
- assert(dlv->peer == peer);
- dlv->peer = 0;
- } else {
- //
- // This is the not so easy case
- //
- // dlv has more than one peer, so we have to search for our target peer
- // in the list of peers
- //
- qdr_delivery_ref_t *peer_ref = DEQ_HEAD(dlv->peers);
- while (peer_ref && peer_ref->dlv != peer) {
- peer_ref = DEQ_NEXT(peer_ref);
- }
- assert(peer_ref != 0);
- qdr_del_delivery_ref(&dlv->peers, peer_ref);
- }
-
- // now drop the peer's reference to dlv
- //
- if (peer->peer) {
- assert(peer->peer == dlv);
- peer->peer = 0;
- } else {
- qdr_delivery_ref_t *peer_ref = DEQ_HEAD(peer->peers);
- while (peer_ref && peer_ref->dlv != dlv) {
- peer_ref = DEQ_NEXT(peer_ref);
- }
- assert(peer_ref != 0);
- qdr_del_delivery_ref(&peer->peers, peer_ref);
- }
-
- qdr_delivery_decref_CT(core, dlv, "qdr_delivery_unlink_peers_CT - unlinked from peer (delivery)");
- qdr_delivery_decref_CT(core, peer, "qdr_delivery_unlink_peers_CT - unlinked from delivery (peer)");
-}
-
-
-qdr_delivery_t *qdr_delivery_first_peer_CT(qdr_delivery_t *dlv)
-{
- // What if there are no peers for this delivery?
- if (!qdr_delivery_has_peer_CT(dlv))
- return 0;
-
- if (dlv->peer) {
- // If there is a dlv->peer, it is the one and only peer.
- return dlv->peer;
- }
- else {
- // The delivery has more than one peer.
- qdr_delivery_ref_t *peer_ref = DEQ_HEAD(dlv->peers);
-
- // Save the next peer to dlv->next_peer_ref so we can use it when somebody calls qdr_delivery_next_peer_CT
- dlv->next_peer_ref = DEQ_NEXT(peer_ref);
-
- // Return the first peer.
- return peer_ref->dlv;
- }
-}
-
-qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t *dlv)
-{
- if (dlv->peer) {
- // There is no next_peer if there is only one peer. If there is a non-zero dlv->peer, it is the only peer
- return 0;
- }
- else {
- // There is more than one peer to this delivery.
- qdr_delivery_ref_t *next_peer_ref = dlv->next_peer_ref;
- if (next_peer_ref) {
- // Save the next peer to dlv->next_peer_ref so we can use it when somebody calls qdr_delivery_next_peer_CT
- dlv->next_peer_ref = DEQ_NEXT(dlv->next_peer_ref);
- return next_peer_ref->dlv;
- }
- return 0;
- }
-}
-
-
-void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *dlv, const char *label)
-{
- uint32_t ref_count = sys_atomic_dec(&dlv->ref_count);
- qd_log(core->log, QD_LOG_DEBUG, "Delivery decref_CT: dlv:%lx rc:%"PRIu32" %s", (long) dlv, ref_count - 1, label);
- assert(ref_count > 0);
-
- if (ref_count == 1)
- qdr_delete_delivery_internal_CT(core, dlv);
-}
-
static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
@@ -1200,198 +677,6 @@ void qdr_in_process_send_to_CT(qdr_core_t *core, qd_iterator_t *address, qd_mess
}
-static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
-{
- qdr_delivery_t *dlv = action->args.delivery.delivery;
- qdr_delivery_t *peer = qdr_delivery_first_peer_CT(dlv);
- bool push = false;
- bool peer_moved = false;
- bool dlv_moved = false;
- uint64_t disp = action->args.delivery.disposition;
- bool settled = action->args.delivery.settled;
- qdr_error_t *error = action->args.delivery.error;
- bool error_unassigned = true;
-
- qdr_link_t *dlv_link = qdr_delivery_link(dlv);
- qdr_link_t *peer_link = qdr_delivery_link(peer);
-
- //
- // Logic:
- //
- // If disposition has changed and there is a peer link, set the disposition of the peer
- // If settled, the delivery must be unlinked and freed.
- // If settled and there is a peer, the peer shall be settled and unlinked. It shall not
- // be freed until the connection-side thread settles the PN delivery.
- //
- if (disp != dlv->disposition) {
- //
- // Disposition has changed, propagate the change to the peer delivery.
- //
- dlv->disposition = disp;
- if (peer) {
- peer->disposition = disp;
- peer->error = error;
- push = true;
- error_unassigned = false;
- qdr_delivery_copy_extension_state(dlv, peer, false);
- }
- }
-
- if (settled) {
- if (peer) {
- peer->settled = true;
- if (peer_link) {
- peer_moved = qdr_delivery_settled_CT(core, peer);
- if (peer_moved)
- push = true;
- }
- qdr_delivery_unlink_peers_CT(core, dlv, peer);
- }
-
- if (dlv_link)
- dlv_moved = qdr_delivery_settled_CT(core, dlv);
- }
-
- //
- // If the delivery's link has a core endpoint, notify the endpoint of the update
- //
- if (dlv_link && dlv_link->core_endpoint)
- qdrc_endpoint_do_update_CT(core, dlv_link->core_endpoint, dlv, settled);
-
- if (push)
- qdr_delivery_push_CT(core, peer);
-
- //
- // Release the action reference, possibly freeing the delivery
- //
- qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - remove from action");
-
- //
- // Release the unsettled references if the deliveries were moved
- //
- if (dlv_moved)
- qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - removed from unsettled (1)");
- if (peer_moved)
- qdr_delivery_decref_CT(core, peer, "qdr_update_delivery_CT - removed from unsettled (2)");
- if (error_unassigned)
- qdr_error_free(error);
-}
-
-
-static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
-{
- if (!discard)
- qdr_delete_delivery_internal_CT(core, action->args.delivery.delivery);
-}
-
-
-void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv)
-{
- qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv);
-
- while (peer) {
- qdr_link_work_t *work = peer->link_work;
- qdr_link_t *peer_link = qdr_delivery_link(peer);
-
- //
- // Determines if the peer connection can be activated.
- // For a large message, the peer delivery's link_work MUST be at the head of the peer link's work list. This link work is only removed
- // after the streaming message has been sent.
- //
- if (!!work && !!peer_link) {
- sys_mutex_lock(peer_link->conn->work_lock);
- if (work->processing || work == DEQ_HEAD(peer_link->work_list)) {
- // Adding this work at priority 0.
- qdr_add_link_ref(peer_link->conn->links_with_work, peer_link, QDR_LINK_LIST_CLASS_WORK);
- sys_mutex_unlock(peer_link->conn->work_lock);
-
- //
- // Activate the outgoing connection for later processing.
- //
- qdr_connection_activate_CT(core, peer_link->conn);
- }
- else
- sys_mutex_unlock(peer_link->conn->work_lock);
- }
-
- peer = qdr_delivery_next_peer_CT(in_dlv);
- }
-}
-
-
-static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
-{
- if (discard)
- return;
-
- qdr_delivery_t *in_dlv = action->args.connection.delivery;
- bool more = action->args.connection.more;
- qdr_link_t *link = qdr_delivery_link(in_dlv);
-
- //
- // If it is already in the undelivered list, don't try to deliver this again.
- //
- if (!!link && in_dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
- qdr_deliver_continue_peers_CT(core, in_dlv);
-
- qd_message_t *msg = qdr_delivery_message(in_dlv);
-
- if (!more && !qd_message_is_discard(msg)) {
- //
- // The entire message has now been received. Check to see if there are in process subscriptions that need to
- // receive this message. in process subscriptions, at this time, can deal only with full messages.
- //
- qdr_subscription_t *sub = DEQ_HEAD(in_dlv->subscriptions);
- while (sub) {
- DEQ_REMOVE_HEAD(in_dlv->subscriptions);
- qdr_forward_on_message_CT(core, sub, link, in_dlv->msg);
- sub = DEQ_HEAD(in_dlv->subscriptions);
- }
-
- // This is a multicast delivery or if this is a presettled multi-frame unicast delivery.
- if (in_dlv->multicast || in_dlv->settled) {
-
- //
- // If a delivery is settled but did not go into one of the lists, that means that it is going nowhere.
- // We dont want to deal with such deliveries.
- //
- if (in_dlv->settled && in_dlv->where == QDR_DELIVERY_NOWHERE) {
- qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from action 1");
- return;
- }
-
- assert(in_dlv->where == QDR_DELIVERY_IN_SETTLED);
- //
- // The router will settle on behalf of the receiver in the case of multicast and send out settled
- // deliveries to the receivers.
- //
- in_dlv->disposition = PN_ACCEPTED;
- qdr_delivery_push_CT(core, in_dlv);
-
- //
- // The in_dlv has one or more peers. These peers will have to be unlinked.
- //
- qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv);
- qdr_delivery_t *next_peer = 0;
- while (peer) {
- next_peer = qdr_delivery_next_peer_CT(in_dlv);
- qdr_delivery_unlink_peers_CT(core, in_dlv, peer);
- peer = next_peer;
- }
-
- // Remove the delivery from the settled list and decref the in_dlv.
- in_dlv->where = QDR_DELIVERY_NOWHERE;
- DEQ_REMOVE(link->settled, in_dlv);
- qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from settled list");
- }
- }
- }
-
- // This decref is for the action reference
- qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from action 2");
-}
-
-
/**
* Add link-work to provide credit to the link in an IO thread
*/
@@ -1482,83 +767,3 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr)
}
}
}
-
-
-void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv)
-{
- qdr_link_t *link = qdr_delivery_link(dlv);
- if (!link)
- return;
-
- bool activate = false;
-
- sys_mutex_lock(link->conn->work_lock);
- if (dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
- qdr_delivery_incref(dlv, "qdr_delivery_push_CT - add to updated list");
- qdr_add_delivery_ref_CT(&link->updated_deliveries, dlv);
- // Adding this work at priority 0.
- qdr_add_link_ref(link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
- activate = true;
- }
- sys_mutex_unlock(link->conn->work_lock);
-
- //
- // Activate the connection
- //
- if (activate)
- qdr_connection_activate_CT(core, link->conn);
-}
-
-pn_data_t* qdr_delivery_extension_state(qdr_delivery_t *delivery)
-{
- if (!delivery->extension_state) {
- delivery->extension_state = pn_data(0);
- }
- pn_data_rewind(delivery->extension_state);
- return delivery->extension_state;
-}
-
-void qdr_delivery_free_extension_state(qdr_delivery_t *delivery)
-{
- if (delivery->extension_state) {
- pn_data_free(delivery->extension_state);
- delivery->extension_state = 0;
- }
-}
-
-void qdr_delivery_write_extension_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv, bool update_disposition)
-{
- if (dlv->disposition > PN_MODIFIED) {
- pn_data_copy(pn_disposition_data(pn_delivery_local(pdlv)), qdr_delivery_extension_state(dlv));
- if (update_disposition) pn_delivery_update(pdlv, dlv->disposition);
- qdr_delivery_free_extension_state(dlv);
- }
-}
-
-void qdr_delivery_export_transfer_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv)
-{
- qdr_delivery_write_extension_state(dlv, pdlv, true);
-}
-
-void qdr_delivery_export_disposition_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv)
-{
- qdr_delivery_write_extension_state(dlv, pdlv, false);
-}
-
-void qdr_delivery_copy_extension_state(qdr_delivery_t *src, qdr_delivery_t *dest, bool update_diposition)
-{
- if (src->disposition > PN_MODIFIED) {
- pn_data_copy(qdr_delivery_extension_state(dest), qdr_delivery_extension_state(src));
- if (update_diposition) dest->disposition = src->disposition;
- qdr_delivery_free_extension_state(src);
- }
-}
-
-void qdr_delivery_read_extension_state(qdr_delivery_t *dlv, uint64_t disposition, pn_data_t* disposition_data, bool update_disposition)
-{
- if (disposition > PN_MODIFIED) {
- pn_data_rewind(disposition_data);
- pn_data_copy(qdr_delivery_extension_state(dlv), disposition_data);
- if (update_disposition) dlv->disposition = disposition;
- }
-}
diff --git a/src/router_node.c b/src/router_node.c
index 19d2ae2..f3c9966 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -26,6 +26,7 @@
#include "dispatch_private.h"
#include "entity_cache.h"
#include "router_private.h"
+#include "delivery.h"
#include <qpid/dispatch/router_core.h>
#include <qpid/dispatch/proton_utils.h>
#include <proton/sasl.h>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org