You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2017/09/13 20:18:59 UTC
[1/2] qpid-dispatch git commit: DISPATCH-825: Fix interlocking between message send and receive
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 484fc63b0 -> fa2a4632d
DISPATCH-825: Fix interlocking between message send and receive
Don't allow a buffer on the message buffer chain that might be removed.
Use a content-based pending buffer instead.
Don't release message lock between adding final buffer and setting
receive_complete flag.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/b6e10668
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/b6e10668
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/b6e10668
Branch: refs/heads/master
Commit: b6e10668a59fc3953698ce1fcc27f45b307276c3
Parents: 484fc63
Author: Chuck Rolke <cr...@redhat.com>
Authored: Wed Sep 13 11:49:30 2017 -0400
Committer: Chuck Rolke <cr...@redhat.com>
Committed: Wed Sep 13 11:49:30 2017 -0400
----------------------------------------------------------------------
src/message.c | 182 +++++++++++++++++++++++++++------------------
src/message_private.h | 1 +
2 files changed, 112 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b6e10668/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index 0788194..97cb382 100644
--- a/src/message.c
+++ b/src/message.c
@@ -894,6 +894,9 @@ void qd_message_free(qd_message_t *in_msg)
buf = DEQ_HEAD(content->buffers);
}
+ if (content->pending)
+ qd_buffer_free(content->pending);
+
sys_mutex_free(content->lock);
free_qd_message_content_t(content);
}
@@ -1085,19 +1088,53 @@ qd_iterator_pointer_t qd_message_cursor(qd_message_pvt_t *in_msg)
return msg->cursor;
}
+
+/**
+ * Receive and discard large messages for which there is no destination.
+ * Don't waste resources by putting the message into internal buffers.
+ * Don't fiddle with locking as no sender is competing with reception.
+ */
+qd_message_t *discard_receive(pn_delivery_t *delivery,
+ pn_link_t *link,
+ qd_message_t *msg_in)
+{
+ qd_message_pvt_t *msg = (qd_message_pvt_t*)msg_in;
+
+ while (1) {
+ char dummy[BUFFER_SIZE];
+ ssize_t rc = pn_link_recv(link, dummy, BUFFER_SIZE);
+
+ if (rc == 0) {
+ // have read all available pn_link incoming bytes
+ break;
+ } else if (rc == PN_EOS || rc < 0) {
+ // end of message or error. Call the message complete
+ msg->content->receive_complete = true;
+
+ pn_record_t *record = pn_delivery_attachments(delivery);
+ pn_record_set(record, PN_DELIVERY_CTX, 0);
+ break;
+ } else {
+ // rc was > 0. bytes were read and discarded.
+ }
+ }
+
+ return msg_in;
+}
+
+
qd_message_t *qd_message_receive(pn_delivery_t *delivery)
{
pn_link_t *link = pn_delivery_link(delivery);
ssize_t rc;
- qd_buffer_t *buf = 0;
pn_record_t *record = pn_delivery_attachments(delivery);
qd_message_pvt_t *msg = (qd_message_pvt_t*) pn_record_get(record, PN_DELIVERY_CTX);
//
- // If there is no message associated with the delivery, this is the first time
- // we've received anything on this delivery. Allocate a message descriptor and
- // link it and the delivery together.
+ // If there is no message associated with the delivery then this is the
+ // first time we've received anything on this delivery.
+ // Allocate a message descriptor and link it and the delivery together.
//
if (!msg) {
msg = (qd_message_pvt_t*) qd_message();
@@ -1109,88 +1146,89 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
}
//
- // The discard flag indicates if we should continue receiving the message.
- // This is pertinent in the case of large messages. When large messages are being received, we try to send out part of the
- // message that has been received so far. If we not able to send it anywhere, there is no need to keep creating buffers
- //
- bool discard = qd_message_is_discard((qd_message_t*)msg);
-
- //
- // Get a reference to the tail buffer on the message. This is the buffer into which
- // we will store incoming message data. If there is no buffer in the message, this is the
- // first time we are here and we need to allocate an empty one and add it to the message.
+ // The discard flag indicates we should keep reading the input stream
+ // but not process the message for delivery.
//
- if (!discard) {
- buf = DEQ_TAIL(msg->content->buffers);
- if (!buf) {
- buf = qd_buffer();
- DEQ_INSERT_TAIL(msg->content->buffers, buf);
- }
+ if (qd_message_is_discard((qd_message_t*)msg)) {
+ return discard_receive(delivery, link, (qd_message_t *)msg);
}
+
+ // Loop until msg is complete, error seen, or incoming bytes are consumed
+ bool recv_error = false;
while (1) {
- if (discard) {
- char dummy[BUFFER_SIZE];
- rc = pn_link_recv(link, dummy, BUFFER_SIZE);
- }
- else {
- //
- // Try to receive enough data to fill the remaining space in the tail buffer.
- //
+ //
+ // handle EOS and clean up after pn receive errors
+ //
+ bool at_eos = (pn_delivery_partial(delivery) == false) &&
+ (pn_delivery_pending(delivery) == 0);
+
+ if (at_eos || recv_error) {
+ // Message is complete
+ sys_mutex_lock(msg->content->lock);
+ {
+ // Append last buffer if any with data
+ if (msg->content->pending) {
+ if (qd_buffer_size(msg->content->pending) > 0) {
+ // pending buffer has bytes that are part of message
+ DEQ_INSERT_TAIL(msg->content->buffers,
+ msg->content->pending);
+ } else {
+ // pending buffer is empty
+ qd_buffer_free(msg->content->pending);
+ }
+ msg->content->pending = 0;
+ } else {
+ // pending buffer is absent
+ }
+
+ msg->content->receive_complete = true;
- rc = pn_link_recv(link, (char*) qd_buffer_cursor(buf), qd_buffer_capacity(buf));
+ // unlink message and delivery
+ pn_record_set(record, PN_DELIVERY_CTX, 0);
+ }
+ sys_mutex_unlock(msg->content->lock);
+ return (qd_message_t*) msg;
}
//
- // If we receive PN_EOS, we have come to the end of the message.
+ // Handle a missing or full pending buffer
//
- if (rc == PN_EOS) {
- //
- // We have received the entire message since rc == PN_EOS, set the receive_complete flag to true
- //
- msg->content->receive_complete = true;
-
- //
- // Clear the value in the record with key PN_DELIVERY_CTX
- //
- pn_record_set(record, PN_DELIVERY_CTX, 0);
-
- //
- // If the last buffer in the list is empty, remove it and free it. This
- // will only happen if the size of the message content is an exact multiple
- // of the buffer size.
- //
- if (buf && qd_buffer_size(buf) == 0) {
+ if (!msg->content->pending) {
+ // Pending buffer is absent: get a new one
+ msg->content->pending = qd_buffer();
+ } else {
+ // Pending buffer exists
+ if (qd_buffer_capacity(msg->content->pending) == 0) {
+ // Pending buffer is full
sys_mutex_lock(msg->content->lock);
- DEQ_REMOVE_TAIL(msg->content->buffers);
+ DEQ_INSERT_TAIL(msg->content->buffers, msg->content->pending);
sys_mutex_unlock(msg->content->lock);
- qd_buffer_free(buf);
+ msg->content->pending = qd_buffer();
+ } else {
+ // Pending buffer still has capacity
}
-
- return (qd_message_t*) msg;
}
- if (rc > 0) {
- if (discard)
- continue;
+ //
+ // Try to fill the remaining space in the pending buffer.
+ //
+ rc = pn_link_recv(link,
+ (char*) qd_buffer_cursor(msg->content->pending),
+ qd_buffer_capacity(msg->content->pending));
+
+ assert (rc != PN_EOS); // Just checked for this moments ago
+
+ if (rc < 0) {
+ // error seen. next pass breaks out of loop
+ recv_error = true;
+ } else if (rc > 0) {
//
// We have received a positive number of bytes for the message. Advance
// the cursor in the buffer.
//
- qd_buffer_insert(buf, rc);
-
- //
- // If the buffer is full, allocate a new empty buffer and append it to the
- // tail of the message's list.
- //
- sys_mutex_lock(msg->content->lock);
- if (qd_buffer_capacity(buf) == 0) {
- buf = qd_buffer();
- DEQ_INSERT_TAIL(msg->content->buffers, buf);
- }
- sys_mutex_unlock(msg->content->lock);
-
- } else
+ qd_buffer_insert(msg->content->pending, rc);
+ } else {
//
// We received zero bytes, and no PN_EOS. This means that we've received
// all of the data available up to this point, but it does not constitute
@@ -1198,6 +1236,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
// Return the message so that the caller can start sending out whatever we have received so far
//
return (qd_message_t*) msg;
+ }
}
return 0;
@@ -1440,16 +1479,17 @@ void qd_message_send(qd_message_t *in_msg,
buf = msg->cursor.buffer;
- if (!buf)
- return;
+ assert (buf);
while (buf) {
size_t buf_size = qd_buffer_size(buf);
// This will send the remaining data in the buffer if any.
int num_bytes_to_send = buf_size - (msg->cursor.cursor - qd_buffer_base(buf));
- if (num_bytes_to_send > 0)
+ if (num_bytes_to_send > 0) {
pn_link_send(pnl, (const char*)msg->cursor.cursor, num_bytes_to_send);
+ // TODO: DISPATCH-819 check pn_link_send return value
+ }
// If the entire message has already been received, taking out this lock is not that expensive
// because there is no contention for this lock.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b6e10668/src/message_private.h
----------------------------------------------------------------------
diff --git a/src/message_private.h b/src/message_private.h
index 073ac7f..eab4ae0 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -68,6 +68,7 @@ typedef struct {
sys_mutex_t *lock;
sys_atomic_t ref_count; // The number of messages referencing this
qd_buffer_list_t buffers; // The buffer chain containing the message
+ qd_buffer_t *pending; // Buffer owned by and filled by qd_message_receive
qd_field_location_t section_message_header; // The message header list
qd_field_location_t section_delivery_annotation; // The delivery annotation map
qd_field_location_t section_message_annotation; // The message annotation map
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-dispatch git commit: DISPATCH-825: get rid of poorly conceived EOS assert
Posted by ch...@apache.org.
DISPATCH-825: get rid of poorly conceived EOS assert
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/fa2a4632
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/fa2a4632
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/fa2a4632
Branch: refs/heads/master
Commit: fa2a4632db9804f574af4e4763677f11ae50c1af
Parents: b6e1066
Author: Chuck Rolke <cr...@redhat.com>
Authored: Wed Sep 13 16:11:16 2017 -0400
Committer: Chuck Rolke <cr...@redhat.com>
Committed: Wed Sep 13 16:11:16 2017 -0400
----------------------------------------------------------------------
src/message.c | 10 ++++------
1 file changed, 4 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fa2a4632/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index 97cb382..6074ebb 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1155,7 +1155,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
// Loop until msg is complete, error seen, or incoming bytes are consumed
- bool recv_error = false;
+ bool recv_error_or_eos = false;
while (1) {
//
// handle EOS and clean up after pn receive errors
@@ -1163,7 +1163,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
bool at_eos = (pn_delivery_partial(delivery) == false) &&
(pn_delivery_pending(delivery) == 0);
- if (at_eos || recv_error) {
+ if (at_eos || recv_error_or_eos) {
// Message is complete
sys_mutex_lock(msg->content->lock);
{
@@ -1217,11 +1217,9 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
(char*) qd_buffer_cursor(msg->content->pending),
qd_buffer_capacity(msg->content->pending));
- assert (rc != PN_EOS); // Just checked for this moments ago
-
if (rc < 0) {
- // error seen. next pass breaks out of loop
- recv_error = true;
+ // error or eos seen. next pass breaks out of loop
+ recv_error_or_eos = true;
} else if (rc > 0) {
//
// We have received a positive number of bytes for the message. Advance
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org