You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2019/09/06 12:32:51 UTC
[qpid-dispatch] 02/02: DISPATCH-1404: fix computation of message
annotation field length
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 60f53bf4c121f0c39d77869b3d71be9d9cb5b31f
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Wed Aug 28 11:39:58 2019 -0400
DISPATCH-1404: fix computation of message annotation field length
---
src/iterator.c | 9 ++---
src/parse.c | 90 ++++++++++++++++++++++++++++++-----------
tests/system_tests_multicast.py | 41 +++++++++++++++++++
3 files changed, 112 insertions(+), 28 deletions(-)
diff --git a/src/iterator.c b/src/iterator.c
index 8d21d05..5b0eb36 100644
--- a/src/iterator.c
+++ b/src/iterator.c
@@ -900,9 +900,10 @@ bool qd_iterator_prefix_ptr(const qd_iterator_pointer_t *ptr, uint32_t skip, con
if (!ptr)
return false;
- // if ptr->remaining holds enough bytes for the comparison then
+ // if ptr->buffer holds enough bytes for the comparison then
// don't fiddle with the iterator motions. Just do the comparison directly.
- if (ptr->remaining >= skip + QD_MA_PREFIX_LEN) {
+ const int avail = qd_buffer_cursor(ptr->buffer) - ptr->cursor;
+ if (avail >= skip + QD_MA_PREFIX_LEN) {
// there's enough in current buffer to do straight compare
const void * blk1 = ptr->cursor + skip;
const void * blk2 = prefix;
@@ -913,11 +914,10 @@ bool qd_iterator_prefix_ptr(const qd_iterator_pointer_t *ptr, uint32_t skip, con
// this, too, could be optimized a bit
qd_iterator_pointer_t lptr;
*&lptr = *ptr;
-
+
iterator_pointer_move_cursor(&lptr, skip);
unsigned char *c = (unsigned char*) prefix;
-
while(*c && lptr.remaining) {
unsigned char ic = *lptr.cursor;
@@ -926,7 +926,6 @@ bool qd_iterator_prefix_ptr(const qd_iterator_pointer_t *ptr, uint32_t skip, con
c++;
iterator_pointer_move_cursor(&lptr, 1);
- lptr.remaining -= 1;
}
return *c == 0;
diff --git a/src/parse.c b/src/parse.c
index 35a6951..78afc20 100644
--- a/src/parse.c
+++ b/src/parse.c
@@ -701,6 +701,63 @@ qd_parsed_field_t *qd_parse_value_by_key(qd_parsed_field_t *field, const char *k
}
+// TODO(kgiusti) - de-duplicate all the buffer chain walking code!
+// See DISPATCH-1403
+//
+static inline int _turbo_advance(qd_iterator_pointer_t *ptr, int length)
+{
+ const int start = ptr->remaining;
+ int move = MIN(length, ptr->remaining);
+ while (move > 0) {
+ int avail = qd_buffer_cursor(ptr->buffer) - ptr->cursor;
+ if (move < avail) {
+ ptr->cursor += move;
+ ptr->remaining -= move;
+ break;
+ }
+ move -= avail;
+ ptr->remaining -= avail;
+ if (ptr->remaining == 0) {
+ ptr->cursor += avail; // move to end
+ break;
+ }
+
+ // More remaining in buffer chain: advance to next buffer in chain
+ assert(DEQ_NEXT(ptr->buffer));
+ if (!DEQ_NEXT(ptr->buffer)) {
+ // this is an error! ptr->remainer is not accurate. This should not happen
+ // since the MA field must be completely received at this point
+ // (see DISPATCH-1394).
+ int copied = start - ptr->remaining;
+ ptr->remaining = 0;
+ ptr->cursor += avail; // force to end of chain
+ return copied;
+ }
+ ptr->buffer = DEQ_NEXT(ptr->buffer);
+ ptr->cursor = qd_buffer_base(ptr->buffer);
+ }
+ return start - ptr->remaining;
+}
+
+
+// TODO(kgiusti): deduplicate!
+// See DISPATCH-1403
+//
+static inline int _turbo_copy(qd_iterator_pointer_t *ptr, char *buffer, int length)
+{
+ int move = MIN(length, ptr->remaining);
+ char * const start = buffer;
+ while (ptr->remaining && move > 0) {
+ int avail = MIN(move, qd_buffer_cursor(ptr->buffer) - ptr->cursor);
+ memcpy(buffer, ptr->cursor, avail);
+ buffer += avail;
+ move -= avail;
+ _turbo_advance(ptr, avail);
+ }
+ return (buffer - start);
+}
+
+
const char *qd_parse_annotations_v1(
bool strip_anno_in,
qd_iterator_t *ma_iter_in,
@@ -735,32 +792,19 @@ const char *qd_parse_annotations_v1(
while (anno) {
uint8_t * dp; // pointer to key name in raw buf or extract buf
char key_name[QD_MA_MAX_KEY_LEN]; // key name extracted across buf boundary
+ int key_len = anno->size;
- if (anno->bufptr.remaining >= anno->size + anno->length_of_size + 1) {
+ const int avail = qd_buffer_cursor(anno->bufptr.buffer) - anno->bufptr.cursor;
+ if (avail >= anno->size + anno->length_of_size + 1) {
// The best case: key name is completely in current raw buffer
dp = anno->bufptr.cursor + anno->length_of_size + 1;
} else {
// Pull the key name from multiple buffers
qd_iterator_pointer_t wbuf = anno->bufptr; // scratch buf pointers for getting key
- uint8_t * wip = wbuf.cursor + anno->length_of_size + 1; // where to look in first buf
+ _turbo_advance(&wbuf, anno->length_of_size + 1);
int t_size = MIN(anno->size, QD_MA_MAX_KEY_LEN); // get this many total
- int n_local = 0; // n copied so far. t_size is goal.
- while (wbuf.buffer && n_local < t_size) {
- // copy current buf bytes in key_name buffer
- int n_needed = t_size - n_local;
- int n_to_copy = MIN(n_needed, wbuf.remaining);
- memmove(key_name + n_local, wip, n_to_copy);
- n_local += n_to_copy;
-
- if (n_local < t_size) {
- // move to next buffer
- wbuf.buffer = DEQ_NEXT(wbuf.buffer);
- if (wbuf.buffer) {
- wbuf.remaining = qd_buffer_size(wbuf.buffer);
- wip = qd_buffer_base(wbuf.buffer);
- }
- }
- }
+ key_len = _turbo_copy(&wbuf, key_name, t_size);
+
dp = (uint8_t *)key_name;
}
@@ -769,13 +813,13 @@ const char *qd_parse_annotations_v1(
// stream then the remainder of the keys must be routing keys.
// Padding keys are not real routing annotations but they have
// the routing prefix.
- assert(memcmp(QD_MA_PREFIX, dp, QMPL) == 0);
-
+ assert(key_len >= QMPL && memcmp(QD_MA_PREFIX, dp, QMPL) == 0);
+
// Advance pointer to data beyond the common prefix
dp += QMPL;
-
+
qd_ma_enum_t ma_type = QD_MAE_NONE;
- switch (anno->size) {
+ switch (key_len) {
case QD_MA_TO_LEN:
if (memcmp(QD_MA_TO + QMPL, dp, QD_MA_TO_LEN - QMPL) == 0) {
ma_type = QD_MAE_TO;
diff --git a/tests/system_tests_multicast.py b/tests/system_tests_multicast.py
index 2bab340..08bf0cb 100644
--- a/tests/system_tests_multicast.py
+++ b/tests/system_tests_multicast.py
@@ -386,6 +386,12 @@ class MulticastLinearTest(TestCase):
test.run()
self.assertEqual(None, test.error)
+ def test_80_unsettled_3ack_message_annotations(self):
+ body = " MCAST UNSETTLED 3ACK LARGE MESSAGE ANNOTATIONS " + LARGE_PAYLOAD
+ test = MulticastUnsettled3AckMA(self.config, 10, body)
+ test.run()
+ self.assertEqual(None, test.error)
+
def test_999_check_for_leaks(self):
self._check_for_leaks()
@@ -847,5 +853,40 @@ class MulticastUnsettledRxFail(MulticastUnsettled3Ack):
super(MulticastUnsettledRxFail, self).on_message(event)
+class MulticastUnsettled3AckMA(MulticastUnsettled3Ack):
+ """
+ Try 3 Ack, but with a bunch of user Message Annotations (why not?)
+ """
+ def __init__(self, config, count, body, outcomes=None):
+ super(MulticastUnsettled3AckMA, self).__init__(config,
+ count,
+ body,
+ outcomes=None)
+ self._huge_ma = {
+ "my-key": "my-data",
+ "my-other-key": "my-other-data",
+ "my-map": { "my-map-key1": "X",
+ "my-map-key2": 0x12,
+ "my-map-key3": "+0123456789" * 101,
+ "my-map-list": [i for i in range(97)]
+ },
+ "my-last-key": "so long, folks!"
+ }
+
+ def do_send(self, sender):
+ for i in range(self.msg_count):
+ msg = Message(body=" %s -> %s:%s" % (sender.name, i, self.body))
+ msg.annotations = self._huge_ma
+ dlv = sender.send(msg)
+ self.n_sent += 1
+
+ def on_message(self, event):
+ msg = event.message
+ if event.message.annotations != self._huge_ma:
+ self.error = "forwarded message annotations mismatch original"
+ self.done()
+ return
+ super(MulticastUnsettled3AckMA, self).on_message(event)
+
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org