You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2019/09/06 12:32:51 UTC

[qpid-dispatch] 02/02: DISPATCH-1404: fix computation of message annotation field length

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 60f53bf4c121f0c39d77869b3d71be9d9cb5b31f
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Wed Aug 28 11:39:58 2019 -0400

    DISPATCH-1404: fix computation of message annotation field length
---
 src/iterator.c                  |  9 ++---
 src/parse.c                     | 90 ++++++++++++++++++++++++++++++-----------
 tests/system_tests_multicast.py | 41 +++++++++++++++++++
 3 files changed, 112 insertions(+), 28 deletions(-)

diff --git a/src/iterator.c b/src/iterator.c
index 8d21d05..5b0eb36 100644
--- a/src/iterator.c
+++ b/src/iterator.c
@@ -900,9 +900,10 @@ bool qd_iterator_prefix_ptr(const qd_iterator_pointer_t *ptr, uint32_t skip, con
     if (!ptr)
         return false;
 
-    // if ptr->remaining holds enough bytes for the comparison then 
+    // if ptr->buffer holds enough bytes for the comparison then
     // don't fiddle with the iterator motions. Just do the comparison directly.
-    if (ptr->remaining >= skip + QD_MA_PREFIX_LEN) {
+    const int avail = qd_buffer_cursor(ptr->buffer) - ptr->cursor;
+    if (avail >= skip + QD_MA_PREFIX_LEN) {
         // there's enough in current buffer to do straight compare
         const void * blk1 = ptr->cursor + skip;
         const void * blk2 = prefix;
@@ -913,11 +914,10 @@ bool qd_iterator_prefix_ptr(const qd_iterator_pointer_t *ptr, uint32_t skip, con
     // this, too, could be optimized a bit
     qd_iterator_pointer_t lptr;
     *&lptr = *ptr;
-    
+
     iterator_pointer_move_cursor(&lptr, skip);
 
     unsigned char *c = (unsigned char*) prefix;
-
     while(*c && lptr.remaining) {
         unsigned char ic = *lptr.cursor;
 
@@ -926,7 +926,6 @@ bool qd_iterator_prefix_ptr(const qd_iterator_pointer_t *ptr, uint32_t skip, con
         c++;
 
         iterator_pointer_move_cursor(&lptr, 1);
-        lptr.remaining -= 1;
     }
 
     return *c == 0;
diff --git a/src/parse.c b/src/parse.c
index 35a6951..78afc20 100644
--- a/src/parse.c
+++ b/src/parse.c
@@ -701,6 +701,63 @@ qd_parsed_field_t *qd_parse_value_by_key(qd_parsed_field_t *field, const char *k
 }
 
 
+// Advance ptr by up to length bytes, following the buffer chain; returns the number of bytes advanced.
+// TODO(kgiusti) - de-duplicate all the buffer chain walking code!  See DISPATCH-1403
+// The advance is capped at ptr->remaining; ptr->buffer, ptr->cursor and ptr->remaining are updated in place.
+static inline int _turbo_advance(qd_iterator_pointer_t *ptr, int length)
+{
+    const int start = ptr->remaining;  // snapshot used to compute the bytes actually advanced
+    int move = MIN(length, ptr->remaining);
+    while (move > 0) {
+        int avail = qd_buffer_cursor(ptr->buffer) - ptr->cursor;  // bytes from ptr->cursor up to the buffer's cursor
+        if (move < avail) {  // destination lies inside the current buffer
+            ptr->cursor += move;
+            ptr->remaining -= move;
+            break;
+        }
+        move -= avail;  // consume the rest of the current buffer
+        ptr->remaining -= avail;
+        if (ptr->remaining == 0) {
+            ptr->cursor += avail;   // move to end
+            break;
+        }
+
+        // More remaining in buffer chain: advance to next buffer in chain
+        assert(DEQ_NEXT(ptr->buffer));  // the check below covers builds where assert() is compiled out
+        if (!DEQ_NEXT(ptr->buffer)) {
+            // this is an error!  ptr->remaining is not accurate.  This should not happen
+            // since the MA field must be completely received at this point
+            // (see DISPATCH-1394).
+            int copied = start - ptr->remaining;  // bytes advanced before the chain ended
+            ptr->remaining = 0;
+            ptr->cursor += avail;  // force to end of chain
+            return copied;
+        }
+        ptr->buffer = DEQ_NEXT(ptr->buffer);
+        ptr->cursor = qd_buffer_base(ptr->buffer);
+    }
+    return start - ptr->remaining;
+}
+
+
+// Copy up to length bytes from the buffer chain at ptr into buffer, advancing ptr; returns the bytes copied.
+// TODO(kgiusti): deduplicate!  See DISPATCH-1403
+// NOTE(review): if avail is ever 0 while ptr->remaining > 0 the loop makes no progress - confirm callers prevent this.
+static inline int _turbo_copy(qd_iterator_pointer_t *ptr, char *buffer, int length)
+{
+    int move = MIN(length, ptr->remaining);  // never copy more than ptr->remaining
+    char * const start = buffer;
+    while (ptr->remaining && move > 0) {
+        int avail = MIN(move, qd_buffer_cursor(ptr->buffer) - ptr->cursor);  // contiguous bytes in the current buffer
+        memcpy(buffer, ptr->cursor, avail);
+        buffer += avail;
+        move -= avail;
+        _turbo_advance(ptr, avail);  // step ptr past the bytes just copied
+    }
+    return (buffer - start);
+}
+
+
 const char *qd_parse_annotations_v1(
     bool                   strip_anno_in,
     qd_iterator_t         *ma_iter_in,
@@ -735,32 +792,19 @@ const char *qd_parse_annotations_v1(
         while (anno) {
             uint8_t * dp;                     // pointer to key name in raw buf or extract buf
             char key_name[QD_MA_MAX_KEY_LEN]; // key name extracted across buf boundary
+            int key_len = anno->size;
 
-            if (anno->bufptr.remaining >= anno->size + anno->length_of_size + 1) {
+            const int avail = qd_buffer_cursor(anno->bufptr.buffer) - anno->bufptr.cursor;
+            if (avail >= anno->size + anno->length_of_size + 1) {
                 // The best case: key name is completely in current raw buffer
                 dp = anno->bufptr.cursor + anno->length_of_size + 1;
             } else {
                 // Pull the key name from multiple buffers
                 qd_iterator_pointer_t wbuf = anno->bufptr;    // scratch buf pointers for getting key
-                uint8_t * wip = wbuf.cursor + anno->length_of_size + 1; // where to look in first buf
+                _turbo_advance(&wbuf, anno->length_of_size + 1);
                 int t_size = MIN(anno->size, QD_MA_MAX_KEY_LEN); // get this many total
-                int n_local = 0;                              // n copied so far. t_size is goal.
-                while (wbuf.buffer && n_local < t_size) {
-                    // copy current buf bytes in key_name buffer
-                    int n_needed = t_size - n_local;
-                    int n_to_copy = MIN(n_needed, wbuf.remaining);
-                    memmove(key_name + n_local, wip, n_to_copy);
-                    n_local += n_to_copy;
-                    
-                    if (n_local < t_size) {
-                        // move to next buffer
-                        wbuf.buffer = DEQ_NEXT(wbuf.buffer);
-                        if (wbuf.buffer) {
-                            wbuf.remaining = qd_buffer_size(wbuf.buffer);
-                            wip = qd_buffer_base(wbuf.buffer);
-                        }
-                    }
-                }
+                key_len = _turbo_copy(&wbuf, key_name, t_size);
+
                 dp = (uint8_t *)key_name;
             }
 
@@ -769,13 +813,13 @@ const char *qd_parse_annotations_v1(
             // stream then the remainder of the keys must be routing keys.
             // Padding keys are not real routing annotations but they have
             // the routing prefix.
-            assert(memcmp(QD_MA_PREFIX, dp, QMPL) == 0);
-            
+            assert(key_len >= QMPL && memcmp(QD_MA_PREFIX, dp, QMPL) == 0);
+
             // Advance pointer to data beyond the common prefix
             dp += QMPL;
-            
+
             qd_ma_enum_t ma_type = QD_MAE_NONE;
-            switch (anno->size) {
+            switch (key_len) {
                 case QD_MA_TO_LEN:
                     if (memcmp(QD_MA_TO + QMPL,      dp, QD_MA_TO_LEN - QMPL) == 0) {
                         ma_type = QD_MAE_TO;
diff --git a/tests/system_tests_multicast.py b/tests/system_tests_multicast.py
index 2bab340..08bf0cb 100644
--- a/tests/system_tests_multicast.py
+++ b/tests/system_tests_multicast.py
@@ -386,6 +386,12 @@ class MulticastLinearTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_80_unsettled_3ack_message_annotations(self):
+        body = " MCAST UNSETTLED 3ACK LARGE MESSAGE ANNOTATIONS " + LARGE_PAYLOAD
+        test = MulticastUnsettled3AckMA(self.config, 10, body)
+        test.run()
+        self.assertEqual(None, test.error)
+
     def test_999_check_for_leaks(self):
         self._check_for_leaks()
 
@@ -847,5 +853,40 @@ class MulticastUnsettledRxFail(MulticastUnsettled3Ack):
         super(MulticastUnsettledRxFail, self).on_message(event)
 
 
+class MulticastUnsettled3AckMA(MulticastUnsettled3Ack):
+    """
+    Try 3 Ack, but with a bunch of user Message Annotations (why not?)
+    """
+    def __init__(self, config, count, body, outcomes=None):
+        super(MulticastUnsettled3AckMA, self).__init__(config,
+                                                       count,
+                                                       body,
+                                                       outcomes=None)
+        self._huge_ma = {
+            "my-key": "my-data",
+            "my-other-key": "my-other-data",
+            "my-map": { "my-map-key1": "X",
+                        "my-map-key2": 0x12,
+                        "my-map-key3": "+0123456789" * 101,
+                        "my-map-list": [i for i in range(97)]
+            },
+            "my-last-key": "so long, folks!"
+        }
+
+    def do_send(self, sender):
+        for i in range(self.msg_count):
+            msg = Message(body=" %s -> %s:%s" % (sender.name, i, self.body))
+            msg.annotations = self._huge_ma
+            dlv = sender.send(msg)
+            self.n_sent += 1
+
+    def on_message(self, event):
+        msg = event.message
+        if event.message.annotations != self._huge_ma:
+            self.error = "forwarded message annotations mismatch original"
+            self.done()
+            return
+        super(MulticastUnsettled3AckMA, self).on_message(event)
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org