You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by ganeshmurthy <gi...@git.apache.org> on 2017/07/06 00:36:13 UTC

[GitHub] qpid-dispatch pull request #172: DISPATCH-767 - Added code to support multi ...

GitHub user ganeshmurthy opened a pull request:

    https://github.com/apache/qpid-dispatch/pull/172

    DISPATCH-767 - Added code to support multi frame message streaming

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ganeshmurthy/qpid-dispatch DISPATCH-767-1

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/qpid-dispatch/pull/172.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #172
    
----
commit 7dfbed4355c8e626a441b70c3429f792ec712729
Author: Ganesh Murthy <gm...@redhat.com>
Date:   2017-07-05T15:51:06Z

    DISPATCH-767 - Added code to support multi frame message streaming

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


[GitHub] qpid-dispatch pull request #172: DISPATCH-767 - Added code to support multi ...

Posted by alanconway <gi...@git.apache.org>.
Github user alanconway commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/172#discussion_r126155845
  
    --- Diff: include/qpid/dispatch/message.h ---
    @@ -203,9 +203,9 @@ int  qd_message_get_phase_annotation(const qd_message_t *msg);
     void qd_message_set_ingress_annotation(qd_message_t *msg, qd_composed_field_t *ingress_field);
     
     /**
    - * Receive message data via a delivery.  This function may be called more than once on the same
    - * delivery if the message spans multiple frames.  Once a complete message has been received, this
    - * function shall return the message.
    + * Receive message data frame by frame via a delivery.  This function may be called more than once on the same
    + * delivery if the message spans multiple frames. Always returns a message. The message buffers are filled up to the point with the data that was been received so far.
    + * The buffer keeps filling up on successive calls to this function.
      *
      * @param delivery An incoming delivery from a link
      * @return A pointer to the complete message or 0 if the message is not yet complete.
    --- End diff --
    
    This comment is out of date


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


[GitHub] qpid-dispatch pull request #172: DISPATCH-767 - Added code to support multi ...

Posted by alanconway <gi...@git.apache.org>.
Github user alanconway commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/172#discussion_r126172968
  
    --- Diff: src/router_core/forwarder.c ---
    @@ -395,9 +415,22 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
         // Forward to an in-process subscriber if there is one.
         //
         if (!exclude_inprocess) {
    +        bool receive_complete = qd_message_receive_complete(msg);
             qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions);
             if (sub) {
    -            qdr_forward_on_message_CT(core, sub, in_delivery ? in_delivery->link : 0, msg);
    +
    +            //
    +            // Only if the message has been completely received, forward it.
    +            // Subscriptions, at the moment, dont have the ability to deal with partial messages
    +            //
    +            if (receive_complete)
    --- End diff --
    
    For streaming we need to start forwarding before completion.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


[GitHub] qpid-dispatch pull request #172: DISPATCH-767 - Added code to support multi ...

Posted by alanconway <gi...@git.apache.org>.
Github user alanconway commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/172#discussion_r126160704
  
    --- Diff: src/message.c ---
    @@ -975,11 +990,94 @@ void qd_message_set_ingress_annotation(qd_message_t *in_msg, qd_composed_field_t
         qd_compose_free(ingress_field);
     }
     
    +bool qd_message_is_discard(qd_message_t *msg)
    +{
    +    if (!msg)
    +        return false;
    +    qd_message_pvt_t *pvt_msg = (qd_message_pvt_t*) msg;
    +    return pvt_msg->content->discard;
    +}
    +
    +void qd_message_set_discard(qd_message_t *msg, bool discard)
    +{
    +    if (!msg)
    +        return;
    +
    +    qd_message_pvt_t *pvt_msg = (qd_message_pvt_t*) msg;
    +    pvt_msg->content->discard = discard;
    +}
    +
    +size_t qd_message_fanout(qd_message_t *in_msg)
    +{
    +    if (!in_msg)
    +        return 0;
    +    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
    +    return msg->content->fanout;
    +}
    +
    +void qd_message_add_fanout(qd_message_t *in_msg)
    +{
    +    assert(in_msg);
    +    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
    +    msg->content->fanout++;
    +}
    +
    +bool qd_message_receive_complete(qd_message_t *in_msg)
    +{
    +    if (!in_msg)
    +        return false;
    +    qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
    +    return msg->content->receive_complete;
    +}
    +
    +bool qd_message_send_complete(qd_message_t *in_msg)
    +{
    +    if (!in_msg)
    +        return false;
    +
    +    qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
    +    return msg->send_complete;
    +}
    +
    +bool qd_message_tag_sent(qd_message_t *in_msg)
    +{
    +    if (!in_msg)
    +        return false;
    +
    +    qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
    +    return msg->tag_sent;
    +}
    +
    +void qd_message_set_tag_sent(qd_message_t *in_msg, bool tag_sent)
    +{
    +    if (!in_msg)
    +        return;
    +
    +    qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
    +    msg->tag_sent = tag_sent;
    +}
    +
    +qd_buffer_t *qd_message_cursor_buffer(qd_message_pvt_t *in_msg)
    +{
    +    return in_msg->cursor.buffer;
    +}
    +
    +int qd_message_cursor_offset(qd_message_pvt_t *in_msg)
    +{
    +    return in_msg->cursor.offset;
    +}
    +
    --- End diff --
    
    Delete qd_message_cursor_buffer|offset - you don't need them with qd_message_cursor()


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


[GitHub] qpid-dispatch pull request #172: DISPATCH-767 - Added code to support multi ...

Posted by alanconway <gi...@git.apache.org>.
Github user alanconway commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/172#discussion_r126156980
  
    --- Diff: src/buffer.c ---
    @@ -83,7 +86,28 @@ size_t qd_buffer_size(qd_buffer_t *buf)
     void qd_buffer_insert(qd_buffer_t *buf, size_t len)
     {
         buf->size += len;
    -    assert(buf->size <= buffer_size);
    +    assert(buf->size <= BUFFER_SIZE);
    +}
    +
    +void qd_buffer_add_fanout(qd_buffer_t *buf)
    +{
    +    buf->fanout++;
    +}
    +
    +size_t qd_buffer_fanout(qd_buffer_t *buf)
    +{
    +    return buf->fanout;
    +}
    +
    +
    +unsigned char *qd_buffer_at(qd_buffer_t *buf, size_t len)
    +{
    +    assert(len >=0);
    --- End diff --
    
    Redundant, size_t is unsigned, it can never be < 0


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


[GitHub] qpid-dispatch pull request #172: DISPATCH-767 - Added code to support multi ...

Posted by alanconway <gi...@git.apache.org>.
Github user alanconway commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/172#discussion_r126158278
  
    --- Diff: src/message_private.h ---
    @@ -94,10 +94,19 @@ typedef struct {
         unsigned char       *parse_cursor;
         qd_message_depth_t   parse_depth;
         qd_parsed_field_t   *parsed_message_annotations;
    +
    +    bool                 discard;                        // Should this message be discarded?
    +    bool                 receive_complete;               // true if the message has been completely received, false otherwise
    +    unsigned int         fanout;                         // The number of receivers for this message. This number does not include in-process subscribers.
     } qd_message_content_t;
     
     typedef struct {
         DEQ_LINKS(qd_message_t);   // Deque linkage that overlays the qd_message_t
    +    qd_field_location_t   cursor;           // A pointer to the current location of the outgoing byte stream.
    +    qd_message_depth_t    message_depth;
    --- End diff --
    
    Add a comment to explain these here - not obvious (to me) what they mean.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


[GitHub] qpid-dispatch pull request #172: DISPATCH-767 - Added code to support multi ...

Posted by alanconway <gi...@git.apache.org>.
Github user alanconway commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/172#discussion_r126163415
  
    --- Diff: src/message.c ---
    @@ -996,21 +1094,47 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
         }
     
         //
    +    // The discard flag indicates if we should continue receiving the message.
    +    // This is pertinent in the case of large messages. When large messages are being received, we try to send out part of the
    +    // message that has been received so far. If we not able to send it anywhere, there is no need to keep creating buffers
    +    //
    +    bool discard = qd_message_is_discard((qd_message_t*)msg);
    +
    +    //
         // Get a reference to the tail buffer on the message.  This is the buffer into which
    -    // we will store incoming message data.  If there is no buffer in the message, allocate
    -    // an empty one and add it to the message.
    +    // we will store incoming message data.  If there is no buffer in the message, this is the
    +    // first time we are here and we need to allocate an empty one and add it to the message.
         //
    -    buf = DEQ_TAIL(msg->content->buffers);
    -    if (!buf) {
    -        buf = qd_buffer();
    -        DEQ_INSERT_TAIL(msg->content->buffers, buf);
    +    if (!discard) {
    +        buf = DEQ_TAIL(msg->content->buffers);
    +        if (!buf) {
    +            buf = qd_buffer();
    +            DEQ_INSERT_TAIL(msg->content->buffers, buf);
    +        }
         }
     
         while (1) {
    -        //
    -        // Try to receive enough data to fill the remaining space in the tail buffer.
    -        //
    -        rc = pn_link_recv(link, (char*) qd_buffer_cursor(buf), qd_buffer_capacity(buf));
    +        if (discard) {
    +            char dummy[BUFFER_SIZE];
    +            rc = pn_link_recv(link, dummy, BUFFER_SIZE);
    +        }
    +        else {
    +
    +            //
    +            // Make sure our buffer chain length is always less than MAX_BUFFER_LENGTH. We don't want to add any more buffers beyond MAX_BUFFER_LENGTH.
    +            //
    +            //
    +            //sys_mutex_lock(msg->content->lock);
    +            //if (DEQ_SIZE(msg->content->buffers) > MAX_BUFFER_CHAIN_LENGTH) {
    +            //    return (qd_message_t*) msg;
    +            //}
    +            //sys_mutex_unlock(msg->content->lock);
    +
    +            //
    +            // Try to receive enough data to fill the remaining space in the tail buffer.
    +            //
    +            rc = pn_link_recv(link, (char*) qd_buffer_cursor(buf), qd_buffer_capacity(buf));
    +        }
     
             //
             // If we receive PN_EOS, we have come to the end of the message.
    --- End diff --
    
    FYI: instead of watching for PN_EOS, you can also check for (pn_delivery_pending(d) == 0 && !pn_delivery_partial(d)). They are equivalent, PN_EOS is correct - sometimes the pending/partial test is more convenient because it stays true on the delivery so you can check it any time, not just at return from pn_link_recv(). Only FYI, no need to change this code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


[GitHub] qpid-dispatch pull request #172: DISPATCH-767 - Added code to support multi ...

Posted by alanconway <gi...@git.apache.org>.
Github user alanconway commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/172#discussion_r126168949
  
    --- Diff: src/message.c ---
    @@ -1151,89 +1287,140 @@ void qd_message_send(qd_message_t *in_msg,
     {
         qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
         qd_message_content_t *content = msg->content;
    -    qd_buffer_t          *buf     = DEQ_HEAD(content->buffers);
    -    unsigned char        *cursor;
    +    qd_buffer_t          *buf     = 0;
         pn_link_t            *pnl     = qd_link_pn(link);
     
    -    qd_buffer_list_t new_ma;
    -    DEQ_INIT(new_ma);
    +    // How many receivers does this message have?
    +    int                  fanout   = qd_message_fanout(in_msg);
     
    -    // Process  the message annotations if any
    -    compose_message_annotations(msg, &new_ma, strip_annotations);
    +    if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) {
    +        //
    +        // Start with the very first buffer;
    +        //
    +        buf = DEQ_HEAD(content->buffers);
    --- End diff --
    
    No lock here? What's the protocol for protecting buffers between send/receive threads? We can't let threads work concurrently on the same buffer (in particular change the buffer size) We could lock the use of buffers, not just the enqueue/deque, or there are some other options.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


[GitHub] qpid-dispatch pull request #172: DISPATCH-767 - Added code to support multi ...

Posted by alanconway <gi...@git.apache.org>.
Github user alanconway commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/172#discussion_r126164203
  
    --- Diff: src/message.c ---
    @@ -1026,16 +1150,24 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
                 // will only happen if the size of the message content is an exact multiple
                 // of the buffer size.
                 //
    -
    -            if (qd_buffer_size(buf) == 0) {
    +            if (buf && qd_buffer_size(buf) == 0) {
    +                sys_mutex_lock(msg->content->lock);
                     DEQ_REMOVE_TAIL(msg->content->buffers);
    +                sys_mutex_unlock(msg->content->lock);
                     qd_buffer_free(buf);
                 }
     
    +            //
    +            // We have received the entire message since rc == PN_EOS, set the receive_complete flag to false
    --- End diff --
    
    Comment typo: says "false" should say "true"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


[GitHub] qpid-dispatch pull request #172: DISPATCH-767 - Added code to support multi ...

Posted by alanconway <gi...@git.apache.org>.
Github user alanconway commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/172#discussion_r126170901
  
    --- Diff: src/message.c ---
    @@ -1151,89 +1287,140 @@ void qd_message_send(qd_message_t *in_msg,
     {
         qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
         qd_message_content_t *content = msg->content;
    -    qd_buffer_t          *buf     = DEQ_HEAD(content->buffers);
    -    unsigned char        *cursor;
    +    qd_buffer_t          *buf     = 0;
         pn_link_t            *pnl     = qd_link_pn(link);
     
    -    qd_buffer_list_t new_ma;
    -    DEQ_INIT(new_ma);
    +    // How many receivers does this message have?
    +    int                  fanout   = qd_message_fanout(in_msg);
     
    -    // Process  the message annotations if any
    -    compose_message_annotations(msg, &new_ma, strip_annotations);
    +    if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) {
    +        //
    +        // Start with the very first buffer;
    +        //
    +        buf = DEQ_HEAD(content->buffers);
     
    -    //
    -    // This is the case where the message annotations have been modified.
    -    // The message send must be divided into sections:  The existing header;
    -    // the new message annotations; the rest of the existing message.
    -    // Note that the original message annotations that are still in the
    -    // buffer chain must not be sent.
    -    //
    -    // Start by making sure that we've parsed the message sections through
    -    // the message annotations
    -    //
    -    // ??? NO LONGER NECESSARY???
    -    if (!qd_message_check(in_msg, QD_DEPTH_MESSAGE_ANNOTATIONS)) {
    -        qd_log(log_source, QD_LOG_ERROR, "Cannot send: %s", qd_error_message);
    -        return;
    -    }
    +        if (!qd_message_check(in_msg, QD_DEPTH_MESSAGE_ANNOTATIONS)) {
    +            qd_log(log_source, QD_LOG_ERROR, "Cannot send: %s", qd_error_message);
    +            return;
    +        }
     
    -    //
    -    // Send header if present
    -    //
    -    cursor = qd_buffer_base(buf);
    -    if (content->section_message_header.length > 0) {
    -        buf    = content->section_message_header.buffer;
    -        cursor = content->section_message_header.offset + qd_buffer_base(buf);
    -        advance(&cursor, &buf,
    -                content->section_message_header.length + content->section_message_header.hdr_length,
    -                send_handler, (void*) pnl);
    -    }
    +        //
    +        // Send header if present
    +        //
    +        unsigned char *cursor = qd_buffer_base(buf);
    +        int header_consume = content->section_message_header.length + content->section_message_header.hdr_length;
    +        if (content->section_message_header.length > 0) {
    +            buf    = content->section_message_header.buffer;
    +            cursor = content->section_message_header.offset + qd_buffer_base(buf);
    +            advance(&cursor, &buf, header_consume, send_handler, (void*) pnl);
    +        }
     
    -    //
    -    // Send delivery annotation if present
    -    //
    -    if (content->section_delivery_annotation.length > 0) {
    -        buf    = content->section_delivery_annotation.buffer;
    -        cursor = content->section_delivery_annotation.offset + qd_buffer_base(buf);
    -        advance(&cursor, &buf,
    -                content->section_delivery_annotation.length + content->section_delivery_annotation.hdr_length,
    -                send_handler, (void*) pnl);
    -    }
    +        //
    +        // Send delivery annotation if present
    +        //
    +        int da_consume = content->section_delivery_annotation.length + content->section_delivery_annotation.hdr_length;
    +        if (content->section_delivery_annotation.length > 0) {
    +            buf    = content->section_delivery_annotation.buffer;
    +            cursor = content->section_delivery_annotation.offset + qd_buffer_base(buf);
    +            advance(&cursor, &buf, da_consume, send_handler, (void*) pnl);
    +        }
     
    -    //
    -    // Send new message annotations
    -    //
    -    qd_buffer_t *da_buf = DEQ_HEAD(new_ma);
    -    while (da_buf) {
    -        char *to_send = (char*) qd_buffer_base(da_buf);
    -        pn_link_send(pnl, to_send, qd_buffer_size(da_buf));
    -        da_buf = DEQ_NEXT(da_buf);
    -    }
    -    qd_buffer_list_free_buffers(&new_ma);
    +        qd_buffer_list_t new_ma;
    +        DEQ_INIT(new_ma);
     
    -    //
    -    // Skip over replaced message annotations
    -    //
    -    if (content->section_message_annotation.length > 0)
    -        advance(&cursor, &buf,
    -                content->section_message_annotation.hdr_length + content->section_message_annotation.length,
    -                0, 0);
    +        // Process  the message annotations if any
    +        compose_message_annotations(msg, &new_ma, strip_annotations);
    +
    +        //
    +        // Send new message annotations
    +        //
    +        qd_buffer_t *da_buf = DEQ_HEAD(new_ma);
    +        while (da_buf) {
    +            char *to_send = (char*) qd_buffer_base(da_buf);
    +            pn_link_send(pnl, to_send, qd_buffer_size(da_buf));
    +            da_buf = DEQ_NEXT(da_buf);
    +        }
    +        qd_buffer_list_free_buffers(&new_ma);
    +
    +        //
    +        // Skip over replaced message annotations
    +        //
    +        int ma_consume = content->section_message_annotation.hdr_length + content->section_message_annotation.length;
    +        if (content->section_message_annotation.length > 0)
    +            advance(&cursor, &buf, ma_consume, 0, 0);
    +
    +        msg->cursor.buffer = buf;
    +
    +        //
    +        // If this message has no header and no delivery annotations and no message annotations, set the offset to 0.
    +        //
    +        if (header_consume == 0 && da_consume == 0 && ma_consume ==0)
    +            msg->cursor.offset = 0;
    +        else
    +            msg->cursor.offset = cursor - qd_buffer_base(buf);
    +
    +        msg->sent_depth = QD_DEPTH_MESSAGE_ANNOTATIONS;
     
    -    //
    -    // Send remaining partial buffer
    -    //
    -    if (buf) {
    -        size_t len = qd_buffer_size(buf) - (cursor - qd_buffer_base(buf));
    -        advance(&cursor, &buf, len, send_handler, (void*) pnl);
         }
     
    -    // Fall through to process the remaining buffers normally
    -    // Note that 'advance' will have moved us to the next buffer in the chain.
    +    buf = msg->cursor.buffer;
     
    +    if (!buf)
    +        return;
    +
    +    bool receive_complete = qd_message_receive_complete(in_msg);
     
         while (buf) {
    -        pn_link_send(pnl, (char*) qd_buffer_base(buf), qd_buffer_size(buf));
    -        buf = DEQ_NEXT(buf);
    +        size_t buf_size = qd_buffer_size(buf);
    +
    +        // This will send the remaining data in the buffer if any.
    +        pn_link_send(pnl, (char*) qd_buffer_at(buf, msg->cursor.offset), buf_size - msg->cursor.offset);
    +
    +        // If the entire message has been received,  there is no need to lock before sending because no one else is
    +        // trying to modify the data structure.
    +        if (!receive_complete)
    +            sys_mutex_lock(msg->content->lock);
    --- End diff --
    
    I wouldn't bother with the conditional lock  - if there's no contention the cost of the lock is small, and the condition introduces one more way for a future programmer to screw up the thread safety logic by mistake.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


[GitHub] qpid-dispatch pull request #172: DISPATCH-767 - Added code to support multi ...

Posted by alanconway <gi...@git.apache.org>.
Github user alanconway commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/172#discussion_r126155671
  
    --- Diff: include/qpid/dispatch/buffer.h ---
    @@ -32,10 +32,14 @@ typedef struct qd_buffer_t qd_buffer_t;
     
     DEQ_DECLARE(qd_buffer_t, qd_buffer_list_t);
     
    +extern size_t BUFFER_SIZE;
    +extern size_t MAX_BUFFER_LENGTH;
    +
     /** A raw byte buffer .*/
     struct qd_buffer_t {
         DEQ_LINKS(qd_buffer_t);
         unsigned int size;          ///< Size of data content
    +    unsigned int fanout;        // The number of receivers for this buffer
    --- End diff --
    
    Q: Why record fanout per-buffer? Fanout is per-message and buffers can be re-used with different fanouts in different messages. Can we avoid duplicating the fanout value from the message onto all its buffers? Maybe not - just wondering, it seems out of place.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


[GitHub] qpid-dispatch pull request #172: DISPATCH-767 - Added code to support multi ...

Posted by alanconway <gi...@git.apache.org>.
Github user alanconway commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/172#discussion_r126161795
  
    --- Diff: src/message.c ---
    @@ -996,21 +1094,47 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
         }
     
         //
    +    // The discard flag indicates if we should continue receiving the message.
    +    // This is pertinent in the case of large messages. When large messages are being received, we try to send out part of the
    +    // message that has been received so far. If we not able to send it anywhere, there is no need to keep creating buffers
    +    //
    +    bool discard = qd_message_is_discard((qd_message_t*)msg);
    +
    +    //
         // Get a reference to the tail buffer on the message.  This is the buffer into which
    -    // we will store incoming message data.  If there is no buffer in the message, allocate
    -    // an empty one and add it to the message.
    +    // we will store incoming message data.  If there is no buffer in the message, this is the
    +    // first time we are here and we need to allocate an empty one and add it to the message.
         //
    -    buf = DEQ_TAIL(msg->content->buffers);
    -    if (!buf) {
    -        buf = qd_buffer();
    -        DEQ_INSERT_TAIL(msg->content->buffers, buf);
    +    if (!discard) {
    +        buf = DEQ_TAIL(msg->content->buffers);
    +        if (!buf) {
    +            buf = qd_buffer();
    +            DEQ_INSERT_TAIL(msg->content->buffers, buf);
    +        }
         }
     
         while (1) {
    -        //
    -        // Try to receive enough data to fill the remaining space in the tail buffer.
    -        //
    -        rc = pn_link_recv(link, (char*) qd_buffer_cursor(buf), qd_buffer_capacity(buf));
    +        if (discard) {
    +            char dummy[BUFFER_SIZE];
    +            rc = pn_link_recv(link, dummy, BUFFER_SIZE);
    +        }
    +        else {
    +
    +            //
    --- End diff --
    
    Remove commented-out code


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


[GitHub] qpid-dispatch pull request #172: DISPATCH-767 - Added code to support multi ...

Posted by alanconway <gi...@git.apache.org>.
Github user alanconway commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/172#discussion_r126161603
  
    --- Diff: src/message.c ---
    @@ -996,21 +1094,47 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
         }
     
         //
    +    // The discard flag indicates if we should continue receiving the message.
    +    // This is pertinent in the case of large messages. When large messages are being received, we try to send out part of the
    +    // message that has been received so far. If we not able to send it anywhere, there is no need to keep creating buffers
    +    //
    +    bool discard = qd_message_is_discard((qd_message_t*)msg);
    +
    +    //
         // Get a reference to the tail buffer on the message.  This is the buffer into which
    -    // we will store incoming message data.  If there is no buffer in the message, allocate
    -    // an empty one and add it to the message.
    +    // we will store incoming message data.  If there is no buffer in the message, this is the
    +    // first time we are here and we need to allocate an empty one and add it to the message.
         //
    -    buf = DEQ_TAIL(msg->content->buffers);
    -    if (!buf) {
    -        buf = qd_buffer();
    -        DEQ_INSERT_TAIL(msg->content->buffers, buf);
    +    if (!discard) {
    +        buf = DEQ_TAIL(msg->content->buffers);
    +        if (!buf) {
    +            buf = qd_buffer();
    +            DEQ_INSERT_TAIL(msg->content->buffers, buf);
    +        }
         }
     
         while (1) {
    -        //
    -        // Try to receive enough data to fill the remaining space in the tail buffer.
    -        //
    -        rc = pn_link_recv(link, (char*) qd_buffer_cursor(buf), qd_buffer_capacity(buf));
    +        if (discard) {
    +            char dummy[BUFFER_SIZE];
    --- End diff --
    
    Pity that we have to copy out data only to throw it away, but I think you are correct that we do need to do it. Maybe the AMQP spec has something we can use to abort an incoming message, but even if it does this code path is probably needed as a fall-back.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


[GitHub] qpid-dispatch pull request #172: DISPATCH-767 - Added code to support multi ...

Posted by ganeshmurthy <gi...@git.apache.org>.
Github user ganeshmurthy closed the pull request at:

    https://github.com/apache/qpid-dispatch/pull/172


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org