You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2012/03/08 19:33:47 UTC
svn commit: r1298498 [2/3] - in /qpid/proton/proton-c: ./ include/
include/proton/ mllib/ src/ src/codec/ src/engine/ src/framing/ src/types/
Added: qpid/proton/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/engine/engine.c?rev=1298498&view=auto
==============================================================================
--- qpid/proton/proton-c/src/engine/engine.c (added)
+++ qpid/proton/proton-c/src/engine/engine.c Thu Mar 8 18:33:46 2012
@@ -0,0 +1,1637 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "engine-internal.h"
+#include <stdlib.h>
+#include <string.h>
+#include <proton/framing.h>
+#include <proton/value.h>
+#include "../protocol.h"
+#include <wchar.h>
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <ctype.h>
+
+// delivery buffers
+
+void pn_delivery_buffer_init(pn_delivery_buffer_t *db, pn_sequence_t next, size_t capacity)
+{
+ // XXX: error handling
+ db->deliveries = malloc(sizeof(pn_delivery_state_t) * capacity);
+ db->next = next;
+ db->capacity = capacity;
+ db->head = 0;
+ db->size = 0;
+}
+
+void pn_delivery_buffer_destroy(pn_delivery_buffer_t *db)
+{
+ free(db->deliveries);
+}
+
+size_t pn_delivery_buffer_size(pn_delivery_buffer_t *db)
+{
+ return db->size;
+}
+
+size_t pn_delivery_buffer_available(pn_delivery_buffer_t *db)
+{
+ return db->capacity - db->size;
+}
+
+bool pn_delivery_buffer_empty(pn_delivery_buffer_t *db)
+{
+ return db->size == 0;
+}
+
+pn_delivery_state_t *pn_delivery_buffer_get(pn_delivery_buffer_t *db, size_t index)
+{
+ if (index < db->size) return db->deliveries + ((db->head + index) % db->capacity);
+ else return NULL;
+}
+
+pn_delivery_state_t *pn_delivery_buffer_head(pn_delivery_buffer_t *db)
+{
+ if (db->size) return db->deliveries + db->head;
+ else return NULL;
+}
+
+pn_delivery_state_t *pn_delivery_buffer_tail(pn_delivery_buffer_t *db)
+{
+ if (db->size) return pn_delivery_buffer_get(db, db->size - 1);
+ else return NULL;
+}
+
+pn_sequence_t pn_delivery_buffer_lwm(pn_delivery_buffer_t *db)
+{
+ if (db->size) return pn_delivery_buffer_head(db)->id;
+ else return db->next;
+}
+
+static void pn_delivery_state_init(pn_delivery_state_t *ds, pn_delivery_t *delivery, pn_sequence_t id)
+{
+ ds->delivery = delivery;
+ ds->id = id;
+ ds->sent = false;
+}
+
+pn_delivery_state_t *pn_delivery_buffer_push(pn_delivery_buffer_t *db, pn_delivery_t *delivery)
+{
+ if (!pn_delivery_buffer_available(db))
+ return NULL;
+ db->size++;
+ pn_delivery_state_t *ds = pn_delivery_buffer_tail(db);
+ pn_delivery_state_init(ds, delivery, db->next++);
+ return ds;
+}
+
+bool pn_delivery_buffer_pop(pn_delivery_buffer_t *db)
+{
+ if (db->size) {
+ db->head = (db->head + 1) % db->capacity;
+ db->size--;
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void pn_delivery_buffer_gc(pn_delivery_buffer_t *db)
+{
+ while (db->size && !pn_delivery_buffer_head(db)->delivery) {
+ pn_delivery_buffer_pop(db);
+ }
+}
+
+// endpoints
+
+pn_endpoint_type_t pn_endpoint_type(pn_endpoint_t *endpoint)
+{
+ return endpoint->type;
+}
+
+pn_endpoint_state_t pn_local_state(pn_endpoint_t *endpoint)
+{
+ return endpoint->local_state;
+}
+
+pn_endpoint_state_t pn_remote_state(pn_endpoint_t *endpoint)
+{
+ return endpoint->remote_state;
+}
+
+pn_error_t *pn_local_error(pn_endpoint_t *endpoint)
+{
+ if (endpoint->local_error.condition)
+ return &endpoint->local_error;
+ else
+ return NULL;
+}
+
+pn_error_t *pn_remote_error(pn_endpoint_t *endpoint)
+{
+ if (endpoint->remote_error.condition)
+ return &endpoint->remote_error;
+ else
+ return NULL;
+}
+
+void pn_destroy(pn_endpoint_t *endpoint)
+{
+ switch (endpoint->type)
+ {
+ case CONNECTION:
+ pn_destroy_connection((pn_connection_t *)endpoint);
+ break;
+ case TRANSPORT:
+ pn_destroy_transport((pn_transport_t *)endpoint);
+ break;
+ case SESSION:
+ pn_destroy_session((pn_session_t *)endpoint);
+ break;
+ case SENDER:
+ pn_destroy_sender((pn_sender_t *)endpoint);
+ break;
+ case RECEIVER:
+ pn_destroy_receiver((pn_receiver_t *)endpoint);
+ break;
+ }
+}
+
+void pn_destroy_connection(pn_connection_t *connection)
+{
+ pn_destroy_transport(connection->transport);
+ while (connection->session_count)
+ pn_destroy_session(connection->sessions[connection->session_count - 1]);
+ free(connection->sessions);
+ free(connection);
+}
+
+void pn_destroy_transport(pn_transport_t *transport)
+{
+ pn_free_map(transport->dispatch);
+ pn_free_list(transport->args);
+ for (int i = 0; i < transport->session_capacity; i++) {
+ pn_delivery_buffer_destroy(&transport->sessions[i].incoming);
+ pn_delivery_buffer_destroy(&transport->sessions[i].outgoing);
+ free(transport->sessions[i].links);
+ free(transport->sessions[i].handles);
+ }
+ free(transport->sessions);
+ free(transport->channels);
+ free(transport->output);
+ free(transport);
+}
+
+void pn_add_session(pn_connection_t *conn, pn_session_t *ssn)
+{
+ PN_ENSURE(conn->sessions, conn->session_capacity, conn->session_count + 1);
+ conn->sessions[conn->session_count++] = ssn;
+ ssn->connection = conn;
+ ssn->id = conn->session_count;
+}
+
+void pn_remove_session(pn_connection_t *conn, pn_session_t *ssn)
+{
+ for (int i = 0; i < conn->session_count; i++)
+ {
+ if (conn->sessions[i] == ssn)
+ {
+ memmove(&conn->sessions[i], &conn->sessions[i+1], conn->session_count - i - 1);
+ conn->session_count--;
+ break;
+ }
+ }
+ ssn->connection = NULL;
+}
+
+void pn_destroy_session(pn_session_t *session)
+{
+ while (session->link_count)
+ pn_destroy(&session->links[session->link_count - 1]->endpoint);
+ pn_remove_session(session->connection, session);
+ free(session->links);
+ free(session);
+}
+
+void pn_add_link(pn_session_t *ssn, pn_link_t *link)
+{
+ PN_ENSURE(ssn->links, ssn->link_capacity, ssn->link_count + 1);
+ ssn->links[ssn->link_count++] = link;
+ link->session = ssn;
+ link->id = ssn->link_count;
+}
+
+void pn_remove_link(pn_session_t *ssn, pn_link_t *link)
+{
+ for (int i = 0; i < ssn->link_count; i++)
+ {
+ if (ssn->links[i] == link)
+ {
+ memmove(&ssn->links[i], &ssn->links[i+1], ssn->link_count - i - 1);
+ ssn->link_count--;
+ break;
+ }
+ }
+ link->session = NULL;
+}
+
+void pn_clear_tag(pn_delivery_t *delivery)
+{
+ if (delivery->tag) {
+ pn_free_binary(delivery->tag);
+ delivery->tag = NULL;
+ }
+}
+
+void pn_free_deliveries(pn_delivery_t *delivery)
+{
+ while (delivery)
+ {
+ pn_delivery_t *next = delivery->link_next;
+ pn_clear_tag(delivery);
+ if (delivery->capacity) free(delivery->bytes);
+ free(delivery);
+ delivery = next;
+ }
+}
+
+void pn_dump_deliveries(pn_delivery_t *delivery)
+{
+ if (delivery) {
+ while (delivery)
+ {
+ printf("%p(%.*s)", (void *) delivery, (int) pn_binary_size(delivery->tag),
+ pn_binary_bytes(delivery->tag));
+ if (delivery->link_next) printf(" -> ");
+ delivery = delivery->link_next;
+ }
+ } else {
+ printf("NULL");
+ }
+}
+
+void pn_link_dump(pn_link_t *link)
+{
+ pn_dump_deliveries(link->settled_head);
+ printf("\n");
+ pn_dump_deliveries(link->head);
+ printf("\n");
+}
+
+void pn_link_uninit(pn_link_t *link)
+{
+ if (link->remote_source) free(link->remote_source);
+ if (link->remote_target) free(link->remote_target);
+ pn_remove_link(link->session, link);
+ pn_free_deliveries(link->settled_head);
+ pn_free_deliveries(link->head);
+ free(link->name);
+}
+
+void pn_destroy_sender(pn_sender_t *sender)
+{
+ pn_link_uninit(&sender->link);
+ free(sender);
+}
+void pn_destroy_receiver(pn_receiver_t *receiver)
+{
+ pn_link_uninit(&receiver->link);
+ free(receiver);
+}
+
+void pn_endpoint_init(pn_endpoint_t *endpoint, int type, pn_connection_t *conn)
+{
+ endpoint->type = type;
+ endpoint->local_state = UNINIT;
+ endpoint->remote_state = UNINIT;
+ endpoint->local_error = (pn_error_t) {.condition = NULL};
+ endpoint->remote_error = (pn_error_t) {.condition = NULL};
+ endpoint->endpoint_next = NULL;
+ endpoint->endpoint_prev = NULL;
+ endpoint->transport_next = NULL;
+ endpoint->transport_prev = NULL;
+ endpoint->modified = false;
+
+ LL_ADD_PFX(conn->endpoint_head, conn->endpoint_tail, endpoint, endpoint_);
+}
+
+pn_connection_t *pn_get_connection(pn_endpoint_t *endpoint)
+{
+ switch (endpoint->type) {
+ case CONNECTION:
+ return (pn_connection_t *) endpoint;
+ case SESSION:
+ return ((pn_session_t *) endpoint)->connection;
+ case SENDER:
+ case RECEIVER:
+ return ((pn_link_t *) endpoint)->session->connection;
+ case TRANSPORT:
+ return ((pn_transport_t *) endpoint)->connection;
+ }
+
+ return NULL;
+}
+
+void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint);
+
+void pn_open(pn_endpoint_t *endpoint)
+{
+ // TODO: do we care about the current state?
+ endpoint->local_state = ACTIVE;
+ pn_modified(pn_get_connection(endpoint), endpoint);
+}
+
+void pn_close(pn_endpoint_t *endpoint)
+{
+ // TODO: do we care about the current state?
+ endpoint->local_state = CLOSED;
+ pn_modified(pn_get_connection(endpoint), endpoint);
+}
+
+pn_connection_t *pn_connection()
+{
+ pn_connection_t *conn = malloc(sizeof(pn_connection_t));
+ conn->endpoint_head = NULL;
+ conn->endpoint_tail = NULL;
+ pn_endpoint_init(&conn->endpoint, CONNECTION, conn);
+ conn->transport_head = NULL;
+ conn->transport_tail = NULL;
+ conn->sessions = NULL;
+ conn->session_capacity = 0;
+ conn->session_count = 0;
+ conn->transport = NULL;
+ conn->work_head = NULL;
+ conn->work_tail = NULL;
+ conn->tpwork_head = NULL;
+ conn->tpwork_tail = NULL;
+
+ return conn;
+}
+
+pn_delivery_t *pn_work_head(pn_connection_t *connection)
+{
+ return connection->work_head;
+}
+
+pn_delivery_t *pn_work_next(pn_delivery_t *delivery)
+{
+ if (delivery->work)
+ return delivery->work_next;
+ else
+ return pn_work_head(delivery->link->session->connection);
+}
+
+void pn_add_work(pn_connection_t *connection, pn_delivery_t *delivery)
+{
+ if (!delivery->work)
+ {
+ LL_ADD_PFX(connection->work_head, connection->work_tail, delivery, work_);
+ delivery->work = true;
+ }
+}
+
+void pn_clear_work(pn_connection_t *connection, pn_delivery_t *delivery)
+{
+ if (delivery->work)
+ {
+ LL_REMOVE_PFX(connection->work_head, connection->work_tail, delivery, work_);
+ delivery->work = false;
+ }
+}
+
+void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery)
+{
+ pn_link_t *link = pn_link(delivery);
+ pn_delivery_t *current = pn_current(link);
+ if (delivery->dirty) {
+ pn_add_work(connection, delivery);
+ } else if (delivery == current) {
+ if (link->endpoint.type == SENDER) {
+ if (link->credit > 0) {
+ pn_add_work(connection, delivery);
+ } else {
+ pn_clear_work(connection, delivery);
+ }
+ } else {
+ pn_add_work(connection, delivery);
+ }
+ } else {
+ pn_clear_work(connection, delivery);
+ }
+}
+
+void pn_add_tpwork(pn_delivery_t *delivery)
+{
+ pn_connection_t *connection = delivery->link->session->connection;
+ if (!delivery->tpwork)
+ {
+ LL_ADD_PFX(connection->tpwork_head, connection->tpwork_tail, delivery, tpwork_);
+ delivery->tpwork = true;
+ }
+ pn_modified(connection, &connection->endpoint);
+}
+
+void pn_clear_tpwork(pn_delivery_t *delivery)
+{
+ pn_connection_t *connection = delivery->link->session->connection;
+ if (delivery->tpwork)
+ {
+ LL_REMOVE_PFX(connection->tpwork_head, connection->tpwork_tail, delivery, tpwork_);
+ delivery->tpwork = false;
+ }
+}
+
+void pn_dump(pn_connection_t *conn)
+{
+ pn_endpoint_t *endpoint = conn->transport_head;
+ while (endpoint)
+ {
+ printf("%p", (void *) endpoint);
+ endpoint = endpoint->transport_next;
+ if (endpoint)
+ printf(" -> ");
+ }
+ printf("\n");
+}
+
+void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint)
+{
+ if (!endpoint->modified) {
+ LL_ADD_PFX(connection->transport_head, connection->transport_tail, endpoint, transport_);
+ endpoint->modified = true;
+ }
+}
+
+void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint)
+{
+ if (endpoint->modified) {
+ LL_REMOVE_PFX(connection->transport_head, connection->transport_tail, endpoint, transport_);
+ endpoint->transport_next = NULL;
+ endpoint->transport_prev = NULL;
+ endpoint->modified = false;
+ }
+}
+
+bool pn_matches(pn_endpoint_t *endpoint, pn_endpoint_state_t local,
+ pn_endpoint_state_t remote)
+{
+ return (endpoint->local_state & local) && (endpoint->remote_state & remote);
+}
+
+pn_endpoint_t *pn_find(pn_endpoint_t *endpoint, pn_endpoint_state_t local,
+ pn_endpoint_state_t remote)
+{
+ while (endpoint)
+ {
+ if (pn_matches(endpoint, local, remote))
+ return endpoint;
+ endpoint = endpoint->endpoint_next;
+ }
+ return NULL;
+}
+
+pn_endpoint_t *pn_endpoint_head(pn_connection_t *conn,
+ pn_endpoint_state_t local,
+ pn_endpoint_state_t remote)
+{
+ return pn_find(conn->endpoint_head, local, remote);
+}
+
+pn_endpoint_t *pn_endpoint_next(pn_endpoint_t *endpoint,
+ pn_endpoint_state_t local,
+ pn_endpoint_state_t remote)
+{
+ return pn_find(endpoint->endpoint_next, local, remote);
+}
+
+pn_session_t *pn_session(pn_connection_t *conn)
+{
+
+ pn_session_t *ssn = malloc(sizeof(pn_session_t));
+ pn_endpoint_init(&ssn->endpoint, SESSION, conn);
+ pn_add_session(conn, ssn);
+ ssn->links = NULL;
+ ssn->link_capacity = 0;
+ ssn->link_count = 0;
+
+ return ssn;
+}
+
+ /* pn_map_set(MAP, pn_symbol(PN_HEAP, NAME ## _SYM), pn_ulong(PN_HEAP, NAME)); \ */
+#define __DISPATCH(MAP, NAME) \
+ pn_map_set(MAP, pn_ulong(NAME ## _CODE), pn_ulong(NAME ## _IDX))
+
+void pn_transport_init(pn_transport_t *transport)
+{
+ pn_endpoint_init(&transport->endpoint, TRANSPORT, transport->connection);
+
+ pn_map_t *m = pn_map(32);
+ transport->dispatch = m;
+
+ __DISPATCH(m, OPEN);
+ __DISPATCH(m, BEGIN);
+ __DISPATCH(m, ATTACH);
+ __DISPATCH(m, TRANSFER);
+ __DISPATCH(m, FLOW);
+ __DISPATCH(m, DISPOSITION);
+ __DISPATCH(m, DETACH);
+ __DISPATCH(m, END);
+ __DISPATCH(m, CLOSE);
+
+ transport->args = pn_list(16);
+ // XXX
+ transport->capacity = 4*1024;
+ transport->output = malloc(transport->capacity);
+ transport->available = 0;
+
+ transport->open_sent = false;
+ transport->close_sent = false;
+
+ transport->sessions = NULL;
+ transport->session_capacity = 0;
+
+ transport->channels = NULL;
+ transport->channel_capacity = 0;
+}
+
+pn_session_state_t *pn_session_state(pn_transport_t *transport, pn_session_t *ssn)
+{
+ int old_capacity = transport->session_capacity;
+ PN_ENSURE(transport->sessions, transport->session_capacity, ssn->id + 1);
+ for (int i = old_capacity; i < transport->session_capacity; i++)
+ {
+ transport->sessions[i] = (pn_session_state_t) {.session=NULL,
+ .local_channel=-1,
+ .remote_channel=-1};
+ pn_delivery_buffer_init(&transport->sessions[i].incoming, 0, 1024);
+ pn_delivery_buffer_init(&transport->sessions[i].outgoing, 0, 1024);
+ }
+ pn_session_state_t *state = &transport->sessions[ssn->id];
+ state->session = ssn;
+ return state;
+}
+
+pn_session_state_t *pn_channel_state(pn_transport_t *transport, uint16_t channel)
+{
+ PN_ENSUREZ(transport->channels, transport->channel_capacity, channel + 1);
+ return transport->channels[channel];
+}
+
+void pn_map_channel(pn_transport_t *transport, uint16_t channel, pn_session_state_t *state)
+{
+ PN_ENSUREZ(transport->channels, transport->channel_capacity, channel + 1);
+ state->remote_channel = channel;
+ transport->channels[channel] = state;
+}
+
+pn_transport_t *pn_transport(pn_connection_t *conn)
+{
+ if (conn->transport) {
+ return NULL;
+ } else {
+ conn->transport = malloc(sizeof(pn_transport_t));
+ conn->transport->connection = conn;
+ pn_transport_init(conn->transport);
+ return conn->transport;
+ }
+}
+
+wchar_t *wcsdup(const wchar_t *src)
+{
+ if (src) {
+ wchar_t *dest = malloc((wcslen(src)+1)*sizeof(wchar_t));
+ return wcscpy(dest, src);
+ } else {
+ return 0;
+ }
+}
+
+void pn_link_init(pn_link_t *link, int type, pn_session_t *session, const wchar_t *name)
+{
+ pn_endpoint_init(&link->endpoint, type, session->connection);
+ pn_add_link(session, link);
+ link->name = wcsdup(name);
+ link->local_source = NULL;
+ link->local_target = NULL;
+ link->remote_source = NULL;
+ link->remote_target = NULL;
+ link->settled_head = link->settled_tail = NULL;
+ link->head = link->tail = link->current = NULL;
+ link->credit = 0;
+}
+
+void pn_set_source(pn_link_t *link, const wchar_t *source)
+{
+ link->local_source = source;
+}
+
+void pn_set_target(pn_link_t *link, const wchar_t *target)
+{
+ link->local_target = target;
+}
+
+wchar_t *pn_remote_source(pn_link_t *link)
+{
+ return link->remote_source;
+}
+
+wchar_t *pn_remote_target(pn_link_t *link)
+{
+ return link->remote_target;
+}
+
+pn_link_state_t *pn_link_state(pn_session_state_t *ssn_state, pn_link_t *link)
+{
+ int old_capacity = ssn_state->link_capacity;
+ PN_ENSURE(ssn_state->links, ssn_state->link_capacity, link->id + 1);
+ for (int i = old_capacity; i < ssn_state->link_capacity; i++)
+ {
+ ssn_state->links[i] = (pn_link_state_t) {.link=NULL, .local_handle = -1,
+ .remote_handle=-1};
+ }
+ pn_link_state_t *state = &ssn_state->links[link->id];
+ state->link = link;
+ return state;
+}
+
+void pn_map_handle(pn_session_state_t *ssn_state, uint32_t handle, pn_link_state_t *state)
+{
+ PN_ENSUREZ(ssn_state->handles, ssn_state->handle_capacity, handle + 1);
+ state->remote_handle = handle;
+ ssn_state->handles[handle] = state;
+}
+
+pn_link_state_t *pn_handle_state(pn_session_state_t *ssn_state, uint32_t handle)
+{
+ PN_ENSUREZ(ssn_state->handles, ssn_state->handle_capacity, handle + 1);
+ return ssn_state->handles[handle];
+}
+
+pn_sender_t *pn_sender(pn_session_t *session, const wchar_t *name)
+{
+ pn_sender_t *snd = malloc(sizeof(pn_sender_t));
+ pn_link_init(&snd->link, SENDER, session, name);
+ return snd;
+}
+
+pn_receiver_t *pn_receiver(pn_session_t *session, const wchar_t *name)
+{
+ pn_receiver_t *rcv = malloc(sizeof(pn_receiver_t));
+ pn_link_init(&rcv->link, RECEIVER, session, name);
+ rcv->credits = 0;
+ return rcv;
+}
+
+pn_session_t *pn_get_session(pn_link_t *link)
+{
+ return link->session;
+}
+
+pn_delivery_t *pn_delivery(pn_link_t *link, pn_binary_t *tag)
+{
+ pn_delivery_t *delivery = link->settled_head;
+ LL_POP_PFX(link->settled_head, link->settled_tail, link_);
+ if (!delivery) delivery = malloc(sizeof(pn_delivery_t));
+ delivery->link = link;
+ delivery->tag = pn_binary_dup(tag);
+ delivery->local_state = 0;
+ delivery->remote_state = 0;
+ delivery->local_settled = false;
+ delivery->remote_settled = false;
+ delivery->dirty = false;
+ LL_ADD_PFX(link->head, link->tail, delivery, link_);
+ delivery->work_next = NULL;
+ delivery->work_prev = NULL;
+ delivery->work = false;
+ delivery->tpwork_next = NULL;
+ delivery->tpwork_prev = NULL;
+ delivery->tpwork = false;
+ delivery->bytes = NULL;
+ delivery->size = 0;
+ delivery->capacity = 0;
+ delivery->context = NULL;
+
+ if (!link->current)
+ link->current = delivery;
+
+ pn_work_update(link->session->connection, delivery);
+
+ return delivery;
+}
+
+bool pn_is_current(pn_delivery_t *delivery)
+{
+ pn_link_t *link = delivery->link;
+ return pn_current(link) == delivery;
+}
+
+void pn_delivery_dump(pn_delivery_t *d)
+{
+ char tag[1024];
+ pn_format(tag, 1024, pn_from_binary(d->tag));
+ printf("{tag=%s, local_state=%u, remote_state=%u, local_settled=%u, "
+ "remote_settled=%u, dirty=%u, current=%u, writable=%u, readable=%u, "
+ "work=%u}",
+ tag, d->local_state, d->remote_state, d->local_settled,
+ d->remote_settled, d->dirty, pn_is_current(d), pn_writable(d),
+ pn_readable(d), d->work);
+}
+
+pn_binary_t *pn_delivery_tag(pn_delivery_t *delivery)
+{
+ return delivery->tag;
+}
+
+pn_delivery_t *pn_current(pn_link_t *link)
+{
+ return link->current;
+}
+
+void pn_advance_sender(pn_sender_t *sender)
+{
+ pn_link_t *link = &sender->link;
+ if (link->credit > 0) {
+ link->credit--;
+ pn_add_tpwork(link->current);
+ link->current = link->current->link_next;
+ }
+}
+
+void pn_advance_receiver(pn_receiver_t *receiver)
+{
+ pn_link_t *link = &receiver->link;
+ link->current = link->current->link_next;
+}
+
+bool pn_advance(pn_link_t *link)
+{
+ if (link->current) {
+ pn_delivery_t *prev = link->current;
+ if (link->endpoint.type == SENDER) {
+ pn_advance_sender((pn_sender_t *)link);
+ } else {
+ pn_advance_receiver((pn_receiver_t *)link);
+ }
+ pn_delivery_t *next = link->current;
+ pn_work_update(link->session->connection, prev);
+ if (next) pn_work_update(link->session->connection, next);
+ return prev != next;
+ } else {
+ return false;
+ }
+}
+
+void pn_real_settle(pn_delivery_t *delivery)
+{
+ pn_link_t *link = delivery->link;
+ LL_REMOVE_PFX(link->head, link->tail, delivery, link_);
+ // TODO: what if we settle the current delivery?
+ LL_ADD_PFX(link->settled_head, link->settled_tail, delivery, link_);
+ pn_clear_tag(delivery);
+ delivery->size = 0;
+}
+
+void pn_full_settle(pn_delivery_buffer_t *db, pn_delivery_t *delivery)
+{
+ pn_delivery_state_t *state = delivery->context;
+ delivery->context = NULL;
+ state->delivery = NULL;
+ pn_real_settle(delivery);
+ pn_delivery_buffer_gc(db);
+}
+
+void pn_settle(pn_delivery_t *delivery)
+{
+ delivery->local_settled = true;
+ pn_add_tpwork(delivery);
+}
+
+typedef enum {IN, OUT} pn_dir_t;
+
+static void pn_trace(pn_transport_t *transport, uint16_t ch, pn_dir_t dir,
+ char *op, pn_list_t *args, const char *payload,
+ size_t size)
+{
+ pn_format(transport->scratch, SCRATCH, pn_from_list(args));
+ fprintf(stderr, "[%u] %s %s %s", ch, dir == OUT ? "->" : "<-", op,
+ transport->scratch);
+ if (size) {
+ fprintf(stderr, " (%zu) \"", size);
+ for (int i = 0; i < size; i++) {
+ char c = payload[i];
+ if (isprint(c)) {
+ fputc(c, stderr);
+ } else {
+ fprintf(stderr, "\\x%.2x", c);
+ }
+ }
+ fprintf(stderr, "\"\n");
+ } else {
+ fprintf(stderr, "\n");
+ }
+}
+
+void pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...)
+{
+ va_list ap;
+ transport->endpoint.local_error.condition = condition;
+ va_start(ap, fmt);
+ // XXX: result
+ vsnprintf(transport->endpoint.local_error.description, DESCRIPTION, fmt, ap);
+ va_end(ap);
+ transport->endpoint.local_state = CLOSED;
+ fprintf(stderr, "ERROR %s %s\n", condition, transport->endpoint.local_error.description);
+ // XXX: need to write close frame if appropriate
+}
+
+void pn_do_open(pn_transport_t *transport, pn_list_t *args)
+{
+ pn_connection_t *conn = transport->connection;
+ // TODO: store the state
+ conn->endpoint.remote_state = ACTIVE;
+}
+
+void pn_do_begin(pn_transport_t *transport, uint16_t ch, pn_list_t *args)
+{
+ pn_value_t remote_channel = pn_list_get(args, BEGIN_REMOTE_CHANNEL);
+ pn_session_state_t *state;
+ if (remote_channel.type == USHORT) {
+ // XXX: what if session is NULL?
+ state = &transport->sessions[pn_to_uint16(remote_channel)];
+ } else {
+ pn_session_t *ssn = pn_session(transport->connection);
+ state = pn_session_state(transport, ssn);
+ }
+ pn_map_channel(transport, ch, state);
+ state->session->endpoint.remote_state = ACTIVE;
+}
+
+pn_link_state_t *pn_find_link(pn_session_state_t *ssn_state, pn_string_t *name)
+{
+ for (int i = 0; i < ssn_state->session->link_count; i++)
+ {
+ pn_link_t *link = ssn_state->session->links[i];
+ if (!wcsncmp(pn_string_wcs(name), link->name, pn_string_size(name)))
+ {
+ return pn_link_state(ssn_state, link);
+ }
+ }
+ return NULL;
+}
+
+void pn_do_attach(pn_transport_t *transport, uint16_t ch, pn_list_t *args)
+{
+ uint32_t handle = pn_to_uint32(pn_list_get(args, ATTACH_HANDLE));
+ bool is_sender = pn_to_bool(pn_list_get(args, ATTACH_ROLE));
+ pn_string_t *name = pn_to_string(pn_list_get(args, ATTACH_NAME));
+ pn_session_state_t *ssn_state = pn_channel_state(transport, ch);
+ pn_link_state_t *link_state = pn_find_link(ssn_state, name);
+ if (!link_state) {
+ pn_link_t *link;
+ if (is_sender) {
+ link = (pn_link_t *) pn_sender(ssn_state->session, pn_string_wcs(name));
+ } else {
+ link = (pn_link_t *) pn_receiver(ssn_state->session, pn_string_wcs(name));
+ }
+ link_state = pn_link_state(ssn_state, link);
+ }
+
+ pn_map_handle(ssn_state, handle, link_state);
+ link_state->link->endpoint.remote_state = ACTIVE;
+ pn_value_t remote_source = pn_list_get(args, ATTACH_SOURCE);
+ if (remote_source.type == TAG)
+ remote_source = pn_tag_value(pn_to_tag(remote_source));
+ pn_value_t remote_target = pn_list_get(args, ATTACH_TARGET);
+ if (remote_target.type == TAG)
+ remote_target = pn_tag_value(pn_to_tag(remote_target));
+ // XXX: dup src/tgt
+ if (remote_source.type == LIST)
+ link_state->link->remote_source = wcsdup(pn_string_wcs(pn_to_string(pn_list_get(pn_to_list(remote_source), SOURCE_ADDRESS))));
+ if (remote_target.type == LIST)
+ link_state->link->remote_target = wcsdup(pn_string_wcs(pn_to_string(pn_list_get(pn_to_list(remote_target), TARGET_ADDRESS))));
+
+ if (!is_sender) {
+ link_state->delivery_count = pn_to_int32(pn_list_get(args, ATTACH_INITIAL_DELIVERY_COUNT));
+ }
+}
+
+void pn_do_transfer(pn_transport_t *transport, uint16_t channel, pn_list_t *args, const char *payload_bytes, size_t payload_size)
+{
+ // XXX: multi transfer
+
+ pn_session_state_t *ssn_state = pn_channel_state(transport, channel);
+ uint32_t handle = pn_to_uint32(pn_list_get(args, TRANSFER_HANDLE));
+ pn_link_state_t *link_state = pn_handle_state(ssn_state, handle);
+ pn_link_t *link = link_state->link;
+ pn_binary_t *tag = pn_to_binary(pn_list_get(args, TRANSFER_DELIVERY_TAG));
+ pn_delivery_t *delivery = pn_delivery(link, tag);
+ pn_delivery_state_t *state = pn_delivery_buffer_push(&ssn_state->incoming, delivery);
+ delivery->context = state;
+ // XXX: need to check that state is not null (i.e. we haven't hit the limit)
+ pn_sequence_t id = pn_to_int32(pn_list_get(args, TRANSFER_DELIVERY_ID));
+ if (id != state->id) {
+ // XXX: signal error somehow
+ }
+
+ PN_ENSURE(delivery->bytes, delivery->capacity, payload_size);
+ memmove(delivery->bytes, payload_bytes, payload_size);
+ delivery->size = payload_size;
+}
+
+void pn_do_flow(pn_transport_t *transport, uint16_t channel, pn_list_t *args)
+{
+ pn_session_state_t *ssn_state = pn_channel_state(transport, channel);
+
+ pn_value_t vhandle = pn_list_get(args, FLOW_HANDLE);
+ if (vhandle.type != EMPTY) {
+ uint32_t handle = pn_to_uint32(vhandle);
+ pn_link_state_t *link_state = pn_handle_state(ssn_state, handle);
+ pn_link_t *link = link_state->link;
+ if (link->endpoint.type == SENDER) {
+ pn_value_t delivery_count = pn_list_get(args, FLOW_DELIVERY_COUNT);
+ pn_sequence_t receiver_count;
+ if (delivery_count.type == EMPTY) {
+ // our initial delivery count
+ receiver_count = 0;
+ } else {
+ receiver_count = pn_to_int32(delivery_count);
+ }
+ pn_sequence_t link_credit = pn_to_uint32(pn_list_get(args, FLOW_LINK_CREDIT));
+ link->credit = receiver_count + link_credit - link_state->delivery_count;
+ pn_delivery_t *delivery = pn_current(link);
+ if (delivery) pn_work_update(transport->connection, delivery);
+ }
+ }
+}
+
+void pn_do_disposition(pn_transport_t *transport, uint16_t channel, pn_list_t *args)
+{
+ pn_session_state_t *ssn_state = pn_channel_state(transport, channel);
+ bool role = pn_to_bool(pn_list_get(args, DISPOSITION_ROLE));
+ pn_sequence_t first = pn_to_int32(pn_list_get(args, DISPOSITION_FIRST));
+ pn_sequence_t last = pn_to_int32(pn_list_get(args, DISPOSITION_LAST));
+ //bool settled = pn_to_bool(pn_list_get(args, DISPOSITION_SETTLED));
+ pn_tag_t *dstate = pn_to_tag(pn_list_get(args, DISPOSITION_STATE));
+ uint64_t code = pn_to_uint32(pn_tag_descriptor(dstate));
+ pn_disposition_t disp;
+ switch (code)
+ {
+ case ACCEPTED_CODE:
+ disp = ACCEPTED;
+ break;
+ case REJECTED_CODE:
+ disp = REJECTED;
+ break;
+ default:
+ // XXX
+ fprintf(stderr, "default %lu\n", code);
+ disp = 0;
+ break;
+ }
+
+ pn_delivery_buffer_t *deliveries;
+ if (role) {
+ deliveries = &ssn_state->outgoing;
+ } else {
+ deliveries = &ssn_state->incoming;
+ }
+
+ pn_sequence_t lwm = pn_delivery_buffer_lwm(deliveries);
+
+ for (pn_sequence_t id = first; id <= last; id++) {
+ pn_delivery_state_t *state = pn_delivery_buffer_get(deliveries, id - lwm);
+ pn_delivery_t *delivery = state->delivery;
+ delivery->remote_state = disp;
+ delivery->dirty = true;
+ pn_work_update(transport->connection, delivery);
+ }
+}
+
+void pn_do_detach(pn_transport_t *transport, uint16_t channel, pn_list_t *args)
+{
+ uint32_t handle = pn_to_uint32(pn_list_get(args, DETACH_HANDLE));
+ bool closed = pn_to_bool(pn_list_get(args, DETACH_CLOSED));
+
+ pn_session_state_t *ssn_state = pn_channel_state(transport, channel);
+ if (!ssn_state) {
+ pn_do_error(transport, "amqp:invalid-field", "no such channel: %u", channel);
+ return;
+ }
+ pn_link_state_t *link_state = pn_handle_state(ssn_state, handle);
+ pn_link_t *link = link_state->link;
+
+ link_state->remote_handle = -1;
+
+ if (closed)
+ {
+ link->endpoint.remote_state = CLOSED;
+ } else {
+ // TODO: implement
+ }
+}
+
+void pn_do_end(pn_transport_t *transport, uint16_t channel, pn_list_t *args)
+{
+ pn_session_state_t *ssn_state = pn_channel_state(transport, channel);
+ pn_session_t *session = ssn_state->session;
+
+ ssn_state->remote_channel = -1;
+ session->endpoint.remote_state = CLOSED;
+}
+
+void pn_do_close(pn_transport_t *transport, pn_list_t *args)
+{
+ transport->connection->endpoint.remote_state = CLOSED;
+ transport->endpoint.remote_state = CLOSED;
+}
+
+static char *pn_p2op(uint32_t performative)
+{
+ switch (performative)
+ {
+ case OPEN_CODE:
+ return "OPEN";
+ case BEGIN_CODE:
+ return "BEGIN";
+ case ATTACH_CODE:
+ return "ATTACH";
+ case TRANSFER_CODE:
+ return "TRANSFER";
+ case FLOW_CODE:
+ return "FLOW";
+ case DISPOSITION_CODE:
+ return "DISPOSITION";
+ case DETACH_CODE:
+ return "DETACH";
+ case END_CODE:
+ return "END";
+ case CLOSE_CODE:
+ return "CLOSE";
+ default:
+ return "<UNKNOWN>";
+ }
+}
+
+void pn_dispatch(pn_transport_t *transport, uint16_t channel,
+ pn_tag_t *performative, const char *payload_bytes,
+ size_t payload_size)
+{
+ pn_value_t desc = pn_tag_descriptor(performative);
+ pn_list_t *args = pn_to_list(pn_tag_value(performative));
+ pn_value_t cval = pn_map_get(transport->dispatch, desc);
+ uint8_t code = pn_to_uint8(cval);
+
+ pn_trace(transport, channel, IN, pn_p2op(pn_to_uint32(desc)), args,
+ payload_bytes, payload_size);
+
+ switch (code)
+ {
+ case OPEN_IDX:
+ pn_do_open(transport, args);
+ break;
+ case BEGIN_IDX:
+ pn_do_begin(transport, channel, args);
+ break;
+ case ATTACH_IDX:
+ pn_do_attach(transport, channel, args);
+ break;
+ case TRANSFER_IDX:
+ pn_do_transfer(transport, channel, args, payload_bytes, payload_size);
+ break;
+ case FLOW_IDX:
+ pn_do_flow(transport, channel, args);
+ break;
+ case DISPOSITION_IDX:
+ pn_do_disposition(transport, channel, args);
+ break;
+ case DETACH_IDX:
+ pn_do_detach(transport, channel, args);
+ break;
+ case END_IDX:
+ pn_do_end(transport, channel, args);
+ break;
+ case CLOSE_IDX:
+ pn_do_close(transport, args);
+ break;
+ }
+}
+
+ssize_t pn_input(pn_transport_t *transport, char *bytes, size_t available)
+{
+ if (transport->endpoint.local_state == CLOSED) {
+ return EOS;
+ }
+
+ if (transport->endpoint.remote_state == CLOSED) {
+ pn_do_error(transport, "amqp:connection:framing-error", "data after close");
+ return EOS;
+ }
+
+ size_t read = 0;
+ while (true) {
+ pn_frame_t frame;
+ size_t n = pn_read_frame(&frame, bytes + read, available);
+ if (n) {
+ pn_value_t performative;
+ ssize_t e = pn_decode(&performative, frame.payload, frame.size);
+ if (e < 0) {
+ fprintf(stderr, "Error decoding frame: %zi\n", e);
+ pn_format(transport->scratch, SCRATCH, pn_value("z", frame.size, frame.payload));
+ fprintf(stderr, "%s\n", transport->scratch);
+ return e;
+ }
+
+ pn_tag_t *perf = pn_to_tag(performative);
+ pn_dispatch(transport, frame.channel, perf, frame.payload + e, frame.size - e);
+ pn_visit(performative, pn_free_value);
+
+ available -= n;
+ read += n;
+ } else {
+ break;
+ }
+ }
+
+ return read;
+}
+
+void pn_init_frame(pn_transport_t *transport)
+{
+ pn_list_clear(transport->args);
+ transport->payload_bytes = NULL;
+ transport->payload_size = 0;
+}
+
+void pn_field(pn_transport_t *transport, int index, pn_value_t arg)
+{
+ int n = pn_list_size(transport->args);
+ if (index >= n)
+ pn_list_fill(transport->args, EMPTY_VALUE, index - n + 1);
+ pn_list_set(transport->args, index, arg);
+}
+
+void pn_append_payload(pn_transport_t *transport, const char *data, size_t size)
+{
+ transport->payload_bytes = data;
+ transport->payload_size = size;
+}
+
+#define BUF_SIZE (1024*1024)
+
+void pn_post_frame(pn_transport_t *transport, uint16_t ch, uint32_t performative)
+{
+ pn_tag_t tag = { .descriptor = pn_ulong(performative),
+ .value = pn_from_list(transport->args) };
+ pn_frame_t frame = {0};
+ char bytes[pn_encode_sizeof(pn_from_tag(&tag)) + transport->payload_size];
+ pn_trace(transport, ch, OUT, pn_p2op(performative), transport->args,
+ transport->payload_bytes, transport->payload_size);
+ size_t size = pn_encode(pn_from_tag(&tag), bytes);
+ for (int i = 0; i < pn_list_size(transport->args); i++)
+ pn_visit(pn_list_get(transport->args, i), pn_free_value);
+ if (transport->payload_size) {
+ memmove(bytes + size, transport->payload_bytes, transport->payload_size);
+ size += transport->payload_size;
+ transport->payload_bytes = NULL;
+ transport->payload_size = 0;
+ }
+ frame.channel = ch;
+ frame.payload = bytes;
+ frame.size = size;
+ size_t n;
+ while (!(n = pn_write_frame(transport->output + transport->available,
+ transport->capacity - transport->available, frame))) {
+ transport->capacity *= 2;
+ transport->output = realloc(transport->output, transport->capacity);
+ }
+ transport->available += n;
+}
+
+void pn_process_conn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
+{
+ if (endpoint->type == CONNECTION)
+ {
+ if (endpoint->local_state != UNINIT && !transport->open_sent)
+ {
+ pn_init_frame(transport);
+ /*if (container_id)
+ pn_field(eng, OPEN_CONTAINER_ID, pn_value("S", container_id));*/
+ /*if (hostname)
+ pn_field(eng, OPEN_HOSTNAME, pn_value("S", hostname));*/
+ pn_post_frame(transport, 0, OPEN_CODE);
+ transport->open_sent = true;
+ }
+ }
+}
+
+void pn_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
+{
+ if (endpoint->type == SESSION)
+ {
+ pn_session_t *ssn = (pn_session_t *) endpoint;
+ pn_session_state_t *state = pn_session_state(transport, ssn);
+ if (endpoint->local_state != UNINIT && state->local_channel == (uint16_t) -1)
+ {
+ pn_init_frame(transport);
+ if ((int16_t) state->remote_channel >= 0)
+ pn_field(transport, BEGIN_REMOTE_CHANNEL, pn_value("H", state->remote_channel));
+ pn_field(transport, BEGIN_NEXT_OUTGOING_ID, pn_value("I", state->outgoing.next));
+ pn_field(transport, BEGIN_INCOMING_WINDOW, pn_value("I", state->incoming.capacity));
+ pn_field(transport, BEGIN_OUTGOING_WINDOW, pn_value("I", state->outgoing.capacity));
+ // XXX: we use the session id as the outgoing channel, we depend
+ // on this for looking up via remote channel
+ uint16_t channel = ssn->id;
+ pn_post_frame(transport, channel, BEGIN_CODE);
+ state->local_channel = channel;
+ }
+ }
+}
+
+void pn_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
+{
+ if (endpoint->type == SENDER || endpoint->type == RECEIVER)
+ {
+ pn_link_t *link = (pn_link_t *) endpoint;
+ pn_session_state_t *ssn_state = pn_session_state(transport, link->session);
+ pn_link_state_t *state = pn_link_state(ssn_state, link);
+ if (endpoint->local_state != UNINIT && state->local_handle == (uint32_t) -1)
+ {
+ pn_init_frame(transport);
+ pn_field(transport, ATTACH_ROLE, pn_boolean(endpoint->type == RECEIVER));
+ pn_field(transport, ATTACH_NAME, pn_value("S", link->name));
+ // XXX
+ state->local_handle = link->id;
+ pn_field(transport, ATTACH_HANDLE, pn_value("I", state->local_handle));
+ // XXX
+ pn_field(transport, ATTACH_INITIAL_DELIVERY_COUNT, pn_value("I", 0));
+ if (link->local_source)
+ pn_field(transport, ATTACH_SOURCE, pn_value("B([S])", SOURCE_CODE,
+ link->local_source));
+ if (link->local_target)
+ pn_field(transport, ATTACH_TARGET, pn_value("B([S])", TARGET_CODE,
+ link->local_target));
+ pn_post_frame(transport, ssn_state->local_channel, ATTACH_CODE);
+ }
+ }
+}
+
+void pn_process_flow_receiver(pn_transport_t *transport, pn_endpoint_t *endpoint)
+{
+ if (endpoint->type == RECEIVER && endpoint->local_state == ACTIVE)
+ {
+ pn_receiver_t *rcv = (pn_receiver_t *) endpoint;
+ if (rcv->credits) {
+ pn_session_state_t *ssn_state = pn_session_state(transport, rcv->link.session);
+ pn_link_state_t *state = pn_link_state(ssn_state, &rcv->link);
+ state->link_credit += rcv->credits;
+ rcv->credits = 0;
+
+ pn_init_frame(transport);
+ //pn_field(transport, FLOW_NEXT_INCOMING_ID, pn_value("I", ssn_state->next_incoming_id));
+ pn_field(transport, FLOW_INCOMING_WINDOW, pn_value("I", ssn_state->incoming.capacity));
+ pn_field(transport, FLOW_NEXT_OUTGOING_ID, pn_value("I", ssn_state->outgoing.next));
+ pn_field(transport, FLOW_OUTGOING_WINDOW, pn_value("I", ssn_state->outgoing.capacity));
+ pn_field(transport, FLOW_HANDLE, pn_value("I", state->local_handle));
+ //pn_field(transport, FLOW_DELIVERY_COUNT, pn_value("I", delivery_count));
+ pn_field(transport, FLOW_LINK_CREDIT, pn_value("I", state->link_credit));
+ pn_post_frame(transport, ssn_state->local_channel, FLOW_CODE);
+ }
+ }
+}
+
+void pn_post_disp(pn_transport_t *transport, pn_delivery_t *delivery)
+{
+ pn_link_t *link = delivery->link;
+ pn_session_state_t *ssn_state = pn_session_state(transport, link->session);
+ // XXX: check for null state
+ pn_delivery_state_t *state = delivery->context;
+ pn_init_frame(transport);
+ pn_field(transport, DISPOSITION_ROLE, pn_boolean(link->endpoint.type == RECEIVER));
+ pn_field(transport, DISPOSITION_FIRST, pn_uint(state->id));
+ pn_field(transport, DISPOSITION_LAST, pn_uint(state->id));
+ // XXX
+ pn_field(transport, DISPOSITION_SETTLED, pn_boolean(delivery->local_settled));
+ uint64_t code;
+ switch(delivery->local_state) {
+ case ACCEPTED:
+ code = ACCEPTED_CODE;
+ break;
+ case RELEASED:
+ code = RELEASED_CODE;
+ break;
+ //TODO: rejected and modified (both take extra data which may need to be passed through somehow) e.g. change from enum to discriminated union?
+ default:
+ code = 0;
+ }
+ if (code)
+ pn_field(transport, DISPOSITION_STATE, pn_value("L([])", code));
+ //pn_field(transport, DISPOSITION_BATCHABLE, pn_boolean(batchable));
+ pn_post_frame(transport, ssn_state->local_channel, DISPOSITION_CODE);
+}
+
+void pn_process_disp_receiver(pn_transport_t *transport, pn_endpoint_t *endpoint)
+{
+ if (endpoint->type == CONNECTION && !transport->close_sent)
+ {
+ pn_connection_t *conn = (pn_connection_t *) endpoint;
+ pn_delivery_t *delivery = conn->tpwork_head;
+ while (delivery)
+ {
+ pn_link_t *link = delivery->link;
+ if (link->endpoint.type == RECEIVER) {
+ // XXX: need to prevent duplicate disposition sending
+ pn_session_state_t *ssn_state = pn_session_state(transport, link->session);
+ if ((int16_t) ssn_state->local_channel >= 0) {
+ pn_post_disp(transport, delivery);
+ }
+
+ if (delivery->local_settled) {
+ pn_full_settle(&ssn_state->incoming, delivery);
+ }
+ }
+ delivery = delivery->tpwork_next;
+ }
+ }
+}
+
+void pn_process_msg_data(pn_transport_t *transport, pn_endpoint_t *endpoint)
+{
+ if (endpoint->type == CONNECTION && !transport->close_sent)
+ {
+ pn_connection_t *conn = (pn_connection_t *) endpoint;
+ pn_delivery_t *delivery = conn->tpwork_head;
+ while (delivery)
+ {
+ pn_link_t *link = delivery->link;
+ if (link->endpoint.type == SENDER) {
+ pn_session_state_t *ssn_state = pn_session_state(transport, link->session);
+ pn_link_state_t *link_state = pn_link_state(ssn_state, link);
+ pn_delivery_state_t *state = delivery->context;
+ if (!state) {
+ state = pn_delivery_buffer_push(&ssn_state->outgoing, delivery);
+ delivery->context = state;
+ }
+ if (!state->sent && (int16_t) ssn_state->local_channel >= 0 && (int32_t) link_state->local_handle >= 0) {
+ pn_init_frame(transport);
+ pn_field(transport, TRANSFER_HANDLE, pn_value("I", link_state->local_handle));
+ pn_field(transport, TRANSFER_DELIVERY_ID, pn_value("I", state->id));
+ pn_field(transport, TRANSFER_DELIVERY_TAG, pn_from_binary(pn_binary_dup(delivery->tag)));
+ pn_field(transport, TRANSFER_MESSAGE_FORMAT, pn_value("I", 0));
+ if (delivery->bytes) {
+ pn_append_payload(transport, delivery->bytes, delivery->size);
+ delivery->size = 0;
+ }
+ pn_post_frame(transport, ssn_state->local_channel, TRANSFER_CODE);
+ state->sent = true;
+ }
+ }
+ delivery = delivery->tpwork_next;
+ }
+ }
+}
+
+void pn_process_disp_sender(pn_transport_t *transport, pn_endpoint_t *endpoint)
+{
+ if (endpoint->type == CONNECTION && !transport->close_sent)
+ {
+ pn_connection_t *conn = (pn_connection_t *) endpoint;
+ pn_delivery_t *delivery = conn->tpwork_head;
+ while (delivery)
+ {
+ pn_link_t *link = delivery->link;
+ if (link->endpoint.type == SENDER) {
+ // XXX: need to prevent duplicate disposition sending
+ pn_session_state_t *ssn_state = pn_session_state(transport, link->session);
+ /*if ((int16_t) ssn_state->local_channel >= 0) {
+ pn_post_disp(transport, delivery);
+ }*/
+
+ if (delivery->local_settled) {
+ pn_full_settle(&ssn_state->outgoing, delivery);
+ }
+ }
+ delivery = delivery->tpwork_next;
+ }
+ }
+}
+
+void pn_process_flow_sender(pn_transport_t *transport, pn_endpoint_t *endpoint)
+{
+ // TODO: implement
+}
+
+void pn_process_link_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint)
+{
+ if (endpoint->type == SENDER || endpoint->type == RECEIVER)
+ {
+ pn_link_t *link = (pn_link_t *) endpoint;
+ pn_session_t *session = link->session;
+ pn_session_state_t *ssn_state = pn_session_state(transport, session);
+ pn_link_state_t *state = pn_link_state(ssn_state, link);
+ if (endpoint->local_state == CLOSED && (int32_t) state->local_handle >= 0) {
+ pn_init_frame(transport);
+ pn_field(transport, DETACH_HANDLE, pn_value("I", state->local_handle));
+ pn_field(transport, DETACH_CLOSED, pn_boolean(true));
+ /* XXX: error
+ if (condition)
+ // XXX: symbol
+ pn_engine_field(eng, DETACH_ERROR, pn_value("B([zS])", ERROR_CODE, condition, description)); */
+ pn_post_frame(transport, ssn_state->local_channel, DETACH_CODE);
+ state->local_handle = -2;
+ }
+ }
+}
+
+void pn_process_ssn_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint)
+{
+ if (endpoint->type == SESSION)
+ {
+ pn_session_t *session = (pn_session_t *) endpoint;
+ pn_session_state_t *state = pn_session_state(transport, session);
+ if (endpoint->local_state == CLOSED && (int16_t) state->local_channel >= 0)
+ {
+ pn_init_frame(transport);
+ /*if (condition)
+ // XXX: symbol
+ pn_engine_field(eng, DETACH_ERROR, pn_value("B([zS])", ERROR_CODE, condition, description));*/
+ pn_post_frame(transport, state->local_channel, END_CODE);
+ state->local_channel = -2;
+ }
+ }
+}
+
+void pn_process_conn_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint)
+{
+ if (endpoint->type == CONNECTION)
+ {
+ if (endpoint->local_state == CLOSED && !transport->close_sent) {
+ pn_init_frame(transport);
+ /*if (condition)
+ // XXX: symbol
+ pn_field(eng, CLOSE_ERROR, pn_value("B([zS])", ERROR_CODE, condition, description));*/
+ pn_post_frame(transport, 0, CLOSE_CODE);
+ transport->close_sent = true;
+ }
+ }
+}
+
+void pn_clear_phase(pn_transport_t *transport, pn_endpoint_t *endpoint)
+{
+ pn_clear_modified(transport->connection, endpoint);
+}
+
+void pn_phase(pn_transport_t *transport, void (*phase)(pn_transport_t *, pn_endpoint_t *))
+{
+ pn_connection_t *conn = transport->connection;
+ pn_endpoint_t *endpoint = conn->transport_head;
+ while (endpoint)
+ {
+ phase(transport, endpoint);
+ endpoint = endpoint->transport_next;
+ }
+}
+
+void pn_process(pn_transport_t *transport)
+{
+ pn_phase(transport, pn_process_conn_setup);
+ pn_phase(transport, pn_process_ssn_setup);
+ pn_phase(transport, pn_process_link_setup);
+ pn_phase(transport, pn_process_flow_receiver);
+ pn_phase(transport, pn_process_disp_receiver);
+ pn_phase(transport, pn_process_msg_data);
+ pn_phase(transport, pn_process_disp_sender);
+ pn_phase(transport, pn_process_flow_sender);
+ pn_phase(transport, pn_process_link_teardown);
+ pn_phase(transport, pn_process_ssn_teardown);
+ pn_phase(transport, pn_process_conn_teardown);
+ pn_phase(transport, pn_clear_phase);
+
+ pn_delivery_t *delivery = transport->connection->tpwork_head;
+ while (delivery) {
+ pn_clear_tpwork(delivery);
+ delivery = delivery->tpwork_next;
+ }
+}
+
+ssize_t pn_output(pn_transport_t *transport, char *bytes, size_t size)
+{
+ pn_process(transport);
+
+ if (!transport->available && transport->endpoint.local_state == CLOSED) {
+ return EOS;
+ }
+
+ int n = transport->available < size ? transport->available : size;
+ memmove(bytes, transport->output, n);
+ memmove(transport->output, transport->output + n, transport->available - n);
+ transport->available -= n;
+ // XXX: need to check endpoint for errors
+ return n;
+}
+
+ssize_t pn_send(pn_sender_t *sender, const char *bytes, size_t n)
+{
+ pn_delivery_t *current = pn_current(&sender->link);
+ if (!current) return -1;
+ if (current->bytes) return 0;
+ PN_ENSURE(current->bytes, current->capacity, current->size + n);
+ memmove(current->bytes + current->size, bytes, n);
+ current->size = +n;
+ pn_add_tpwork(current);
+ return n;
+}
+
+ssize_t pn_recv(pn_receiver_t *receiver, char *bytes, size_t n)
+{
+ pn_link_t *link = &receiver->link;
+ pn_delivery_t *delivery = link->current;
+ if (delivery) {
+ if (delivery->size) {
+ size_t size = n > delivery->size ? delivery->size : n;
+ memmove(bytes, delivery->bytes, size);
+ memmove(bytes, bytes + size, delivery->size - size);
+ delivery->size -= size;
+ return size;
+ } else {
+ return EOM;
+ }
+ } else {
+ // XXX: ?
+ return EOM;
+ }
+}
+
+void pn_flow(pn_receiver_t *receiver, int credits)
+{
+ receiver->credits += credits;
+ pn_modified(receiver->link.session->connection, &receiver->link.endpoint);
+}
+
+time_t pn_tick(pn_transport_t *engine, time_t now)
+{
+ return 0;
+}
+
+pn_link_t *pn_link(pn_delivery_t *delivery)
+{
+ return delivery->link;
+}
+
+int pn_local_disp(pn_delivery_t *delivery)
+{
+ return delivery->local_state;
+}
+
+int pn_remote_disp(pn_delivery_t *delivery)
+{
+ return delivery->remote_state;
+}
+
+bool pn_dirty(pn_delivery_t *delivery)
+{
+ return delivery->dirty;
+}
+
+void pn_clean(pn_delivery_t *delivery)
+{
+ delivery->dirty = false;
+ pn_work_update(delivery->link->session->connection, delivery);
+}
+
+void pn_disposition(pn_delivery_t *delivery, pn_disposition_t disposition)
+{
+ delivery->local_state = disposition;
+ pn_add_tpwork(delivery);
+}
+
+bool pn_writable(pn_delivery_t *delivery)
+{
+ pn_link_t *link = delivery->link;
+ return link->endpoint.type == SENDER && pn_is_current(delivery) && link->credit > 0;
+}
+
+bool pn_readable(pn_delivery_t *delivery)
+{
+ pn_link_t *link = delivery->link;
+ return link->endpoint.type == RECEIVER && pn_is_current(delivery);
+}
Added: qpid/proton/proton-c/src/framing/framing.c
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/framing/framing.c?rev=1298498&view=auto
==============================================================================
--- qpid/proton/proton-c/src/framing/framing.c (added)
+++ qpid/proton/proton-c/src/framing/framing.c Thu Mar 8 18:33:46 2012
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <arpa/inet.h>
+#include <proton/framing.h>
+
+size_t pn_read_frame(pn_frame_t *frame, char *bytes, size_t available)
+{
+ if (available >= AMQP_HEADER_SIZE) {
+ size_t size = htonl(*((uint32_t *) bytes));
+ if (available >= size)
+ {
+ int doff = bytes[4]*4;
+ frame->size = size - doff;
+ frame->ex_size = doff - AMQP_HEADER_SIZE;
+ frame->type = bytes[5];
+ frame->channel = htons(*((uint16_t *) (bytes + 6)));
+
+ frame->extended = bytes + AMQP_HEADER_SIZE;
+ frame->payload = bytes + doff;
+ return size;
+ }
+ }
+
+ return 0;
+}
+
+size_t pn_write_frame(char *bytes, size_t available, pn_frame_t frame)
+{
+ size_t size = AMQP_HEADER_SIZE + frame.ex_size + frame.size;
+ if (size <= available)
+ {
+ *((uint32_t *) bytes) = ntohl(size);
+ int doff = (frame.ex_size + AMQP_HEADER_SIZE - 1)/4 + 1;
+ bytes[4] = doff;
+ bytes[5] = frame.type;
+ *((uint16_t *) (bytes + 6)) = ntohs(frame.channel);
+
+ memmove(bytes + AMQP_HEADER_SIZE, frame.extended, frame.ex_size);
+ memmove(bytes + 4*doff, frame.payload, frame.size);
+ return size;
+ } else {
+ return 0;
+ }
+}
Added: qpid/proton/proton-c/src/messaging.xml
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/messaging.xml?rev=1298498&view=auto
==============================================================================
--- qpid/proton/proton-c/src/messaging.xml (added)
+++ qpid/proton/proton-c/src/messaging.xml Thu Mar 8 18:33:46 2012
@@ -0,0 +1,168 @@
+<?xml version="1.0"?>
+
+<!--
+
+Copyright Bank of America, N.A., Barclays Bank PLC, Cisco Systems, Credit
+Suisse, Deutsche Boerse Systems, Goldman Sachs, HCL Technologies Ltd, INETCO
+Systems Limited, Informatica Corporation, JPMorgan Chase Bank Inc. N.A,
+Microsoft Corporation, my-Channels, Novell, Progress Software, Red Hat Inc.,
+Software AG, Solace Systems Inc., StormMQ Ltd., Tervela Inc., TWIST Process
+Innovations Ltd, VMware Inc. and WS02 Inc. 2006-2011. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+1. Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+2. Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer in the
+documentation and/or other materials provided with the distribution.
+3. The name of the author may not be used to endorse or promote products
+derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+-->
+
+<amqp name="messaging" xmlns="http://www.amqp.org/schema/amqp.xsd">
+ <section name="message-format">
+ <type name="header" class="composite" source="list" provides="section">
+ <descriptor name="amqp:header:list" code="0x00000000:0x00000070"/>
+ <field name="durable" type="boolean"/>
+ <field name="priority" type="ubyte"/>
+ <field name="ttl" type="milliseconds"/>
+ <field name="first-acquirer" type="boolean"/>
+ <field name="delivery-count" type="uint"/>
+ </type>
+ <type name="delivery-annotations" class="restricted" source="annotations" provides="section">
+ <descriptor name="amqp:delivery-annotations:map" code="0x00000000:0x00000071"/>
+ </type>
+ <type name="message-annotations" class="restricted" source="annotations" provides="section">
+ <descriptor name="amqp:message-annotations:map" code="0x00000000:0x00000072"/>
+ </type>
+ <type name="properties" class="composite" source="list" provides="section">
+ <descriptor name="amqp:properties:list" code="0x00000000:0x00000073"/>
+ <field name="message-id" type="*" requires="message-id"/>
+ <field name="user-id" type="binary"/>
+ <field name="to" type="*" requires="address"/>
+ <field name="subject" type="string"/>
+ <field name="reply-to" type="*" requires="address"/>
+ <field name="correlation-id" type="*" requires="message-id"/>
+ <field name="content-type" type="symbol"/>
+ <field name="content-encoding" type="symbol"/>
+ <field name="absolute-expiry-time" type="timestamp"/>
+ <field name="creation-time" type="timestamp"/>
+ <field name="group-id" type="string"/>
+ <field name="group-sequence" type="sequence-no"/>
+ <field name="reply-to-group-id" type="string"/>
+ </type>
+ <type name="application-properties" class="restricted" source="map" provides="section">
+ <descriptor name="amqp:application-properties:map" code="0x00000000:0x00000074"/>
+ </type>
+ <type name="data" class="restricted" source="binary" provides="section">
+ <descriptor name="amqp:data:binary" code="0x00000000:0x00000075"/>
+ </type>
+ <type name="amqp-sequence" class="restricted" source="list" provides="section">
+ <descriptor name="amqp:amqp-sequence:list" code="0x00000000:0x00000076"/>
+ </type>
+ <type name="amqp-value" class="restricted" source="*" provides="section">
+ <descriptor name="amqp:amqp-value:*" code="0x00000000:0x00000077"/>
+ </type>
+ <type name="footer" class="restricted" source="annotations" provides="section">
+ <descriptor name="amqp:footer:map" code="0x00000000:0x00000078"/>
+ </type>
+ <type name="annotations" class="restricted" source="map"/>
+ <type name="message-id-ulong" class="restricted" source="ulong" provides="message-id"/>
+ <type name="message-id-uuid" class="restricted" source="uuid" provides="message-id"/>
+ <type name="message-id-binary" class="restricted" source="binary" provides="message-id"/>
+ <type name="message-id-string" class="restricted" source="string" provides="message-id"/>
+ <type name="address-string" class="restricted" source="string" provides="address"/>
+ <definition name="MESSAGE-FORMAT" value="0"/>
+ </section>
+ <section name="delivery-state">
+ <type name="received" class="composite" source="list" provides="delivery-state">
+ <descriptor name="amqp:received:list" code="0x00000000:0x00000023"/>
+ <field name="section-number" type="uint"/>
+ <field name="section-offset" type="ulong"/>
+ </type>
+ <type name="accepted" class="composite" source="list" provides="delivery-state, outcome">
+ <descriptor name="amqp:accepted:list" code="0x00000000:0x00000024"/>
+ </type>
+ <type name="rejected" class="composite" source="list" provides="delivery-state, outcome">
+ <descriptor name="amqp:rejected:list" code="0x00000000:0x00000025"/>
+ <field name="error" type="error"/>
+ </type>
+ <type name="released" class="composite" source="list" provides="delivery-state, outcome">
+ <descriptor name="amqp:released:list" code="0x00000000:0x00000026"/>
+ </type>
+ <type name="modified" class="composite" source="list" provides="delivery-state, outcome">
+ <descriptor name="amqp:modified:list" code="0x00000000:0x00000027"/>
+ <field name="delivery-failed" type="boolean"/>
+ <field name="undeliverable-here" type="boolean"/>
+ <field name="message-annotations" type="fields"/>
+ </type>
+ </section>
+ <section name="addressing">
+ <type name="source" class="composite" source="list" provides="source">
+ <descriptor name="amqp:source:list" code="0x00000000:0x00000028"/>
+ <field name="address" type="*" requires="address"/>
+ <field name="durable" type="terminus-durability" default="none"/>
+ <field name="expiry-policy" type="terminus-expiry-policy" default="session-end"/>
+ <field name="timeout" type="seconds" default="0"/>
+ <field name="dynamic" type="boolean" default="false"/>
+ <field name="dynamic-node-properties" type="node-properties"/>
+ <field name="distribution-mode" type="symbol" requires="distribution-mode"/>
+ <field name="filter" type="filter-set"/>
+ <field name="default-outcome" type="*" requires="outcome"/>
+ <field name="outcomes" type="symbol" multiple="true"/>
+ <field name="capabilities" type="symbol" multiple="true"/>
+ </type>
+ <type name="target" class="composite" source="list" provides="target">
+ <descriptor name="amqp:target:list" code="0x00000000:0x00000029"/>
+ <field name="address" type="*" requires="address"/>
+ <field name="durable" type="terminus-durability" default="none"/>
+ <field name="expiry-policy" type="terminus-expiry-policy" default="session-end"/>
+ <field name="timeout" type="seconds" default="0"/>
+ <field name="dynamic" type="boolean" default="false"/>
+ <field name="dynamic-node-properties" type="node-properties"/>
+ <field name="capabilities" type="symbol" multiple="true"/>
+ </type>
+ <type name="terminus-durability" class="restricted" source="uint">
+ <choice name="none" value="0"/>
+ <choice name="configuration" value="1"/>
+ <choice name="unsettled-state" value="2"/>
+ </type>
+ <type name="terminus-expiry-policy" class="restricted" source="symbol">
+ <choice name="link-detach" value="link-detach"/>
+ <choice name="session-end" value="session-end"/>
+ <choice name="connection-close" value="connection-close"/>
+ <choice name="never" value="never"/>
+ </type>
+ <type name="std-dist-mode" class="restricted" source="symbol" provides="distribution-mode">
+ <choice name="move" value="move"/>
+ <choice name="copy" value="copy"/>
+ </type>
+ <type name="filter-set" class="restricted" source="map"/>
+ <type name="node-properties" class="restricted" source="fields"/>
+ <type name="delete-on-close" class="composite" source="list" provides="lifetime-policy">
+ <descriptor name="amqp:delete-on-close:list" code="0x00000000:0x0000002b"/>
+ </type>
+ <type name="delete-on-no-links" class="composite" source="list" provides="lifetime-policy">
+ <descriptor name="amqp:delete-on-no-links:list" code="0x00000000:0x0000002c"/>
+ </type>
+ <type name="delete-on-no-messages" class="composite" source="list" provides="lifetime-policy">
+ <descriptor name="amqp:delete-on-no-messages:list" code="0x00000000:0x0000002d"/>
+ </type>
+ <type name="delete-on-no-links-or-messages" class="composite" source="list" provides="lifetime-policy">
+ <descriptor name="amqp:delete-on-no-links-or-messages:list" code="0x00000000:0x0000002e"/>
+ </type>
+ </section>
+</amqp>
Added: qpid/proton/proton-c/src/protocol.h.py
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/protocol.h.py?rev=1298498&view=auto
==============================================================================
--- qpid/proton/proton-c/src/protocol.h.py (added)
+++ qpid/proton/proton-c/src/protocol.h.py Thu Mar 8 18:33:46 2012
@@ -0,0 +1,47 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from protocol import *
+
+print "/* generated */"
+print "#ifndef _PROTON_PROTOCOL_H"
+print "#define _PROTON_PROTOCOL_H 1"
+print
+
+for type in TYPES:
+ fidx = 0
+ for f in type.query["field"]:
+ print "#define %s_%s (%s)" % (field_kw(type), field_kw(f), fidx)
+ fidx += 1
+
+idx = 0
+
+for type in TYPES:
+ desc = type["descriptor"]
+ name = type["@name"].upper().replace("-", "_")
+ print "#define %s_SYM (\"%s\")" % (name, desc["@name"])
+ hi, lo = [int(x, 0) for x in desc["@code"].split(":")]
+ code = (hi << 32) + lo
+ print "#define %s_CODE (%s)" % (name, code)
+ print "#define %s_IDX (%s)" % (name, idx)
+ idx += 1
+
+print
+print "#endif /* protocol.h */"
Added: qpid/proton/proton-c/src/protocol.py
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/protocol.py?rev=1298498&view=auto
==============================================================================
--- qpid/proton/proton-c/src/protocol.py (added)
+++ qpid/proton/proton-c/src/protocol.py Thu Mar 8 18:33:46 2012
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import mllib, os, sys
+
+doc = mllib.xml_parse(os.path.join(os.path.dirname(__file__), "transport.xml"))
+mdoc = mllib.xml_parse(os.path.join(os.path.dirname(__file__), "messaging.xml"))
+
+def eq(attr, value):
+ return lambda nd: nd[attr] == value
+
+TYPES = doc.query["amqp/section/type", eq("@class", "composite")] + \
+ mdoc.query["amqp/section/type", eq("@class", "composite")]
+RESTRICTIONS = {}
+COMPOSITES = {}
+
+for type in doc.query["amqp/section/type"] + mdoc.query["amqp/section/type"]:
+ source = type["@source"]
+ if source:
+ RESTRICTIONS[type["@name"]] = source
+ if type["@class"] == "composite":
+ COMPOSITES[type["@name"]] = type
+
+def resolve(name):
+ if name in RESTRICTIONS:
+ return resolve(RESTRICTIONS[name])
+ else:
+ return name
+
+TYPEMAP = {
+ "boolean": ("bool", "", ""),
+ "binary": ("pn_binary_t", "*", ""),
+ "string": ("wchar_t", "*", ""),
+ "symbol": ("char", "*", ""),
+ "ubyte": ("uint8_t", "", ""),
+ "ushort": ("uint16_t", "", ""),
+ "uint": ("uint32_t", "", ""),
+ "ulong": ("uint64_t", "", ""),
+ "timestamp": ("uint64_t", "", ""),
+ "list": ("pn_list_t", "*", ""),
+ "map": ("pn_map_t", "*", ""),
+ "box": ("pn_box_t", "*", ""),
+ "*": ("pn_object_t", "*", "")
+ }
+
+CONSTRUCTORS = {
+ "boolean": "boolean",
+ "string": "string",
+ "symbol": "symbol",
+ "ubyte": "ubyte",
+ "ushort": "ushort",
+ "uint": "uint",
+ "ulong": "ulong",
+ "timestamp": "ulong"
+ }
+
+NULLABLE = set(["string", "symbol"])
+
+def fname(field):
+ return field["@name"].replace("-", "_")
+
+def tname(t):
+ return t["@name"].replace("-", "_")
+
+def multi(f):
+ return f["@multiple"] == "true"
+
+def ftype(field):
+ if multi(field):
+ return "list"
+ elif field["@type"] in COMPOSITES:
+ return "box"
+ else:
+ return resolve(field["@type"]).replace("-", "_")
+
+def fconstruct(field, expr):
+ type = ftype(field)
+ if type in CONSTRUCTORS:
+ result = "pn_%s(mem, %s)" % (CONSTRUCTORS[type], expr)
+ if type in NULLABLE:
+ result = "%s ? %s : NULL" % (expr, result)
+ else:
+ result = expr
+ if multi(field):
+ result = "pn_box(mem, pn_boolean(mem, true), %s)" % result
+ return result
+
+def declaration(field):
+ name = fname(field)
+ type = ftype(field)
+ t, pre, post = TYPEMAP.get(type, (type, "", ""))
+ return t, "%s%s%s" % (pre, name, post)
+
+def field_kw(field):
+ return fname(field).upper()
Added: qpid/proton/proton-c/src/proton.c
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/proton.c?rev=1298498&view=auto
==============================================================================
--- qpid/proton/proton-c/src/proton.c (added)
+++ qpid/proton/proton-c/src/proton.c Thu Mar 8 18:33:46 2012
@@ -0,0 +1,315 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#define _GNU_SOURCE
+
+#include <stdio.h>
+#include <string.h>
+#include <proton/driver.h>
+#include <proton/value.h>
+
+void print(pn_value_t value)
+{
+ char buf[1024];
+ char *pos = buf;
+ pn_format_value(&pos, buf + 1024, &value, 1);
+ *pos = '\0';
+ printf("%s\n", buf);
+}
+
+int value(int argc, char **argv)
+{
+ pn_value_t v[1024];
+ int count = pn_scan(v, "niIlLISz[iii{SSSi}]i([iiiiz])@i[iii]{SiSiSi}", -3, 3, -123456789101112, 123456719101112, 3,
+ L"this is a string", (size_t) 16, "this is binary\x00\x01",
+ 1, 2, 3, L"key", L"value", L"one", 1,
+ 1, 2, 3, 4, 5, 7, "binary",
+ 1, 2, 3,
+ L"one", 1, L"two", 2, L"three", 3);
+
+ pn_list_t *list = pn_to_list(v[8]);
+ pn_map_t *map = pn_to_map(pn_list_get(list, 3));
+ print(pn_list_get(list, 3));
+ printf("POP: ");
+ print(pn_map_pop(map, pn_value("S", L"key")));
+
+ printf("scanned %i values\n", count);
+ for (int i = 0; i < count; i++) {
+ printf("value %.2i [%zi]: ", i, pn_encode_sizeof(v[i])); print(v[i]);
+ }
+
+ pn_list_t *l = pn_list(1024);
+ pn_list_extend(l, "SIi[iii]", L"One", 2, -3, 4, 5, 6);
+ printf("list [%zi]: ", pn_encode_sizeof_list(l)); print(pn_from_list(l));
+
+ for (int i = 0; i < count; i++)
+ {
+ char buf[pn_encode_sizeof(v[i])];
+ size_t size = pn_encode(v[i], buf);
+ pn_value_t value;
+ size_t read = pn_decode(&value, buf, size);
+ printf("read=%zi: ", read); print(value);
+ }
+
+ return 0;
+}
+
+struct server_context {
+ int count;
+};
+
+void server_callback(pn_connection_t *conn, void *context)
+{
+ struct server_context *ctx = context;
+ char tagstr[1024];
+ char msg[1024];
+
+ pn_endpoint_t *endpoint = pn_endpoint_head(conn, UNINIT, ACTIVE);
+ while (endpoint)
+ {
+ switch (pn_endpoint_type(endpoint))
+ {
+ case CONNECTION:
+ case SESSION:
+ if (pn_remote_state(endpoint) != UNINIT)
+ pn_open(endpoint);
+ break;
+ case SENDER:
+ case RECEIVER:
+ {
+ pn_link_t *link = (pn_link_t *) endpoint;
+ if (pn_remote_state(endpoint) != UNINIT) {
+ printf("%ls, %ls\n", pn_remote_source(link), pn_remote_target(link));
+ pn_set_source(link, pn_remote_source(link));
+ pn_set_target(link, pn_remote_target(link));
+ pn_open(endpoint);
+ if (pn_endpoint_type(endpoint) == RECEIVER) {
+ pn_flow((pn_receiver_t *) endpoint, 100);
+ } else {
+ pn_binary_t *tag = pn_binary("blah", 4);
+ pn_delivery(link, tag);
+ pn_free_binary(tag);
+ }
+ }
+ }
+ break;
+ case TRANSPORT:
+ break;
+ }
+
+ endpoint = pn_endpoint_next(endpoint, UNINIT, ACTIVE);
+ }
+
+ pn_delivery_t *delivery = pn_work_head(conn);
+ while (delivery)
+ {
+ pn_binary_t *tag = pn_delivery_tag(delivery);
+ pn_format(tagstr, 1024, pn_from_binary(tag));
+ pn_link_t *link = pn_link(delivery);
+ if (pn_readable(delivery)) {
+ printf("received delivery: %s\n", tagstr);
+ pn_receiver_t *receiver = (pn_receiver_t *) link;
+ printf(" payload = \"");
+ while (true) {
+ ssize_t n = pn_recv(receiver, msg, 1024);
+ if (n == EOM) {
+ pn_advance(link);
+ pn_disposition(delivery, ACCEPTED);
+ break;
+ } else {
+ printf("%.*s", (int) n, msg);
+ }
+ }
+ printf("\"\n");
+ } else if (pn_writable(delivery)) {
+ pn_sender_t *sender = (pn_sender_t *) link;
+ sprintf(msg, "message body for %s", tagstr);
+ pn_send(sender, msg, strlen(msg));
+ if (pn_advance(link)) {
+ printf("sent delivery: %s\n", tagstr);
+ char tagbuf[16];
+ sprintf(tagbuf, "%i", ctx->count++);
+ pn_binary_t *tag = pn_binary(tagbuf, strlen(tagbuf));
+ pn_delivery(link, tag);
+ pn_free_binary(tag);
+ }
+ }
+
+ if (pn_dirty(delivery)) {
+ printf("disposition for %s: %u\n", tagstr, pn_remote_disp(delivery));
+ pn_clean(delivery);
+ }
+
+ delivery = pn_work_next(delivery);
+ }
+
+ endpoint = pn_endpoint_head(conn, ACTIVE, CLOSED);
+ while (endpoint)
+ {
+ switch (pn_endpoint_type(endpoint))
+ {
+ case CONNECTION:
+ case SESSION:
+ case SENDER:
+ case RECEIVER:
+ if (pn_remote_state(endpoint) == CLOSED) {
+ pn_close(endpoint);
+ }
+ break;
+ case TRANSPORT:
+ break;
+ }
+
+ endpoint = pn_endpoint_next(endpoint, ACTIVE, CLOSED);
+ }
+}
+
+struct client_context {
+ bool init;
+ int recv_count;
+ int send_count;
+ pn_driver_t *driver;
+};
+
+void client_callback(pn_connection_t *connection, void *context)
+{
+ struct client_context *ctx = context;
+ char tagstr[1024];
+ char msg[1024];
+
+ if (!ctx->init) {
+ ctx->init = true;
+
+ pn_session_t *ssn = pn_session(connection);
+ pn_open((pn_endpoint_t *) connection);
+ pn_open((pn_endpoint_t *) ssn);
+
+ if (ctx->send_count) {
+ pn_sender_t *snd = pn_sender(ssn, L"sender");
+ pn_set_target((pn_link_t *) snd, L"queue");
+ pn_open((pn_endpoint_t *) snd);
+
+ char buf[16];
+ for (int i = 0; i < ctx->send_count; i++) {
+ sprintf(buf, "%c", 'a' + i);
+ pn_binary_t *tag = pn_binary(buf, strlen(buf));
+ pn_delivery((pn_link_t *) snd, tag);
+ pn_free_binary(tag);
+ }
+ }
+
+ if (ctx->recv_count) {
+ pn_receiver_t *rcv = pn_receiver(ssn, L"receiver");
+ pn_set_source((pn_link_t *) rcv, L"queue");
+ pn_open((pn_endpoint_t *) rcv);
+ pn_flow(rcv, ctx->recv_count);
+ }
+ }
+
+ pn_delivery_t *delivery = pn_work_head(connection);
+ while (delivery)
+ {
+ pn_binary_t *tag = pn_delivery_tag(delivery);
+ pn_format(tagstr, 1024, pn_from_binary(tag));
+ pn_link_t *link = pn_link(delivery);
+ if (pn_writable(delivery)) {
+ pn_sender_t *snd = (pn_sender_t *) link;
+ sprintf(msg, "message body for %s", tagstr);
+ pn_send(snd, msg, strlen(msg));
+ if (pn_advance(link)) printf("sent delivery: %s\n", tagstr);
+ } else if (pn_readable(delivery)) {
+ printf("received delivery: %s\n", tagstr);
+ pn_receiver_t *rcv = (pn_receiver_t *) link;
+ printf(" payload = \"");
+ while (true) {
+ size_t n = pn_recv(rcv, msg, 1024);
+ if (n == EOM) {
+ pn_advance(link);
+ pn_disposition(delivery, ACCEPTED);
+ pn_settle(delivery);
+ if (!--ctx->recv_count) {
+ pn_close((pn_endpoint_t *)link);
+ }
+ break;
+ } else {
+ printf("%.*s", (int) n, msg);
+ }
+ }
+ printf("\"\n");
+ }
+
+ if (pn_dirty(delivery)) {
+ printf("disposition for %s: %u\n", tagstr, pn_remote_disp(delivery));
+ pn_clean(delivery);
+ pn_settle(delivery);
+ if (!--ctx->send_count) {
+ pn_close((pn_endpoint_t *)link);
+ }
+ }
+
+ delivery = pn_work_next(delivery);
+ }
+
+ if (!ctx->send_count && !ctx->recv_count) {
+ printf("closing\n");
+ // XXX: how do we close the session?
+ //pn_close((pn_endpoint_t *) ssn);
+ pn_close((pn_endpoint_t *)connection);
+ }
+
+ pn_endpoint_t *endpoint = pn_endpoint_head(connection, CLOSED, CLOSED);
+ while (endpoint)
+ {
+ switch (pn_endpoint_type(endpoint)) {
+ case CONNECTION:
+ pn_driver_stop(ctx->driver);
+ break;
+ case SESSION:
+ case SENDER:
+ case RECEIVER:
+ case TRANSPORT:
+ break;
+ }
+ endpoint = pn_endpoint_next(endpoint, CLOSED, CLOSED);
+ }
+}
+
+int main(int argc, char **argv)
+{
+ if (argc > 1 && !strcmp(argv[1], "value"))
+ {
+ return value(argc, argv);
+ }
+
+ pn_driver_t *drv = pn_driver();
+ if (argc > 1) {
+ struct client_context ctx = {false, 10, 10, drv};
+ if (!pn_connector(drv, "0.0.0.0", "5672", client_callback, &ctx)) perror("proton");
+ } else {
+ struct server_context ctx = {0};
+ if (!pn_acceptor(drv, "0.0.0.0", "5672", server_callback, &ctx)) perror("proton");
+ }
+
+ pn_driver_run(drv);
+ pn_driver_destroy(drv);
+
+ return 0;
+}
Added: qpid/proton/proton-c/src/transport.xml
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/transport.xml?rev=1298498&view=auto
==============================================================================
--- qpid/proton/proton-c/src/transport.xml (added)
+++ qpid/proton/proton-c/src/transport.xml Thu Mar 8 18:33:46 2012
@@ -0,0 +1,200 @@
+<?xml version="1.0"?>
+
+<!--
+
+Copyright Bank of America, N.A., Barclays Bank PLC, Cisco Systems, Credit
+Suisse, Deutsche Boerse Systems, Goldman Sachs, HCL Technologies Ltd, INETCO
+Systems Limited, Informatica Corporation, JPMorgan Chase Bank Inc. N.A,
+Microsoft Corporation, my-Channels, Novell, Progress Software, Red Hat Inc.,
+Software AG, Solace Systems Inc., StormMQ Ltd., Tervela Inc., TWIST Process
+Innovations Ltd, VMware Inc. and WS02 Inc. 2006-2011. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+1. Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+2. Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer in the
+documentation and/or other materials provided with the distribution.
+3. The name of the author may not be used to endorse or promote products
+derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+-->
+
+<amqp name="transport" xmlns="http://www.amqp.org/schema/amqp.xsd">
+ <section name="frame-bodies">
+ <type name="open" class="composite" source="list" provides="frame">
+ <descriptor name="amqp:open:list" code="0x00000000:0x00000010"/>
+ <field name="container-id" type="string" mandatory="true"/>
+ <field name="hostname" type="string"/>
+ <field name="max-frame-size" type="uint" default="4294967295"/>
+ <field name="channel-max" type="ushort" default="65535"/>
+ <field name="idle-time-out" type="milliseconds"/>
+ <field name="outgoing-locales" type="ietf-language-tag" multiple="true"/>
+ <field name="incoming-locales" type="ietf-language-tag" multiple="true"/>
+ <field name="offered-capabilities" type="symbol" multiple="true"/>
+ <field name="desired-capabilities" type="symbol" multiple="true"/>
+ <field name="properties" type="fields"/>
+ </type>
+ <type name="begin" class="composite" source="list" provides="frame">
+ <descriptor name="amqp:begin:list" code="0x00000000:0x00000011"/>
+ <field name="remote-channel" type="ushort"/>
+ <field name="next-outgoing-id" type="transfer-number" mandatory="true"/>
+ <field name="incoming-window" type="uint" mandatory="true"/>
+ <field name="outgoing-window" type="uint" mandatory="true"/>
+ <field name="handle-max" type="handle" default="4294967295"/>
+ <field name="offered-capabilities" type="symbol" multiple="true"/>
+ <field name="desired-capabilities" type="symbol" multiple="true"/>
+ <field name="properties" type="fields"/>
+ </type>
+ <type name="attach" class="composite" source="list" provides="frame">
+ <descriptor name="amqp:attach:list" code="0x00000000:0x00000012"/>
+ <field name="name" type="string" mandatory="true"/>
+ <field name="handle" type="handle" mandatory="true"/>
+ <field name="role" type="role" mandatory="true"/>
+ <field name="snd-settle-mode" type="sender-settle-mode" default="mixed"/>
+ <field name="rcv-settle-mode" type="receiver-settle-mode" default="first"/>
+ <field name="source" type="*" requires="source"/>
+ <field name="target" type="*" requires="target"/>
+ <field name="unsettled" type="map"/>
+ <field name="incomplete-unsettled" type="boolean" default="false"/>
+ <field name="initial-delivery-count" type="sequence-no"/>
+ <field name="max-message-size" type="ulong"/>
+ <field name="offered-capabilities" type="symbol" multiple="true"/>
+ <field name="desired-capabilities" type="symbol" multiple="true"/>
+ <field name="properties" type="fields"/>
+ </type>
+ <type name="flow" class="composite" source="list" provides="frame">
+ <descriptor name="amqp:flow:list" code="0x00000000:0x00000013"/>
+ <field name="next-incoming-id" type="transfer-number"/>
+ <field name="incoming-window" type="uint" mandatory="true"/>
+ <field name="next-outgoing-id" type="transfer-number" mandatory="true"/>
+ <field name="outgoing-window" type="uint" mandatory="true"/>
+ <field name="handle" type="handle"/>
+ <field name="delivery-count" type="sequence-no"/>
+ <field name="link-credit" type="uint"/>
+ <field name="available" type="uint"/>
+ <field name="drain" type="boolean" default="false"/>
+ <field name="echo" type="boolean" default="false"/>
+ <field name="properties" type="fields"/>
+ </type>
+ <type name="transfer" class="composite" source="list" provides="frame">
+ <descriptor name="amqp:transfer:list" code="0x00000000:0x00000014"/>
+ <field name="handle" type="handle" mandatory="true"/>
+ <field name="delivery-id" type="delivery-number"/>
+ <field name="delivery-tag" type="delivery-tag"/>
+ <field name="message-format" type="message-format"/>
+ <field name="settled" type="boolean"/>
+ <field name="more" type="boolean" default="false"/>
+ <field name="rcv-settle-mode" type="receiver-settle-mode"/>
+ <field name="state" type="*" requires="delivery-state"/>
+ <field name="resume" type="boolean" default="false"/>
+ <field name="aborted" type="boolean" default="false"/>
+ <field name="batchable" type="boolean" default="false"/>
+ </type>
+ <type name="disposition" class="composite" source="list" provides="frame">
+ <descriptor name="amqp:disposition:list" code="0x00000000:0x00000015"/>
+ <field name="role" type="role" mandatory="true"/>
+ <field name="first" type="delivery-number" mandatory="true"/>
+ <field name="last" type="delivery-number"/>
+ <field name="settled" type="boolean" default="false"/>
+ <field name="state" type="*" requires="delivery-state"/>
+ <field name="batchable" type="boolean" default="false"/>
+ </type>
+ <type name="detach" class="composite" source="list" provides="frame">
+ <descriptor name="amqp:detach:list" code="0x00000000:0x00000016"/>
+ <field name="handle" type="handle" mandatory="true"/>
+ <field name="closed" type="boolean" default="false"/>
+ <field name="error" type="error"/>
+ </type>
+ <type name="end" class="composite" source="list" provides="frame">
+ <descriptor name="amqp:end:list" code="0x00000000:0x00000017"/>
+ <field name="error" type="error"/>
+ </type>
+ <type name="close" class="composite" source="list" provides="frame">
+ <descriptor name="amqp:close:list" code="0x00000000:0x00000018"/>
+ <field name="error" type="error"/>
+ </type>
+ </section>
+ <section name="definitions">
+ <type name="role" class="restricted" source="boolean">
+ <choice name="sender" value="false"/>
+ <choice name="receiver" value="true"/>
+ </type>
+ <type name="sender-settle-mode" class="restricted" source="ubyte">
+ <choice name="unsettled" value="0"/>
+ <choice name="settled" value="1"/>
+ <choice name="mixed" value="2"/>
+ </type>
+ <type name="receiver-settle-mode" class="restricted" source="ubyte">
+ <choice name="first" value="0"/>
+ <choice name="second" value="1"/>
+ </type>
+ <type name="handle" class="restricted" source="uint"/>
+ <type name="seconds" class="restricted" source="uint"/>
+ <type name="milliseconds" class="restricted" source="uint"/>
+ <type name="delivery-tag" class="restricted" source="binary"/>
+ <type name="delivery-number" class="restricted" source="sequence-no"/>
+ <type name="transfer-number" class="restricted" source="sequence-no"/>
+ <type name="sequence-no" class="restricted" source="uint"/>
+ <type name="message-format" class="restricted" source="uint"/>
+ <type name="ietf-language-tag" class="restricted" source="symbol"/>
+ <type name="fields" class="restricted" source="map"/>
+ <type name="error" class="composite" source="list">
+ <descriptor name="amqp:error:list" code="0x00000000:0x0000001d"/>
+ <field name="condition" type="symbol" mandatory="true" requires="error-condition"/>
+ <field name="description" type="string"/>
+ <field name="info" type="fields"/>
+ </type>
+ <type name="amqp-error" class="restricted" source="symbol" provides="error-condition">
+ <choice name="internal-error" value="amqp:internal-error"/>
+ <choice name="not-found" value="amqp:not-found"/>
+ <choice name="unauthorized-access" value="amqp:unauthorized-access"/>
+ <choice name="decode-error" value="amqp:decode-error"/>
+ <choice name="resource-limit-exceeded" value="amqp:resource-limit-exceeded"/>
+ <choice name="not-allowed" value="amqp:not-allowed"/>
+ <choice name="invalid-field" value="amqp:invalid-field"/>
+ <choice name="not-implemented" value="amqp:not-implemented"/>
+ <choice name="resource-locked" value="amqp:resource-locked"/>
+ <choice name="precondition-failed" value="amqp:precondition-failed"/>
+ <choice name="resource-deleted" value="amqp:resource-deleted"/>
+ <choice name="illegal-state" value="amqp:illegal-state"/>
+ <choice name="frame-size-too-small" value="amqp:frame-size-too-small"/>
+ </type>
+ <type name="connection-error" class="restricted" source="symbol" provides="error-condition">
+ <choice name="connection-forced" value="amqp:connection:forced"/>
+ <choice name="framing-error" value="amqp:connection:framing-error"/>
+ <choice name="redirect" value="amqp:connection:redirect"/>
+ </type>
+ <type name="session-error" class="restricted" source="symbol" provides="error-condition">
+ <choice name="window-violation" value="amqp:session:window-violation"/>
+ <choice name="errant-link" value="amqp:session:errant-link"/>
+ <choice name="handle-in-use" value="amqp:session:handle-in-use"/>
+ <choice name="unattached-handle" value="amqp:session:unattached-handle"/>
+ </type>
+ <type name="link-error" class="restricted" source="symbol" provides="error-condition">
+ <choice name="detach-forced" value="amqp:link:detach-forced"/>
+ <choice name="transfer-limit-exceeded" value="amqp:link:transfer-limit-exceeded"/>
+ <choice name="message-size-exceeded" value="amqp:link:message-size-exceeded"/>
+ <choice name="redirect" value="amqp:link:redirect"/>
+ <choice name="stolen" value="amqp:link:stolen"/>
+ </type>
+ <definition name="PORT" value="5672"/>
+ <definition name="SECURE-PORT" value="5671"/>
+ <definition name="MAJOR" value="1"/>
+ <definition name="MINOR" value="0"/>
+ <definition name="REVISION" value="0"/>
+ <definition name="MIN-MAX-FRAME-SIZE" value="512"/>
+ </section>
+</amqp>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org