You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/06/26 19:36:32 UTC
svn commit: r1497019 - in /qpid/trunk/qpid/extras/dispatch: CMakeLists.txt
include/qpid/dispatch/compose.h src/compose.c src/compose_private.h
src/message.c src/message_private.h
Author: tross
Date: Wed Jun 26 17:36:32 2013
New Revision: 1497019
URL: http://svn.apache.org/r1497019
Log:
NO-JIRA - Major refactoring of the code for composing performatives in messages.
Added:
qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/compose.h (with props)
qpid/trunk/qpid/extras/dispatch/src/compose.c (with props)
qpid/trunk/qpid/extras/dispatch/src/compose_private.h (with props)
Modified:
qpid/trunk/qpid/extras/dispatch/CMakeLists.txt
qpid/trunk/qpid/extras/dispatch/src/message.c
qpid/trunk/qpid/extras/dispatch/src/message_private.h
Modified: qpid/trunk/qpid/extras/dispatch/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/CMakeLists.txt?rev=1497019&r1=1497018&r2=1497019&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/CMakeLists.txt (original)
+++ qpid/trunk/qpid/extras/dispatch/CMakeLists.txt Wed Jun 26 17:36:32 2013
@@ -77,6 +77,7 @@ set(server_SOURCES
src/agent.c
src/alloc.c
src/buffer.c
+ src/compose.c
src/config.c
src/container.c
src/dispatch.c
Added: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/compose.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/compose.h?rev=1497019&view=auto
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/compose.h (added)
+++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/compose.h Wed Jun 26 17:36:32 2013
@@ -0,0 +1,184 @@
+#ifndef __dispatch_compose_h__
+#define __dispatch_compose_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/buffer.h>
+
+typedef struct dx_composed_field_t dx_composed_field_t;
+
+#define DX_PERFORMATIVE_HEADER 0x70
+#define DX_PERFORMATIVE_DELIVERY_ANNOTATIONS 0x71
+#define DX_PERFORMATIVE_MESSAGE_ANNOTATIONS 0x72
+#define DX_PERFORMATIVE_PROPERTIES 0x73
+#define DX_PERFORMATIVE_APPLICATION_PROPERTIES 0x74
+#define DX_PERFORMATIVE_BODY_DATA 0x75
+#define DX_PERFORMATIVE_BODY_AMQP_SEQUENCE 0x76
+#define DX_PERFORMATIVE_BODY_AMQP_VALUE 0x77
+#define DX_PERFORMATIVE_FOOTER 0x78
+
+/**
+ * Begin composing a new field for a message. The new field can be standalone or
+ * appended onto an existing field.
+ *
+ * @param performative The performative for the message section being composed.
+ * @param extend An existing field onto which to append the new field or NULL to
+ * create a standalone field.
+ * @return A pointer to the newly created field.
+ */
+dx_composed_field_t *dx_compose(uint8_t performative, dx_composed_field_t *extend);
+
+/**
+ * Free the resources associated with a composed field.
+ *
+ * @param A field pointer returned by dx_compose.
+ */
+void dx_compose_free(dx_composed_field_t *field);
+
+/**
+ * Begin to compose the elements of a list in the field. This is called before inserting
+ * the first list element.
+ *
+ * @param field A field created by dx_compose.
+ */
+void dx_compose_start_list(dx_composed_field_t *field);
+
+/**
+ * Complete the composition of a list in the field. This is called after the last
+ * list element has been inserted.
+ *
+ * @param field A field created by dx_compose.
+ */
+void dx_compose_end_list(dx_composed_field_t *field);
+
+/**
+ * Begin to compose the elements os a map in the field. This is called before
+ * inserting the first element-pair into the map.
+ *
+ * @param field A field created by dx_compose.
+ */
+void dx_compose_start_map(dx_composed_field_t *field);
+
+/**
+ * Complete the composition of a map in the field. This is called after the last
+ * element-pair has been inserted.
+ *
+ * @param field A field created by dx_compose.
+ */
+void dx_compose_end_map(dx_composed_field_t *field);
+
+/**
+ * Insert a null element into the field.
+ *
+ * @param field A field created by dx_compose.
+ */
+void dx_compose_insert_null(dx_composed_field_t *field);
+
+/**
+ * Insert a boolean value into the field.
+ *
+ * @param field A field created by dx_compose.
+ * @param value The boolean (zero or non-zero) value to insert.
+ */
+void dx_compose_insert_bool(dx_composed_field_t *field, int value);
+
+/**
+ * Insert an unsigned integer (up to 32 bits) into the field.
+ *
+ * @param field A field created by dx_compose.
+ * @param value The unsigned integer value to be inserted.
+ */
+void dx_compose_insert_uint(dx_composed_field_t *field, uint32_t value);
+
+/**
+ * Insert a long (64-bit) unsigned value into the field.
+ *
+ * @param field A field created by dx_compose.
+ * @param value The unsigned integer value to be inserted.
+ */
+void dx_compose_insert_ulong(dx_composed_field_t *field, uint64_t value);
+
+/**
+ * Insert a signed integer (up to 32 bits) into the field.
+ *
+ * @param field A field created by dx_compose.
+ * @param value The integer value to be inserted.
+ */
+void dx_compose_insert_int(dx_composed_field_t *field, int32_t value);
+
+/**
+ * Insert a long signed integer (64 bits) into the field.
+ *
+ * @param field A field created by dx_compose.
+ * @param value The integer value to be inserted.
+ */
+void dx_compose_insert_long(dx_composed_field_t *field, int64_t value);
+
+/**
+ * Insert a timestamp into the field.
+ *
+ * @param field A field created by dx_compose.
+ * @param value The timestamp value to be inserted.
+ */
+void dx_compose_insert_timestamp(dx_composed_field_t *field, uint64_t value);
+
+/**
+ * Insert a UUID into the field.
+ *
+ * @param field A field created by dx_compose.
+ * @param value The pointer to the first octet in the UUID to be inserted.
+ */
+void dx_compose_insert_uuid(dx_composed_field_t *field, const char *value);
+
+/**
+ * Insert a binary blob into the field.
+ *
+ * @param field A field created by dx_compose.
+ * @param value The pointer to the first octet to be inserted.
+ * @param len The length, in octets, of the binary blob.
+ */
+void dx_compose_insert_binary(dx_composed_field_t *field, const uint8_t *value, uint32_t len);
+
+/**
+ * Insert a binary blob from a list of buffers.
+ *
+ * @param field A field created by dx_compose.
+ * @param buffers A pointer to a list of buffers to be inserted as binary data. Note that
+ * the buffer list will be left empty by this function.
+ */
+void dx_compose_insert_binary_buffers(dx_composed_field_t *field, dx_buffer_list_t *buffers);
+
+/**
+ * Insert a null-terminated utf8-encoded string into the field.
+ *
+ * @param field A field created by dx_compose.
+ * @param value A pointer to a null-terminated string.
+ */
+void dx_compose_insert_string(dx_composed_field_t *field, const char *value);
+
+/**
+ * Insert a symbol into the field.
+ *
+ * @param field A field created by dx_compose.
+ * @param value A pointer to a null-terminated ASCII string.
+ */
+void dx_compose_insert_symbol(dx_composed_field_t *field, const char *value);
+
+#endif
+
Propchange: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/compose.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/compose.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Rev URL
Added: qpid/trunk/qpid/extras/dispatch/src/compose.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/compose.c?rev=1497019&view=auto
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/compose.c (added)
+++ qpid/trunk/qpid/extras/dispatch/src/compose.c Wed Jun 26 17:36:32 2013
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/alloc.h>
+#include <qpid/dispatch/buffer.h>
+#include "message_private.h"
+#include "compose_private.h"
+#include <memory.h>
+
+typedef struct dx_composite_t {
+ DEQ_LINKS(struct dx_composite_t);
+ int isMap;
+ uint32_t count;
+ uint32_t length;
+ dx_field_location_t length_location;
+ dx_field_location_t count_location;
+} dx_composite_t;
+
+ALLOC_DECLARE(dx_composite_t);
+ALLOC_DEFINE(dx_composite_t);
+DEQ_DECLARE(dx_composite_t, dx_field_stack_t);
+
+
+struct dx_composed_field_t {
+ dx_buffer_list_t buffers;
+ dx_field_stack_t fieldStack;
+};
+
+ALLOC_DECLARE(dx_composed_field_t);
+ALLOC_DEFINE(dx_composed_field_t);
+
+
+static void bump_count(dx_composed_field_t *field)
+{
+ dx_composite_t *comp = DEQ_HEAD(field->fieldStack);
+ if (comp)
+ comp->count++;
+}
+
+
+static void dx_insert(dx_composed_field_t *field, const uint8_t *seq, size_t len)
+{
+ dx_buffer_t *buf = DEQ_TAIL(field->buffers);
+ dx_composite_t *comp = DEQ_HEAD(field->fieldStack);
+
+ while (len > 0) {
+ if (buf == 0 || dx_buffer_capacity(buf) == 0) {
+ buf = dx_allocate_buffer();
+ if (buf == 0)
+ return;
+ DEQ_INSERT_TAIL(field->buffers, buf);
+ }
+
+ size_t to_copy = dx_buffer_capacity(buf);
+ if (to_copy > len)
+ to_copy = len;
+ memcpy(dx_buffer_cursor(buf), seq, to_copy);
+ dx_buffer_insert(buf, to_copy);
+ len -= to_copy;
+ seq += to_copy;
+ if (comp)
+ comp->length += to_copy;
+ }
+}
+
+
+static void dx_insert_8(dx_composed_field_t *field, uint8_t value)
+{
+ dx_insert(field, &value, 1);
+}
+
+
+static void dx_insert_32(dx_composed_field_t *field, uint32_t value)
+{
+ uint8_t buf[4];
+ buf[0] = (uint8_t) ((value & 0xFF000000) >> 24);
+ buf[1] = (uint8_t) ((value & 0x00FF0000) >> 16);
+ buf[2] = (uint8_t) ((value & 0x0000FF00) >> 8);
+ buf[3] = (uint8_t) (value & 0x000000FF);
+ dx_insert(field, buf, 4);
+}
+
+
+static void dx_insert_64(dx_composed_field_t *field, uint64_t value)
+{
+ uint8_t buf[8];
+ buf[0] = (uint8_t) ((value & 0xFF00000000000000L) >> 56);
+ buf[1] = (uint8_t) ((value & 0x00FF000000000000L) >> 48);
+ buf[2] = (uint8_t) ((value & 0x0000FF0000000000L) >> 40);
+ buf[3] = (uint8_t) ((value & 0x000000FF00000000L) >> 32);
+ buf[4] = (uint8_t) ((value & 0x00000000FF000000L) >> 24);
+ buf[5] = (uint8_t) ((value & 0x0000000000FF0000L) >> 16);
+ buf[6] = (uint8_t) ((value & 0x000000000000FF00L) >> 8);
+ buf[7] = (uint8_t) (value & 0x00000000000000FFL);
+ dx_insert(field, buf, 8);
+}
+
+
+static void dx_overwrite(dx_buffer_t **buf, size_t *cursor, uint8_t value)
+{
+ while (*buf) {
+ if (*cursor >= dx_buffer_size(*buf)) {
+ *buf = (*buf)->next;
+ *cursor = 0;
+ } else {
+ dx_buffer_base(*buf)[*cursor] = value;
+ (*cursor)++;
+ return;
+ }
+ }
+}
+
+
+static void dx_overwrite_32(dx_field_location_t *field, uint32_t value)
+{
+ dx_buffer_t *buf = field->buffer;
+ size_t cursor = field->offset;
+
+ dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0xFF000000) >> 24));
+ dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x00FF0000) >> 24));
+ dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x0000FF00) >> 24));
+ dx_overwrite(&buf, &cursor, (uint8_t) (value & 0x000000FF));
+}
+
+
+static void dx_compose_start_composite(dx_composed_field_t *field, int isMap)
+{
+ if (isMap)
+ dx_insert_8(field, 0xd1); // map32
+ else
+ dx_insert_8(field, 0xd0); // list32
+
+ //
+ // Push a composite descriptor on the field stack
+ //
+ dx_composite_t *comp = new_dx_composite_t();
+ DEQ_ITEM_INIT(comp);
+ comp->isMap = isMap;
+
+ //
+ // Mark the current location to later overwrite the length
+ //
+ comp->length_location.buffer = DEQ_TAIL(field->buffers);
+ comp->length_location.offset = dx_buffer_size(comp->length_location.buffer);
+ comp->length_location.length = 4;
+ comp->length_location.parsed = 1;
+
+ dx_insert(field, (const uint8_t*) "\x00\x00\x00\x00", 4);
+
+ //
+ // Mark the current location to later overwrite the count
+ //
+ comp->count_location.buffer = DEQ_TAIL(field->buffers);
+ comp->count_location.offset = dx_buffer_size(comp->count_location.buffer);
+ comp->count_location.length = 4;
+ comp->count_location.parsed = 1;
+
+ dx_insert(field, (const uint8_t*) "\x00\x00\x00\x00", 4);
+
+ comp->length = 4; // Include the length of the count field
+ comp->count = 0;
+
+ DEQ_INSERT_HEAD(field->fieldStack, comp);
+}
+
+
+static void dx_compose_end_composite(dx_composed_field_t *field)
+{
+ dx_composite_t *comp = DEQ_HEAD(field->fieldStack);
+ assert(comp);
+
+ dx_overwrite_32(&comp->length_location, comp->length);
+ dx_overwrite_32(&comp->count_location, comp->count);
+
+ DEQ_REMOVE_HEAD(field->fieldStack);
+ free_dx_composite_t(comp);
+}
+
+
+dx_composed_field_t *dx_compose(uint8_t performative, dx_composed_field_t *extend)
+{
+ dx_composed_field_t *field = extend;
+
+ if (field) {
+ assert(DEQ_SIZE(field->fieldStack) == 0);
+ } else {
+ field = new_dx_composed_field_t();
+ if (!field)
+ return 0;
+
+ DEQ_INIT(field->buffers);
+ DEQ_INIT(field->fieldStack);
+ }
+
+ dx_insert(field, (const uint8_t*) "\x00\x53", 2);
+ dx_insert_8(field, performative);
+
+ return field;
+}
+
+
+void dx_compose_free(dx_composed_field_t *field)
+{
+ dx_buffer_t *buf = DEQ_HEAD(field->buffers);
+ while (buf) {
+ DEQ_REMOVE_HEAD(field->buffers);
+ dx_free_buffer(buf);
+ }
+
+ dx_composite_t *comp = DEQ_HEAD(field->fieldStack);
+ while (comp) {
+ DEQ_REMOVE_HEAD(field->fieldStack);
+ free_dx_composite_t(comp);
+ }
+
+ free_dx_composed_field_t(field);
+}
+
+
+void dx_compose_start_list(dx_composed_field_t *field)
+{
+ dx_compose_start_composite(field, 0);
+}
+
+
+void dx_compose_end_list(dx_composed_field_t *field)
+{
+ dx_compose_end_composite(field);
+}
+
+
+void dx_compose_start_map(dx_composed_field_t *field)
+{
+ dx_compose_start_composite(field, 1);
+}
+
+
+void dx_compose_end_map(dx_composed_field_t *field)
+{
+ dx_compose_end_composite(field);
+}
+
+
+void dx_compose_insert_null(dx_composed_field_t *field)
+{
+ dx_insert_8(field, 0x40);
+ bump_count(field);
+}
+
+
+void dx_compose_insert_bool(dx_composed_field_t *field, int value)
+{
+ dx_insert_8(field, value ? 0x41 : 0x42);
+ bump_count(field);
+}
+
+
+void dx_compose_insert_uint(dx_composed_field_t *field, uint32_t value)
+{
+ if (value == 0) {
+ dx_insert_8(field, 0x43); // uint0
+ } else if (value < 256) {
+ dx_insert_8(field, 0x52); // smalluint
+ dx_insert_8(field, (uint8_t) value);
+ } else {
+ dx_insert_8(field, 0x70); // uint
+ dx_insert_32(field, value);
+ }
+ bump_count(field);
+}
+
+
+void dx_compose_insert_ulong(dx_composed_field_t *field, uint64_t value)
+{
+ if (value == 0) {
+ dx_insert_8(field, 0x44); // ulong0
+ } else if (value < 256) {
+ dx_insert_8(field, 0x53); // smallulong
+ dx_insert_8(field, (uint8_t) value);
+ } else {
+ dx_insert_8(field, 0x80); // ulong
+ dx_insert_64(field, value);
+ }
+ bump_count(field);
+}
+
+
+void dx_compose_insert_int(dx_composed_field_t *field, int32_t value)
+{
+ if (value >= -128 && value <= 127) {
+ dx_insert_8(field, 0x54); // smallint
+ dx_insert_8(field, (uint8_t) value);
+ } else {
+ dx_insert_8(field, 0x71); // int
+ dx_insert_32(field, (uint32_t) value);
+ }
+ bump_count(field);
+}
+
+
+void dx_compose_insert_long(dx_composed_field_t *field, int64_t value)
+{
+ if (value >= -128 && value <= 127) {
+ dx_insert_8(field, 0x55); // smalllong
+ dx_insert_8(field, (uint8_t) value);
+ } else {
+ dx_insert_8(field, 0x81); // long
+ dx_insert_64(field, (uint64_t) value);
+ }
+ bump_count(field);
+}
+
+
+void dx_compose_insert_timestamp(dx_composed_field_t *field, uint64_t value)
+{
+ dx_insert_8(field, 0x83); // timestamp
+ dx_insert_64(field, value);
+ bump_count(field);
+}
+
+
+void dx_compose_insert_uuid(dx_composed_field_t *field, const char *value)
+{
+ dx_insert_8(field, 0x98); // uuid
+ dx_insert(field, (const uint8_t*) value, 16);
+ bump_count(field);
+}
+
+
+void dx_compose_insert_binary(dx_composed_field_t *field, const uint8_t *value, uint32_t len)
+{
+ if (len < 256) {
+ dx_insert_8(field, 0xa0); // vbin8
+ dx_insert_8(field, (uint8_t) len);
+ } else {
+ dx_insert_8(field, 0xb0); // vbin32
+ dx_insert_32(field, len);
+ }
+ dx_insert(field, value, len);
+ bump_count(field);
+}
+
+
+void dx_compose_insert_binary_buffers(dx_composed_field_t *field, dx_buffer_list_t *buffers)
+{
+ dx_buffer_t *buf = DEQ_HEAD(*buffers);
+ uint32_t len = 0;
+
+ //
+ // Calculate the size of the binary field to be appended.
+ //
+ while (buf) {
+ len += dx_buffer_size(buf);
+ buf = DEQ_NEXT(buf);
+ }
+
+ //
+ // Supply the appropriate binary tag for the length.
+ //
+ if (len < 256) {
+ dx_insert_8(field, 0xa0); // vbin8
+ dx_insert_8(field, (uint8_t) len);
+ } else {
+ dx_insert_8(field, 0xb0); // vbin32
+ dx_insert_32(field, len);
+ }
+
+ //
+ // Move the supplied buffers to the tail of the field's buffer list.
+ //
+ buf = DEQ_HEAD(*buffers);
+ while (buf) {
+ DEQ_REMOVE_HEAD(*buffers);
+ DEQ_INSERT_TAIL(field->buffers, buf);
+ buf = DEQ_HEAD(*buffers);
+ }
+}
+
+
+void dx_compose_insert_string(dx_composed_field_t *field, const char *value)
+{
+ uint32_t len = strlen(value);
+
+ if (len < 256) {
+ dx_insert_8(field, 0xa1); // str8-utf8
+ dx_insert_8(field, (uint8_t) len);
+ } else {
+ dx_insert_8(field, 0xb1); // str32-utf8
+ dx_insert_32(field, len);
+ }
+ dx_insert(field, (const uint8_t*) value, len);
+ bump_count(field);
+}
+
+
+void dx_compose_insert_symbol(dx_composed_field_t *field, const char *value)
+{
+ uint32_t len = strlen(value);
+
+ if (len < 256) {
+ dx_insert_8(field, 0xa3); // sym8
+ dx_insert_8(field, (uint8_t) len);
+ } else {
+ dx_insert_8(field, 0xb3); // sym32
+ dx_insert_32(field, len);
+ }
+ dx_insert(field, (const uint8_t*) value, len);
+ bump_count(field);
+}
+
+
+dx_buffer_list_t *dx_compose_buffers(dx_composed_field_t *field)
+{
+ return &field->buffers;
+}
+
Propchange: qpid/trunk/qpid/extras/dispatch/src/compose.c
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/trunk/qpid/extras/dispatch/src/compose_private.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/compose_private.h?rev=1497019&view=auto
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/compose_private.h (added)
+++ qpid/trunk/qpid/extras/dispatch/src/compose_private.h Wed Jun 26 17:36:32 2013
@@ -0,0 +1,26 @@
+#ifndef __compose_private_h__
+#define __compose_private_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/compose.h>
+
+dx_buffer_list_t *dx_compose_buffers(dx_composed_field_t *field);
+
+#endif
Propchange: qpid/trunk/qpid/extras/dispatch/src/compose_private.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/extras/dispatch/src/compose_private.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Rev URL
Modified: qpid/trunk/qpid/extras/dispatch/src/message.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/message.c?rev=1497019&r1=1497018&r2=1497019&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/message.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/message.c Wed Jun 26 17:36:32 2013
@@ -20,6 +20,7 @@
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/threading.h>
#include "message_private.h"
+#include "compose_private.h"
#include <string.h>
#include <stdio.h>
@@ -255,134 +256,6 @@ static int dx_check_and_advance(dx_buffe
}
-static void dx_insert(dx_message_content_t *msg, const uint8_t *seq, size_t len)
-{
- dx_buffer_t *buf = DEQ_TAIL(msg->buffers);
-
- while (len > 0) {
- if (buf == 0 || dx_buffer_capacity(buf) == 0) {
- buf = dx_allocate_buffer();
- if (buf == 0)
- return;
- DEQ_INSERT_TAIL(msg->buffers, buf);
- }
-
- size_t to_copy = dx_buffer_capacity(buf);
- if (to_copy > len)
- to_copy = len;
- memcpy(dx_buffer_cursor(buf), seq, to_copy);
- dx_buffer_insert(buf, to_copy);
- len -= to_copy;
- seq += to_copy;
- msg->length += to_copy;
- }
-}
-
-
-static void dx_insert_8(dx_message_content_t *msg, uint8_t value)
-{
- dx_insert(msg, &value, 1);
-}
-
-
-static void dx_insert_32(dx_message_content_t *msg, uint32_t value)
-{
- uint8_t buf[4];
- buf[0] = (uint8_t) ((value & 0xFF000000) >> 24);
- buf[1] = (uint8_t) ((value & 0x00FF0000) >> 16);
- buf[2] = (uint8_t) ((value & 0x0000FF00) >> 8);
- buf[3] = (uint8_t) (value & 0x000000FF);
- dx_insert(msg, buf, 4);
-}
-
-
-static void dx_insert_64(dx_message_content_t *msg, uint64_t value)
-{
- uint8_t buf[8];
- buf[0] = (uint8_t) ((value & 0xFF00000000000000L) >> 56);
- buf[1] = (uint8_t) ((value & 0x00FF000000000000L) >> 48);
- buf[2] = (uint8_t) ((value & 0x0000FF0000000000L) >> 40);
- buf[3] = (uint8_t) ((value & 0x000000FF00000000L) >> 32);
- buf[4] = (uint8_t) ((value & 0x00000000FF000000L) >> 24);
- buf[5] = (uint8_t) ((value & 0x0000000000FF0000L) >> 16);
- buf[6] = (uint8_t) ((value & 0x000000000000FF00L) >> 8);
- buf[7] = (uint8_t) (value & 0x00000000000000FFL);
- dx_insert(msg, buf, 8);
-}
-
-
-static void dx_overwrite(dx_buffer_t **buf, size_t *cursor, uint8_t value)
-{
- while (*buf) {
- if (*cursor >= dx_buffer_size(*buf)) {
- *buf = (*buf)->next;
- *cursor = 0;
- } else {
- dx_buffer_base(*buf)[*cursor] = value;
- (*cursor)++;
- return;
- }
- }
-}
-
-
-static void dx_overwrite_32(dx_field_location_t *field, uint32_t value)
-{
- dx_buffer_t *buf = field->buffer;
- size_t cursor = field->offset;
-
- dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0xFF000000) >> 24));
- dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x00FF0000) >> 24));
- dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x0000FF00) >> 24));
- dx_overwrite(&buf, &cursor, (uint8_t) (value & 0x000000FF));
-}
-
-
-static void dx_start_list_performative(dx_message_content_t *msg, uint8_t code)
-{
- //
- // Insert the short-form performative tag
- //
- dx_insert(msg, (const uint8_t*) "\x00\x53", 2);
- dx_insert_8(msg, code);
-
- //
- // Open the list with a list32 tag
- //
- dx_insert_8(msg, 0xd0);
-
- //
- // Mark the current location to later overwrite the length
- //
- msg->compose_length.buffer = DEQ_TAIL(msg->buffers);
- msg->compose_length.offset = dx_buffer_size(msg->compose_length.buffer);
- msg->compose_length.length = 4;
- msg->compose_length.parsed = 1;
-
- dx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4);
-
- //
- // Mark the current location to later overwrite the count
- //
- msg->compose_count.buffer = DEQ_TAIL(msg->buffers);
- msg->compose_count.offset = dx_buffer_size(msg->compose_count.buffer);
- msg->compose_count.length = 4;
- msg->compose_count.parsed = 1;
-
- dx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4);
-
- msg->length = 4; // Include the length of the count field
- msg->count = 0;
-}
-
-
-static void dx_end_list(dx_message_content_t *msg)
-{
- dx_overwrite_32(&msg->compose_length, msg->length);
- dx_overwrite_32(&msg->compose_count, msg->count);
-}
-
-
static dx_field_location_t *dx_message_field_location(dx_message_t *msg, dx_message_field_t field)
{
dx_message_content_t *content = MSG_CONTENT(msg);
@@ -825,302 +698,43 @@ ssize_t dx_message_field_copy(dx_message
void dx_message_compose_1(dx_message_t *msg, const char *to, dx_buffer_list_t *buffers)
{
- dx_message_begin_header(msg);
- dx_message_insert_boolean(msg, 0); // durable
- //dx_message_insert_null(msg); // priority
- //dx_message_insert_null(msg); // ttl
- //dx_message_insert_boolean(msg, 0); // first-acquirer
- //dx_message_insert_uint(msg, 0); // delivery-count
- dx_message_end_header(msg);
-
- dx_message_begin_message_properties(msg);
- dx_message_insert_null(msg); // message-id
- dx_message_insert_null(msg); // user-id
- dx_message_insert_string(msg, to); // to
- //dx_message_insert_null(msg); // subject
- //dx_message_insert_null(msg); // reply-to
- //dx_message_insert_null(msg); // correlation-id
- //dx_message_insert_null(msg); // content-type
- //dx_message_insert_null(msg); // content-encoding
- //dx_message_insert_timestamp(msg, 0); // absolute-expiry-time
- //dx_message_insert_timestamp(msg, 0); // creation-time
- //dx_message_insert_null(msg); // group-id
- //dx_message_insert_uint(msg, 0); // group-sequence
- //dx_message_insert_null(msg); // reply-to-group-id
- dx_message_end_message_properties(msg);
-
- if (buffers)
- dx_message_append_body_data(msg, buffers);
-}
-
-
-void dx_message_begin_header(dx_message_t *msg)
-{
- dx_start_list_performative(MSG_CONTENT(msg), 0x70);
-}
-
-
-void dx_message_end_header(dx_message_t *msg)
-{
- dx_end_list(MSG_CONTENT(msg));
-}
-
-
-void dx_message_begin_delivery_annotations(dx_message_t *msg)
-{
- assert(0); // Not Implemented
-}
-
-
-void dx_message_end_delivery_annotations(dx_message_t *msg)
-{
- assert(0); // Not Implemented
-}
-
-
-void dx_message_begin_message_annotations(dx_message_t *msg)
-{
- assert(0); // Not Implemented
-}
-
-
-void dx_message_end_message_annotations(dx_message_t *msg)
-{
- assert(0); // Not Implemented
-}
-
-
-void dx_message_begin_message_properties(dx_message_t *msg)
-{
- dx_start_list_performative(MSG_CONTENT(msg), 0x73);
-}
-
-
-void dx_message_end_message_properties(dx_message_t *msg)
-{
- dx_end_list(MSG_CONTENT(msg));
-}
-
-
-void dx_message_begin_application_properties(dx_message_t *msg)
-{
- assert(0); // Not Implemented
-}
-
-
-void dx_message_end_application_properties(dx_message_t *msg)
-{
- assert(0); // Not Implemented
-}
-
-
-void dx_message_append_body_data(dx_message_t *msg, dx_buffer_list_t *buffers)
-{
+ dx_composed_field_t *field = dx_compose(DX_PERFORMATIVE_HEADER, 0);
dx_message_content_t *content = MSG_CONTENT(msg);
- dx_buffer_t *buf = DEQ_HEAD(*buffers);
- uint32_t len = 0;
- //
- // Calculate the size of the body to be appended.
- //
- while (buf) {
- len += dx_buffer_size(buf);
- buf = DEQ_NEXT(buf);
- }
-
- //
- // Insert a DATA section performative header.
- //
- dx_insert(content, (const uint8_t*) "\x00\x53\x75", 3);
- if (len < 256) {
- dx_insert_8(content, 0xa0); // vbin8
- dx_insert_8(content, (uint8_t) len);
- } else {
- dx_insert_8(content, 0xb0); // vbin32
- dx_insert_32(content, len);
- }
-
- //
- // Move the supplied buffers to the tail of the message's buffer list.
- //
- buf = DEQ_HEAD(*buffers);
- while (buf) {
- DEQ_REMOVE_HEAD(*buffers);
- DEQ_INSERT_TAIL(content->buffers, buf);
- buf = DEQ_HEAD(*buffers);
- }
-}
-
-
-void dx_message_begin_body_sequence(dx_message_t *msg)
-{
-}
-
-
-void dx_message_end_body_sequence(dx_message_t *msg)
-{
-}
-
-
-void dx_message_begin_footer(dx_message_t *msg)
-{
- assert(0); // Not Implemented
-}
-
-
-void dx_message_end_footer(dx_message_t *msg)
-{
- assert(0); // Not Implemented
-}
-
-
-void dx_message_insert_null(dx_message_t *msg)
-{
- dx_message_content_t *content = MSG_CONTENT(msg);
- dx_insert_8(content, 0x40);
- content->count++;
-}
-
-
-void dx_message_insert_boolean(dx_message_t *msg, int value)
-{
- dx_message_content_t *content = MSG_CONTENT(msg);
- if (value)
- dx_insert(content, (const uint8_t*) "\x56\x01", 2);
- else
- dx_insert(content, (const uint8_t*) "\x56\x00", 2);
- content->count++;
-}
-
-
-void dx_message_insert_ubyte(dx_message_t *msg, uint8_t value)
-{
- dx_message_content_t *content = MSG_CONTENT(msg);
- dx_insert_8(content, 0x50);
- dx_insert_8(content, value);
- content->count++;
-}
-
-
-void dx_message_insert_uint(dx_message_t *msg, uint32_t value)
-{
- dx_message_content_t *content = MSG_CONTENT(msg);
- if (value == 0) {
- dx_insert_8(content, 0x43); // uint0
- } else if (value < 256) {
- dx_insert_8(content, 0x52); // smalluint
- dx_insert_8(content, (uint8_t) value);
- } else {
- dx_insert_8(content, 0x70); // uint
- dx_insert_32(content, value);
- }
- content->count++;
-}
-
-
-void dx_message_insert_ulong(dx_message_t *msg, uint64_t value)
-{
- dx_message_content_t *content = MSG_CONTENT(msg);
- if (value == 0) {
- dx_insert_8(content, 0x44); // ulong0
- } else if (value < 256) {
- dx_insert_8(content, 0x53); // smallulong
- dx_insert_8(content, (uint8_t) value);
- } else {
- dx_insert_8(content, 0x80); // ulong
- dx_insert_64(content, value);
- }
- content->count++;
-}
-
-
-void dx_message_insert_binary(dx_message_t *msg, const uint8_t *start, size_t len)
-{
- dx_message_content_t *content = MSG_CONTENT(msg);
- if (len < 256) {
- dx_insert_8(content, 0xa0); // vbin8
- dx_insert_8(content, (uint8_t) len);
- } else {
- dx_insert_8(content, 0xb0); // vbin32
- dx_insert_32(content, len);
- }
- dx_insert(content, start, len);
- content->count++;
-}
-
-
-void dx_message_insert_string(dx_message_t *msg, const char *str)
-{
- dx_message_content_t *content = MSG_CONTENT(msg);
- uint32_t len = strlen(str);
-
- if (len < 256) {
- dx_insert_8(content, 0xa1); // str8-utf8
- dx_insert_8(content, (uint8_t) len);
- dx_insert(content, (const uint8_t*) str, len);
- } else {
- dx_insert_8(content, 0xb1); // str32-utf8
- dx_insert_32(content, len);
- dx_insert(content, (const uint8_t*) str, len);
- }
- content->count++;
-}
-
-
-void dx_message_insert_uuid(dx_message_t *msg, const uint8_t *value)
-{
- dx_message_content_t *content = MSG_CONTENT(msg);
- dx_insert_8(content, 0x98); // uuid
- dx_insert(content, value, 16);
- content->count++;
-}
+ dx_compose_start_list(field);
+ dx_compose_insert_bool(field, 0); // durable
+ //dx_compose_insert_null(field); // priority
+ //dx_compose_insert_null(field); // ttl
+ //dx_compose_insert_boolean(field, 0); // first-acquirer
+ //dx_compose_insert_uint(field, 0); // delivery-count
+ dx_compose_end_list(field);
+
+ field = dx_compose(DX_PERFORMATIVE_PROPERTIES, field);
+ dx_compose_start_list(field);
+ dx_compose_insert_null(field); // compose-id
+ dx_compose_insert_null(field); // user-id
+ dx_compose_insert_string(field, to); // to
+ //dx_compose_insert_null(field); // subject
+ //dx_compose_insert_null(field); // reply-to
+ //dx_compose_insert_null(field); // correlation-id
+ //dx_compose_insert_null(field); // content-type
+ //dx_compose_insert_null(field); // content-encoding
+ //dx_compose_insert_timestamp(field, 0); // absolute-expiry-time
+ //dx_compose_insert_timestamp(field, 0); // creation-time
+ //dx_compose_insert_null(field); // group-id
+ //dx_compose_insert_uint(field, 0); // group-sequence
+ //dx_compose_insert_null(field); // reply-to-group-id
+ dx_compose_end_list(field);
+
+ if (buffers) {
+ field = dx_compose(DX_PERFORMATIVE_BODY_DATA, field);
+ dx_compose_insert_binary_buffers(field, buffers);
+ }
+
+ dx_buffer_list_t *field_buffers = dx_compose_buffers(field);
+ content->buffers = *field_buffers;
+ DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers.
-
-void dx_message_insert_symbol(dx_message_t *msg, const char *start, size_t len)
-{
- dx_message_content_t *content = MSG_CONTENT(msg);
- if (len < 256) {
- dx_insert_8(content, 0xa3); // sym8
- dx_insert_8(content, (uint8_t) len);
- dx_insert(content, (const uint8_t*) start, len);
- } else {
- dx_insert_8(content, 0xb3); // sym32
- dx_insert_32(content, len);
- dx_insert(content, (const uint8_t*) start, len);
- }
- content->count++;
-}
-
-
-void dx_message_insert_timestamp(dx_message_t *msg, uint64_t value)
-{
- dx_message_content_t *content = MSG_CONTENT(msg);
- dx_insert_8(content, 0x83); // timestamp
- dx_insert_64(content, value);
- content->count++;
-}
-
-
-void dx_message_begin_list(dx_message_t* msg)
-{
- assert(0); // Not Implemented
-}
-
-
-void dx_message_end_list(dx_message_t* msg)
-{
- assert(0); // Not Implemented
-}
-
-
-void dx_message_begin_map(dx_message_t* msg)
-{
- assert(0); // Not Implemented
-}
-
-
-void dx_message_end_map(dx_message_t* msg)
-{
- assert(0); // Not Implemented
+ dx_compose_free(field);
}
Modified: qpid/trunk/qpid/extras/dispatch/src/message_private.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/message_private.h?rev=1497019&r1=1497018&r2=1497019&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/message_private.h (original)
+++ qpid/trunk/qpid/extras/dispatch/src/message_private.h Wed Jun 26 17:36:32 2013
@@ -74,10 +74,6 @@ typedef struct {
dx_field_location_t field_user_id; // The string value of the user-id
dx_field_location_t field_to; // The string value of the to field
dx_field_location_t body; // The body of the message
- dx_field_location_t compose_length;
- dx_field_location_t compose_count;
- uint32_t length;
- uint32_t count;
dx_buffer_t *parse_buffer;
unsigned char *parse_cursor;
dx_message_depth_t parse_depth;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org