You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2019/06/07 20:09:21 UTC
[qpid-dispatch] 01/02: DISPATCH-1354: Annotation processing
performance improvements
This is an automated email from the ASF dual-hosted git repository.
chug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 91cd6285162c1edd49993741f627d96deb06a545
Author: Chuck Rolke <ch...@apache.org>
AuthorDate: Mon Jun 3 17:53:16 2019 -0400
DISPATCH-1354: Annotation processing performance improvements
Message annotation processing on received messages stages key names
byte by byte into a flat buffer and then uses strcmp to check them.
Easy improvements are:
* Use name in raw buffer if it does not cross a buffer boundary
* If name crosses a boundary then use memmoves to get the name in chunks
* Check the name prefix only once and then check variable parts of name strings
* Don't create unnecessary qd_iterators and qd_parsed_fields
* Don't check names whose lengths differ from the given keys
---
include/qpid/dispatch/amqp.h | 8 +++
include/qpid/dispatch/parse.h | 11 +++
src/iterator.c | 14 +++-
src/parse.c | 156 +++++++++++++++++++++++++++++-------------
4 files changed, 141 insertions(+), 48 deletions(-)
diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h
index 80447be..e49d20a 100644
--- a/include/qpid/dispatch/amqp.h
+++ b/include/qpid/dispatch/amqp.h
@@ -112,6 +112,14 @@ extern const char * const QD_MA_TRACE; ///< Trace
extern const char * const QD_MA_TO; ///< To-Override
extern const char * const QD_MA_PHASE; ///< Phase for override address
extern const char * const QD_MA_CLASS; ///< Message-Class
+
+#define QD_MA_PREFIX_LEN (9)
+#define QD_MA_INGRESS_LEN (16)
+#define QD_MA_TRACE_LEN (14)
+#define QD_MA_TO_LEN (11)
+#define QD_MA_PHASE_LEN (14)
+#define QD_MA_CLASS_LEN (14)
+
extern const int QD_MA_MAX_KEY_LEN; ///< strlen of longest key name
extern const int QD_MA_N_KEYS; ///< number of router annotation keys
extern const int QD_MA_FILTER_LEN; ///< size of annotation filter buffer
diff --git a/include/qpid/dispatch/parse.h b/include/qpid/dispatch/parse.h
index 7be09f0..8397742 100644
--- a/include/qpid/dispatch/parse.h
+++ b/include/qpid/dispatch/parse.h
@@ -301,6 +301,17 @@ void qd_parse_annotations(
qd_iterator_pointer_t *blob_pointer,
uint32_t *blob_item_count);
+/**
+ * Identify which annotation is being parsed
+ */
+typedef enum {
+ QD_MAE_INGRESS,
+ QD_MAE_TRACE,
+ QD_MAE_TO,
+ QD_MAE_PHASE,
+ QD_MAE_NONE
+} qd_ma_enum_t;
+
///@}
#endif
diff --git a/src/iterator.c b/src/iterator.c
index bed686f..13f7f5d 100644
--- a/src/iterator.c
+++ b/src/iterator.c
@@ -20,6 +20,7 @@
#include <qpid/dispatch/iterator.h>
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/alloc.h>
+#include <qpid/dispatch/amqp.h>
#include "message_private.h"
#include <stdio.h>
#include <string.h>
@@ -719,9 +720,20 @@ bool qd_iterator_prefix_ptr(const qd_iterator_pointer_t *ptr, uint32_t skip, con
if (!ptr)
return false;
+ // if ptr->remaining holds enough bytes for the comparison then
+ // don't fiddle with the iterator motions. Just do the comparison directly.
+ if (ptr->remaining >= skip + QD_MA_PREFIX_LEN) {
+ // there's enough in current buffer to do straight compare
+ const void * blk1 = ptr->cursor + skip;
+ const void * blk2 = prefix;
+ return memcmp(blk1, blk2, QD_MA_PREFIX_LEN) == 0;
+ }
+
+ // otherwise compare across buffer boundaries
+ // this, too, could be optimized a bit
qd_iterator_pointer_t lptr;
*&lptr = *ptr;
-
+
iterator_pointer_move_cursor(&lptr, skip);
unsigned char *c = (unsigned char*) prefix;
diff --git a/src/parse.c b/src/parse.c
index ecfbe0f..0672b0c 100644
--- a/src/parse.c
+++ b/src/parse.c
@@ -722,61 +722,123 @@ const char *qd_parse_annotations_v1(
return parse_error;
}
+ // define a shorthand name for the qd message annotation key prefix length
+#define QMPL QD_MA_PREFIX_LEN
+
+#define MIN(a,b) (((a)<(b))?(a):(b))
+
+ // trace, phase, and class keys are all the same length
+ assert(QD_MA_TRACE_LEN == QD_MA_PHASE_LEN);
+ assert(QD_MA_TRACE_LEN == QD_MA_CLASS_LEN);
+
qd_parsed_turbo_t *anno;
if (!strip_anno_in) {
anno = DEQ_HEAD(annos);
while (anno) {
- qd_iterator_t *key_iter =
- qd_iterator_buffer(anno->bufptr.buffer,
- anno->bufptr.cursor - qd_buffer_base(anno->bufptr.buffer),
- anno->size + anno->length_of_size,
- ITER_VIEW_ALL);
- assert(key_iter);
-
- qd_parsed_field_t *key_field = qd_parse(key_iter);
- assert(key_field);
-
- qd_iterator_t *iter = qd_parse_raw(key_field);
- assert(iter);
-
- qd_parsed_turbo_t *anno_val = DEQ_NEXT(anno);
- assert(anno_val);
-
- qd_iterator_t *val_iter =
- qd_iterator_buffer(anno_val->bufptr.buffer,
- anno_val->bufptr.cursor - qd_buffer_base(anno_val->bufptr.buffer),
- anno_val->size + anno_val->length_of_size,
- ITER_VIEW_ALL);
- assert(val_iter);
-
- qd_parsed_field_t *val_field = qd_parse(val_iter);
- assert(val_field);
-
- // Hoist the key name out of the buffers into a normal char array
- char key_name[QD_MA_MAX_KEY_LEN + 1];
- (void)qd_iterator_strncpy(iter, key_name, QD_MA_MAX_KEY_LEN + 1);
-
- // transfer ownership of the extracted value to the message
- if (!strcmp(key_name, QD_MA_TRACE)) {
- *ma_trace = val_field;
- } else if (!strcmp(key_name, QD_MA_INGRESS)) {
- *ma_ingress = val_field;
- } else if (!strcmp(key_name, QD_MA_TO)) {
- *ma_to_override = val_field;
- } else if (!strcmp(key_name, QD_MA_PHASE)) {
- *ma_phase = val_field;
+ uint8_t * dp; // pointer to key name in raw buf or extract buf
+ char key_name[QD_MA_MAX_KEY_LEN]; // key name extracted across buf boundary
+
+ if (anno->bufptr.remaining >= anno->size + anno->length_of_size + 1) {
+ // The best case: key name is completely in current raw buffer
+ dp = anno->bufptr.cursor + anno->length_of_size + 1;
} else {
- // TODO: this key had the QD_MA_PREFIX but it does not match
- // one of the actual fields.
- qd_parse_free(val_field);
+ // Pull the key name from multiple buffers
+ qd_iterator_pointer_t wbuf = anno->bufptr; // scratch buf pointers for getting key
+ uint8_t * wip = wbuf.cursor + anno->length_of_size + 1; // where to look in first buf
+ int t_size = MIN(anno->size, QD_MA_MAX_KEY_LEN); // get this many total
+ int n_local = 0; // n copied so far. t_size is goal.
+ while (wbuf.buffer && n_local < t_size) {
+ // copy current buf bytes in key_name buffer
+ int n_needed = t_size - n_local;
+ int n_to_copy = MIN(n_needed, wbuf.remaining);
+ memmove(key_name + n_local, wip, n_to_copy);
+ n_local += n_to_copy;
+
+ if (n_local < t_size) {
+ // move to next buffer
+ wbuf.buffer = DEQ_NEXT(wbuf.buffer);
+ if (wbuf.buffer) {
+ wbuf.remaining = qd_buffer_size(wbuf.buffer);
+ wip = qd_buffer_base(wbuf.buffer);
+ }
+ }
+ }
+ dp = (uint8_t *)key_name;
}
- qd_iterator_free(key_iter);
- qd_parse_free(key_field);
- qd_iterator_free(val_iter);
- // val_field is usually handed over to message_private and is freed
+ // Verify that the key starts with the prefix.
+ // Once a key with the routing prefix is observed in the annotation
+ // stream then the remainder of the keys must be routing keys.
+ // Padding keys are not real routing annotations but they have
+ // the routing prefix.
+ assert(memcmp(QD_MA_PREFIX, dp, QMPL) == 0);
+
+ // Advance pointer to data beyond the common prefix
+ dp += QMPL;
+
+ qd_ma_enum_t ma_type = QD_MAE_NONE;
+ switch (anno->size) {
+ case QD_MA_TO_LEN:
+ if (memcmp(QD_MA_TO + QMPL, dp, QD_MA_TO_LEN - QMPL) == 0) {
+ ma_type = QD_MAE_TO;
+ }
+ break;
+ case QD_MA_TRACE_LEN:
+ if (memcmp(QD_MA_TRACE + QMPL, dp, QD_MA_TRACE_LEN - QMPL) == 0) {
+ ma_type = QD_MAE_TRACE;
+ } else
+ if (memcmp(QD_MA_PHASE + QMPL, dp, QD_MA_PHASE_LEN - QMPL) == 0) {
+ ma_type = QD_MAE_PHASE;
+ }
+ break;
+ case QD_MA_INGRESS_LEN:
+ if (memcmp(QD_MA_INGRESS + QMPL, dp, QD_MA_INGRESS_LEN - QMPL) == 0) {
+ ma_type = QD_MAE_INGRESS;
+ }
+ break;
+ default:
+ // padding annotations are ignored here
+ break;
+ }
- anno = DEQ_NEXT(anno_val);
+ // Process the data field
+ anno = DEQ_NEXT(anno);
+ assert(anno);
+
+ if (ma_type != QD_MAE_NONE) {
+ // produce a parsed_field for the data
+ qd_iterator_t *val_iter =
+ qd_iterator_buffer(anno->bufptr.buffer,
+ anno->bufptr.cursor - qd_buffer_base(anno->bufptr.buffer),
+ anno->size + anno->length_of_size,
+ ITER_VIEW_ALL);
+ assert(val_iter);
+
+ qd_parsed_field_t *val_field = qd_parse(val_iter);
+ assert(val_field);
+
+ // transfer ownership of the extracted value to the message
+ switch (ma_type) {
+ case QD_MAE_INGRESS:
+ *ma_ingress = val_field;
+ break;
+ case QD_MAE_TRACE:
+ *ma_trace = val_field;
+ break;
+ case QD_MAE_TO:
+ *ma_to_override = val_field;
+ break;
+ case QD_MAE_PHASE:
+ *ma_phase = val_field;
+ break;
+ case QD_MAE_NONE:
+ assert(false);
+ break;
+ }
+
+ qd_iterator_free(val_iter);
+ }
+ anno = DEQ_NEXT(anno);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org