You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2019/06/07 20:09:21 UTC

[qpid-dispatch] 01/02: DISPATCH-1354: Annotation processing performance improvements

This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 91cd6285162c1edd49993741f627d96deb06a545
Author: Chuck Rolke <ch...@apache.org>
AuthorDate: Mon Jun 3 17:53:16 2019 -0400

    DISPATCH-1354: Annotation processing performance improvements
    
    Message annotation processing on received messages stages key names
    byte by byte into a flat buffer and then uses strcmp to check them.
    
    Easy improvements are:
    
     * Use name in raw buffer if it does not cross a buffer boundary
     * If name crosses a boundary then use memmoves to get the name in chunks
     * Check the name prefix only once and then check variable parts of name strings
     * Don't create unnecessary qd_iterators and qd_parsed_fields
     * Don't check names whose lengths differ from the given keys
---
 include/qpid/dispatch/amqp.h  |   8 +++
 include/qpid/dispatch/parse.h |  11 +++
 src/iterator.c                |  14 +++-
 src/parse.c                   | 156 +++++++++++++++++++++++++++++-------------
 4 files changed, 141 insertions(+), 48 deletions(-)

diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h
index 80447be..e49d20a 100644
--- a/include/qpid/dispatch/amqp.h
+++ b/include/qpid/dispatch/amqp.h
@@ -112,6 +112,14 @@ extern const char * const QD_MA_TRACE;    ///< Trace
 extern const char * const QD_MA_TO;       ///< To-Override
 extern const char * const QD_MA_PHASE;    ///< Phase for override address
 extern const char * const QD_MA_CLASS;    ///< Message-Class
+
+#define QD_MA_PREFIX_LEN  (9)
+#define QD_MA_INGRESS_LEN (16)
+#define QD_MA_TRACE_LEN   (14)
+#define QD_MA_TO_LEN      (11)
+#define QD_MA_PHASE_LEN   (14)
+#define QD_MA_CLASS_LEN   (14)
+
 extern const int          QD_MA_MAX_KEY_LEN;  ///< strlen of longest key name
 extern const int          QD_MA_N_KEYS;       ///< number of router annotation keys
 extern const int          QD_MA_FILTER_LEN;   ///< size of annotation filter buffer
diff --git a/include/qpid/dispatch/parse.h b/include/qpid/dispatch/parse.h
index 7be09f0..8397742 100644
--- a/include/qpid/dispatch/parse.h
+++ b/include/qpid/dispatch/parse.h
@@ -301,6 +301,17 @@ void qd_parse_annotations(
     qd_iterator_pointer_t *blob_pointer,
     uint32_t              *blob_item_count);
 
+/**
+ * Identify which annotation is being parsed
+ */
+typedef enum {
+    QD_MAE_INGRESS,
+    QD_MAE_TRACE,
+    QD_MAE_TO,
+    QD_MAE_PHASE,
+    QD_MAE_NONE
+} qd_ma_enum_t;
+
 ///@}
 
 #endif
diff --git a/src/iterator.c b/src/iterator.c
index bed686f..13f7f5d 100644
--- a/src/iterator.c
+++ b/src/iterator.c
@@ -20,6 +20,7 @@
 #include <qpid/dispatch/iterator.h>
 #include <qpid/dispatch/ctools.h>
 #include <qpid/dispatch/alloc.h>
+#include <qpid/dispatch/amqp.h>
 #include "message_private.h"
 #include <stdio.h>
 #include <string.h>
@@ -719,9 +720,20 @@ bool qd_iterator_prefix_ptr(const qd_iterator_pointer_t *ptr, uint32_t skip, con
     if (!ptr)
         return false;
 
+    // if ptr->remaining holds enough bytes for the comparison then 
+    // don't fiddle with the iterator motions. Just do the comparison directly.
+    if (ptr->remaining >= skip + QD_MA_PREFIX_LEN) {
+        // there's enough in current buffer to do straight compare
+        const void * blk1 = ptr->cursor + skip;
+        const void * blk2 = prefix;
+        return memcmp(blk1, blk2, QD_MA_PREFIX_LEN) == 0;
+    }
+
+    // otherwise compare across buffer boundaries
+    // this, too, could be optimized a bit
     qd_iterator_pointer_t lptr;
     *&lptr = *ptr;
-
+    
     iterator_pointer_move_cursor(&lptr, skip);
 
     unsigned char *c = (unsigned char*) prefix;
diff --git a/src/parse.c b/src/parse.c
index ecfbe0f..0672b0c 100644
--- a/src/parse.c
+++ b/src/parse.c
@@ -722,61 +722,123 @@ const char *qd_parse_annotations_v1(
         return parse_error;
     }
 
+    // define a shorthand name for the qd message annotation key prefix length
+#define QMPL QD_MA_PREFIX_LEN
+
+#define MIN(a,b) (((a)<(b))?(a):(b))
+
+    // trace, phase, and class keys are all the same length
+    assert(QD_MA_TRACE_LEN == QD_MA_PHASE_LEN);
+    assert(QD_MA_TRACE_LEN == QD_MA_CLASS_LEN);
+    
     qd_parsed_turbo_t *anno;
     if (!strip_anno_in) {
         anno = DEQ_HEAD(annos);
         while (anno) {
-            qd_iterator_t *key_iter =
-                qd_iterator_buffer(anno->bufptr.buffer,
-                                anno->bufptr.cursor - qd_buffer_base(anno->bufptr.buffer),
-                                anno->size + anno->length_of_size,
-                                ITER_VIEW_ALL);
-            assert(key_iter);
-
-            qd_parsed_field_t *key_field = qd_parse(key_iter);
-            assert(key_field);
-
-            qd_iterator_t *iter = qd_parse_raw(key_field);
-            assert(iter);
-
-            qd_parsed_turbo_t *anno_val = DEQ_NEXT(anno);
-            assert(anno_val);
-
-            qd_iterator_t *val_iter =
-                qd_iterator_buffer(anno_val->bufptr.buffer,
-                                anno_val->bufptr.cursor - qd_buffer_base(anno_val->bufptr.buffer),
-                                anno_val->size + anno_val->length_of_size,
-                                ITER_VIEW_ALL);
-            assert(val_iter);
-
-            qd_parsed_field_t *val_field = qd_parse(val_iter);
-            assert(val_field);
-
-            // Hoist the key name out of the buffers into a normal char array
-            char key_name[QD_MA_MAX_KEY_LEN + 1];
-            (void)qd_iterator_strncpy(iter, key_name, QD_MA_MAX_KEY_LEN + 1);
-
-            // transfer ownership of the extracted value to the message
-            if        (!strcmp(key_name, QD_MA_TRACE)) {
-                *ma_trace = val_field;
-            } else if (!strcmp(key_name, QD_MA_INGRESS)) {
-                *ma_ingress = val_field;
-            } else if (!strcmp(key_name, QD_MA_TO)) {
-                *ma_to_override = val_field;
-            } else if (!strcmp(key_name, QD_MA_PHASE)) {
-                *ma_phase = val_field;
+            uint8_t * dp;                     // pointer to key name in raw buf or extract buf
+            char key_name[QD_MA_MAX_KEY_LEN]; // key name extracted across buf boundary
+
+            if (anno->bufptr.remaining >= anno->size + anno->length_of_size + 1) {
+                // The best case: key name is completely in current raw buffer
+                dp = anno->bufptr.cursor + anno->length_of_size + 1;
             } else {
-                // TODO: this key had the QD_MA_PREFIX but it does not match
-                //       one of the actual fields. 
-                qd_parse_free(val_field);
+                // Pull the key name from multiple buffers
+                qd_iterator_pointer_t wbuf = anno->bufptr;    // scratch buf pointers for getting key
+                uint8_t * wip = wbuf.cursor + anno->length_of_size + 1; // where to look in first buf
+                int t_size = MIN(anno->size, QD_MA_MAX_KEY_LEN); // get this many total
+                int n_local = 0;                              // n copied so far. t_size is goal.
+                while (wbuf.buffer && n_local < t_size) {
+                    // copy current buf bytes in key_name buffer
+                    int n_needed = t_size - n_local;
+                    int n_to_copy = MIN(n_needed, wbuf.remaining);
+                    memmove(key_name + n_local, wip, n_to_copy);
+                    n_local += n_to_copy;
+                    
+                    if (n_local < t_size) {
+                        // move to next buffer
+                        wbuf.buffer = DEQ_NEXT(wbuf.buffer);
+                        if (wbuf.buffer) {
+                            wbuf.remaining = qd_buffer_size(wbuf.buffer);
+                            wip = qd_buffer_base(wbuf.buffer);
+                        }
+                    }
+                }
+                dp = (uint8_t *)key_name;
             }
 
-            qd_iterator_free(key_iter);
-            qd_parse_free(key_field);
-            qd_iterator_free(val_iter);
-            // val_field is usually handed over to message_private and is freed 
+            // Verify that the key starts with the prefix.
+            // Once a key with the routing prefix is observed in the annotation
+            // stream then the remainder of the keys must be routing keys.
+            // Padding keys are not real routing annotations but they have
+            // the routing prefix.
+            assert(memcmp(QD_MA_PREFIX, dp, QMPL) == 0);
+            
+            // Advance pointer to data beyond the common prefix
+            dp += QMPL;
+            
+            qd_ma_enum_t ma_type = QD_MAE_NONE;
+            switch (anno->size) {
+                case QD_MA_TO_LEN:
+                    if (memcmp(QD_MA_TO + QMPL,      dp, QD_MA_TO_LEN - QMPL) == 0) {
+                        ma_type = QD_MAE_TO;
+                    }
+                    break;
+                case QD_MA_TRACE_LEN:
+                    if (memcmp(QD_MA_TRACE + QMPL,  dp, QD_MA_TRACE_LEN - QMPL) == 0) {
+                        ma_type = QD_MAE_TRACE;
+                    } else
+                    if (memcmp(QD_MA_PHASE + QMPL,  dp, QD_MA_PHASE_LEN - QMPL) == 0) {
+                        ma_type = QD_MAE_PHASE;
+                    }
+                    break;
+                case QD_MA_INGRESS_LEN:
+                    if (memcmp(QD_MA_INGRESS + QMPL, dp, QD_MA_INGRESS_LEN - QMPL) == 0) {
+                        ma_type = QD_MAE_INGRESS;
+                    }
+                    break;
+                default:
+                    // padding annotations are ignored here
+                    break;
+            }
 
-            anno = DEQ_NEXT(anno_val);
+            // Process the data field
+            anno = DEQ_NEXT(anno);
+            assert(anno);
+
+            if (ma_type != QD_MAE_NONE) {
+                // produce a parsed_field for the data
+                qd_iterator_t *val_iter =
+                    qd_iterator_buffer(anno->bufptr.buffer,
+                                    anno->bufptr.cursor - qd_buffer_base(anno->bufptr.buffer),
+                                    anno->size + anno->length_of_size,
+                                    ITER_VIEW_ALL);
+                assert(val_iter);
+
+                qd_parsed_field_t *val_field = qd_parse(val_iter);
+                assert(val_field);
+
+                // transfer ownership of the extracted value to the message
+                switch (ma_type) {
+                    case QD_MAE_INGRESS:
+                        *ma_ingress = val_field;
+                        break;
+                    case QD_MAE_TRACE:
+                        *ma_trace = val_field;
+                        break;
+                    case QD_MAE_TO:
+                        *ma_to_override = val_field;
+                        break;
+                    case QD_MAE_PHASE:
+                        *ma_phase = val_field;
+                        break;
+                    case QD_MAE_NONE:
+                        assert(false);
+                        break;
+                }
+
+                qd_iterator_free(val_iter);
+            }
+            anno = DEQ_NEXT(anno);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org