You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by jr...@apache.org on 2018/04/05 19:34:15 UTC
[40/51] [partial] qpid-proton git commit: PROTON-1728: Reorganize the
source tree
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/core/engine.c
----------------------------------------------------------------------
diff --git a/c/src/core/engine.c b/c/src/core/engine.c
new file mode 100644
index 0000000..f49886d
--- /dev/null
+++ b/c/src/core/engine.c
@@ -0,0 +1,2343 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "engine-internal.h"
+#include <stdlib.h>
+#include <string.h>
+#include "protocol.h"
+
+#include <assert.h>
+#include <stdarg.h>
+#include <stdio.h>
+
+#include "platform/platform.h"
+#include "platform/platform_fmt.h"
+#include "transport.h"
+
+static void pni_session_bound(pn_session_t *ssn);
+static void pni_link_bound(pn_link_t *link);
+
+
+// endpoints
+
+/* Resolve the connection that owns an endpoint of any type.
+ * Returns NULL when the endpoint type matches none of the handled cases. */
+static pn_connection_t *pni_ep_get_connection(pn_endpoint_t *endpoint)
+{
+  pn_connection_t *owner = NULL;
+
+  switch (endpoint->type) {
+  case CONNECTION:
+    // the endpoint pointer is cast directly to its connection
+    owner = (pn_connection_t *) endpoint;
+    break;
+  case SESSION:
+    owner = ((pn_session_t *) endpoint)->connection;
+    break;
+  case SENDER:
+  case RECEIVER:
+    owner = ((pn_link_t *) endpoint)->session->connection;
+    break;
+  }
+
+  return owner;
+}
+
+/* Pick the "local open" (open == true) or "local close" (open == false)
+ * event type that corresponds to the given endpoint type. */
+static pn_event_type_t endpoint_event(pn_endpoint_type_t type, bool open) {
+  switch (type) {
+  case CONNECTION:
+    if (open) return PN_CONNECTION_LOCAL_OPEN;
+    return PN_CONNECTION_LOCAL_CLOSE;
+  case SESSION:
+    if (open) return PN_SESSION_LOCAL_OPEN;
+    return PN_SESSION_LOCAL_CLOSE;
+  case SENDER:
+  case RECEIVER:
+    // both link directions share the same link events
+    if (open) return PN_LINK_LOCAL_OPEN;
+    return PN_LINK_LOCAL_CLOSE;
+  default:
+    assert(false);
+    return PN_EVENT_NONE;
+  }
+}
+
+/* Mark an endpoint as locally active. Does nothing when the
+ * PN_LOCAL_ACTIVE bit is already set; otherwise puts the matching
+ * local-open event on the owning connection's collector and flags the
+ * endpoint as modified. */
+static void pn_endpoint_open(pn_endpoint_t *endpoint)
+{
+  if (endpoint->state & PN_LOCAL_ACTIVE) return;
+
+  PN_SET_LOCAL(endpoint->state, PN_LOCAL_ACTIVE);
+  pn_connection_t *owner = pni_ep_get_connection(endpoint);
+  pn_event_type_t opened = endpoint_event(endpoint->type, true);
+  pn_collector_put(owner->collector, PN_OBJECT, endpoint, opened);
+  pn_modified(owner, endpoint, true);
+}
+
+/* Mark an endpoint as locally closed. Does nothing when the
+ * PN_LOCAL_CLOSED bit is already set; otherwise puts the matching
+ * local-close event on the owning connection's collector and flags the
+ * endpoint as modified. */
+static void pn_endpoint_close(pn_endpoint_t *endpoint)
+{
+  if (endpoint->state & PN_LOCAL_CLOSED) return;
+
+  PN_SET_LOCAL(endpoint->state, PN_LOCAL_CLOSED);
+  pn_connection_t *owner = pni_ep_get_connection(endpoint);
+  pn_event_type_t closed = endpoint_event(endpoint->type, false);
+  pn_collector_put(owner->collector, PN_OBJECT, endpoint, closed);
+  pn_modified(owner, endpoint, true);
+}
+
+/* Put the connection's endpoint state back to "uninitialized" for both
+ * the local and the remote side. */
+void pn_connection_reset(pn_connection_t *connection)
+{
+  assert(connection);
+  connection->endpoint.state = PN_LOCAL_UNINIT | PN_REMOTE_UNINIT;
+}
+
+/* Locally open the connection by opening its embedded endpoint. */
+void pn_connection_open(pn_connection_t *connection)
+{
+  assert(connection);
+  pn_endpoint_t *ep = &connection->endpoint;
+  pn_endpoint_open(ep);
+}
+
+/* Locally close the connection by closing its embedded endpoint. */
+void pn_connection_close(pn_connection_t *connection)
+{
+  assert(connection);
+  pn_endpoint_t *ep = &connection->endpoint;
+  pn_endpoint_close(ep);
+}
+
+static void pni_endpoint_tini(pn_endpoint_t *endpoint);
+
+/* Release the application's hold on a connection: free every child
+ * endpoint still on the connection's endpoint list, mark the connection
+ * endpoint as freed and drop one endpoint reference. Must not be called
+ * on a connection whose endpoint is already marked freed. */
+void pn_connection_release(pn_connection_t *connection)
+{
+  assert(!connection->endpoint.freed);
+
+  // take the connection's own endpoint off the list so that only
+  // children the application has not freed remain
+  LL_REMOVE(connection, endpoint, &connection->endpoint);
+
+  // pn_session_free()/pn_link_free() unlink the endpoint they are given
+  // (via pni_remove_session()/pni_remove_link()), so the head changes
+  // on every pass
+  while (connection->endpoint_head) {
+    pn_endpoint_t *child = connection->endpoint_head;
+    if (child->type == SESSION) {
+      // note: this will free all child links:
+      pn_session_free((pn_session_t *) child);
+    } else if (child->type == SENDER || child->type == RECEIVER) {
+      pn_link_free((pn_link_t *) child);
+    } else {
+      assert(false);
+    }
+  }
+
+  connection->endpoint.freed = true;
+
+  if (!connection->transport) {
+    // no transport available to consume transport work items, so
+    // manually clear them. pn_connection_unbound() finishes with a
+    // pn_ep_decref(), hence the extra reference taken first.
+    pn_ep_incref(&connection->endpoint);
+    pn_connection_unbound(connection);
+  }
+
+  pn_ep_decref(&connection->endpoint);
+}
+
+/* Release the connection (see pn_connection_release) and then drop one
+ * object reference on it. */
+void pn_connection_free(pn_connection_t *connection)
+{
+  pn_connection_release(connection);
+  pn_decref(connection);
+}
+
+/* Bookkeeping run when the connection is bound: puts a
+ * PN_CONNECTION_BOUND event on the collector, takes one endpoint
+ * reference and runs pni_session_bound() on every listed session. */
+void pn_connection_bound(pn_connection_t *connection)
+{
+  pn_collector_put(connection->collector, PN_OBJECT, connection, PN_CONNECTION_BOUND);
+  pn_ep_incref(&connection->endpoint);
+
+  // the size is sampled once, before the loop starts
+  const size_t count = pn_list_size(connection->sessions);
+  size_t idx = 0;
+  while (idx < count) {
+    pn_session_t *ssn = (pn_session_t *) pn_list_get(connection->sessions, idx);
+    pni_session_bound(ssn);
+    idx++;
+  }
+}
+
+/* Invoked when the transport has been removed. Clears the transport
+ * pointer, drains the transport work lists if the connection was
+ * already freed, and drops one endpoint reference. */
+void pn_connection_unbound(pn_connection_t *connection)
+{
+  connection->transport = NULL;
+
+  if (connection->endpoint.freed) {
+    // connection has been freed prior to unbinding, thus it cannot be
+    // re-assigned to a new transport. Clear the transport work lists
+    // to allow the connection to be freed.
+    pn_endpoint_t *pending;
+    while ((pending = connection->transport_head) != NULL) {
+      pn_clear_modified(connection, pending);
+    }
+
+    pn_delivery_t *queued;
+    while ((queued = connection->tpwork_head) != NULL) {
+      pn_clear_tpwork(queued);
+    }
+  }
+
+  pn_ep_decref(&connection->endpoint);
+}
+
+/* Return the record stored in the connection's context field. */
+pn_record_t *pn_connection_attachments(pn_connection_t *connection)
+{
+  assert(connection);
+  pn_record_t *attachments = connection->context;
+  return attachments;
+}
+
+/* Fetch the legacy context pointer (record key PN_LEGCTX) of a
+ * connection. A NULL connection yields NULL. */
+void *pn_connection_get_context(pn_connection_t *conn)
+{
+  // XXX: we should really assert on conn here, but this causes
+  // messenger tests to fail
+  if (!conn) {
+    return NULL;
+  }
+  return pn_record_get(conn->context, PN_LEGCTX);
+}
+
+/* Store a legacy context pointer on the connection under the record
+ * key PN_LEGCTX. */
+void pn_connection_set_context(pn_connection_t *conn, void *context)
+{
+  assert(conn);
+  pn_record_t *record = conn->context;
+  pn_record_set(record, PN_LEGCTX, context);
+}
+
+/* Return the connection's transport pointer. The field is NULL after
+ * pn_connection() creates the connection and after
+ * pn_connection_unbound() runs. */
+pn_transport_t *pn_connection_transport(pn_connection_t *connection)
+{
+  assert(connection);
+  pn_transport_t *bound = connection->transport;
+  return bound;
+}
+
+/* Initialize a condition in place by creating fresh name, description
+ * and info members for it. */
+void pn_condition_init(pn_condition_t *condition)
+{
+  // name and description both start from pn_string(NULL)
+  condition->name        = pn_string(NULL);
+  condition->description = pn_string(NULL);
+  condition->info        = pn_data(0);
+}
+
+/* Allocate and initialize a standalone condition.
+ * Returns NULL if the allocation fails; otherwise the caller releases
+ * the result with pn_condition_free(). */
+pn_condition_t *pn_condition() {
+  pn_condition_t *c = (pn_condition_t*)malloc(sizeof(*c));
+  // malloc may fail; initializing through a NULL pointer would crash
+  if (!c) return NULL;
+  pn_condition_init(c);
+  return c;
+}
+
+/* Tear down the members of a condition (info, description, name)
+ * without freeing the condition struct itself. */
+void pn_condition_tini(pn_condition_t *condition)
+{
+  // released in the reverse order of pn_condition_init()
+  pn_data_free(condition->info);
+  pn_free(condition->description);
+  pn_free(condition->name);
+}
+
+/* Destroy a condition obtained from pn_condition(): clear it, tear down
+ * its members and free the struct. A NULL argument is ignored. */
+void pn_condition_free(pn_condition_t *c)
+{
+  if (!c) return;
+
+  pn_condition_clear(c);
+  pn_condition_tini(c);
+  free(c);
+}
+
+/* Attach a session to a connection: append it to the connection's
+ * session list, set its back-pointer, and take both an object
+ * reference and an endpoint reference on the connection. */
+static void pni_add_session(pn_connection_t *conn, pn_session_t *ssn)
+{
+  pn_list_add(conn->sessions, ssn);
+  ssn->connection = conn;
+
+  // keep the connection around until the session is finalized
+  pn_incref(conn);
+  pn_ep_incref(&conn->endpoint);
+}
+
+/* Detach a session from its connection. The endpoint reference is
+ * dropped and the session's endpoint unlinked only when pn_list_remove()
+ * reports success for the session list. */
+static void pni_remove_session(pn_connection_t *conn, pn_session_t *ssn)
+{
+  if (!pn_list_remove(conn->sessions, ssn)) {
+    return;
+  }
+
+  pn_ep_decref(&conn->endpoint);
+  LL_REMOVE(conn, endpoint, &ssn->endpoint);
+}
+
+/* Return the connection a session belongs to, or NULL for a NULL
+ * session. */
+pn_connection_t *pn_session_connection(pn_session_t *session)
+{
+  return session ? session->connection : NULL;
+}
+
+/* Locally open the session by opening its embedded endpoint. */
+void pn_session_open(pn_session_t *session)
+{
+  assert(session);
+  pn_endpoint_t *ep = &session->endpoint;
+  pn_endpoint_open(ep);
+}
+
+/* Locally close the session by closing its embedded endpoint. */
+void pn_session_close(pn_session_t *session)
+{
+  assert(session);
+  pn_endpoint_t *ep = &session->endpoint;
+  pn_endpoint_close(ep);
+}
+
+/* Free a session on behalf of the application: frees all of its links,
+ * moves the session from the connection's session list to its "freed"
+ * list, marks the endpoint freed and drops one endpoint reference.
+ * Must not be called twice on the same session. */
+void pn_session_free(pn_session_t *session)
+{
+  assert(!session->endpoint.freed);
+
+  // pn_link_free() removes the link from session->links (through
+  // pni_remove_link()), so index 0 is a different link on each pass
+  while (pn_list_size(session->links) > 0) {
+    pn_link_free((pn_link_t *) pn_list_get(session->links, 0));
+  }
+
+  pn_connection_t *owner = session->connection;
+  pni_remove_session(owner, session);
+  pn_list_add(owner->freed, session);
+  session->endpoint.freed = true;
+  pn_ep_decref(&session->endpoint);
+
+  // the finalize logic depends on endpoint.freed, so we incref/decref
+  // to give it a chance to rerun
+  pn_incref(session);
+  pn_decref(session);
+}
+
+/* Return the record stored in the session's context field. */
+pn_record_t *pn_session_attachments(pn_session_t *session)
+{
+  assert(session);
+  pn_record_t *attachments = session->context;
+  return attachments;
+}
+
+/* Fetch the legacy context pointer (record key PN_LEGCTX) of a
+ * session. A NULL session yields a null pointer. */
+void *pn_session_get_context(pn_session_t *session)
+{
+  if (!session) {
+    return 0;
+  }
+  return pn_record_get(session->context, PN_LEGCTX);
+}
+
+/* Store a legacy context pointer on the session under the record key
+ * PN_LEGCTX. `context` may be NULL, which stores a NULL pointer; the
+ * connection and link setters accept NULL in the same way. */
+void pn_session_set_context(pn_session_t *session, void *context)
+{
+  // guard the pointer that is actually dereferenced; the previous
+  // assert(context) rejected a legitimate NULL context and left a NULL
+  // session unchecked
+  assert(session);
+  pn_record_set(session->context, PN_LEGCTX, context);
+}
+
+
+/* Attach a link to a session: append it to the session's link list,
+ * set its back-pointer and take an endpoint reference on the session. */
+static void pni_add_link(pn_session_t *ssn, pn_link_t *link)
+{
+  pn_list_t *siblings = ssn->links;
+  pn_list_add(siblings, link);
+  link->session = ssn;
+  pn_ep_incref(&ssn->endpoint);
+}
+
+/* Detach a link from its session. The session's endpoint reference is
+ * dropped and the link's endpoint unlinked from the connection only
+ * when pn_list_remove() reports success for the link list. */
+static void pni_remove_link(pn_session_t *ssn, pn_link_t *link)
+{
+  if (!pn_list_remove(ssn->links, link)) {
+    return;
+  }
+
+  pn_ep_decref(&ssn->endpoint);
+  LL_REMOVE(ssn->connection, endpoint, &link->endpoint);
+}
+
+/* Locally open the link by opening its embedded endpoint. */
+void pn_link_open(pn_link_t *link)
+{
+  assert(link);
+  pn_endpoint_t *ep = &link->endpoint;
+  pn_endpoint_open(ep);
+}
+
+/* Locally close the link by closing its embedded endpoint. */
+void pn_link_close(pn_link_t *link)
+{
+  assert(link);
+  pn_endpoint_t *ep = &link->endpoint;
+  pn_endpoint_close(ep);
+}
+
+/* Mark the link as detached. The first call sets the flag, puts a
+ * PN_LINK_LOCAL_DETACH event on the connection's collector and flags
+ * the link endpoint as modified; later calls do nothing. */
+void pn_link_detach(pn_link_t *link)
+{
+  assert(link);
+
+  if (!link->detached) {
+    link->detached = true;
+    pn_collector_put(link->session->connection->collector, PN_OBJECT, link, PN_LINK_LOCAL_DETACH);
+    pn_modified(link->session->connection, &link->endpoint, true);
+  }
+}
+
+/* Release every member object owned by a terminus. The terminus struct
+ * itself is not freed. */
+static void pni_terminus_free(pn_terminus_t *terminus)
+{
+  pn_free(terminus->address);
+
+  // the four pn_data_t members created by pni_terminus_init()
+  pn_free(terminus->properties);
+  pn_free(terminus->capabilities);
+  pn_free(terminus->outcomes);
+  pn_free(terminus->filter);
+}
+
+/* Free a link on behalf of the application: moves it from the
+ * session's link list to its "freed" list, settles every delivery still
+ * on the unsettled list, marks the endpoint freed and drops one
+ * endpoint reference. Must not be called twice on the same link. */
+void pn_link_free(pn_link_t *link)
+{
+  assert(!link->endpoint.freed);
+
+  pni_remove_link(link->session, link);
+  pn_list_add(link->session->freed, link);
+
+  // the successor is read before settling the current delivery
+  for (pn_delivery_t *d = link->unsettled_head, *following; d; d = following) {
+    following = d->unsettled_next;
+    pn_delivery_settle(d);
+  }
+
+  link->endpoint.freed = true;
+  pn_ep_decref(&link->endpoint);
+
+  // the finalize logic depends on endpoint.freed (modified above), so
+  // we incref/decref to give it a chance to rerun
+  pn_incref(link);
+  pn_decref(link);
+}
+
+/* Fetch the legacy context pointer (record key PN_LEGCTX) of a link. */
+void *pn_link_get_context(pn_link_t *link)
+{
+  assert(link);
+  pn_record_t *record = link->context;
+  return pn_record_get(record, PN_LEGCTX);
+}
+
+/* Store a legacy context pointer on the link under the record key
+ * PN_LEGCTX. */
+void pn_link_set_context(pn_link_t *link, void *context)
+{
+  assert(link);
+  pn_record_t *record = link->context;
+  pn_record_set(record, PN_LEGCTX, context);
+}
+
+/* Return the record stored in the link's context field. */
+pn_record_t *pn_link_attachments(pn_link_t *link)
+{
+  assert(link);
+  pn_record_t *attachments = link->context;
+  return attachments;
+}
+
+/* Initialize an endpoint embedded in a connection, session or link.
+ *
+ * endpoint: the endpoint to set up
+ * type:     endpoint kind, stored as a pn_endpoint_type_t
+ * conn:     connection whose endpoint list the endpoint is added to
+ *
+ * The endpoint starts referenced, unmodified, not freed, with a
+ * refcount of 1 and with local and remote state uninitialized. */
+void pn_endpoint_init(pn_endpoint_t *endpoint, int type, pn_connection_t *conn)
+{
+  endpoint->type = (pn_endpoint_type_t) type;
+  endpoint->referenced = true;
+  endpoint->state = PN_LOCAL_UNINIT | PN_REMOTE_UNINIT;
+
+  // error object plus local and remote conditions
+  endpoint->error = pn_error();
+  pn_condition_init(&endpoint->condition);
+  pn_condition_init(&endpoint->remote_condition);
+
+  // not yet linked into any list
+  endpoint->endpoint_next = endpoint->endpoint_prev = NULL;
+  endpoint->transport_next = endpoint->transport_prev = NULL;
+
+  endpoint->modified = false;
+  endpoint->freed = false;
+  endpoint->refcount = 1;
+
+  LL_ADD(conn, endpoint, endpoint);
+}
+
+/* Add one to the endpoint's own refcount field. */
+void pn_ep_incref(pn_endpoint_t *endpoint)
+{
+  endpoint->refcount += 1;
+}
+
+/* Map an endpoint type to its corresponding *_FINAL event type. */
+static pn_event_type_t pn_final_type(pn_endpoint_type_t type)
+{
+  pn_event_type_t final_event;
+
+  switch (type) {
+  case CONNECTION:
+    final_event = PN_CONNECTION_FINAL;
+    break;
+  case SESSION:
+    final_event = PN_SESSION_FINAL;
+    break;
+  case SENDER:
+  case RECEIVER:
+    // both link directions share PN_LINK_FINAL
+    final_event = PN_LINK_FINAL;
+    break;
+  default:
+    assert(false);
+    final_event = PN_EVENT_NONE;
+    break;
+  }
+
+  return final_event;
+}
+
+/* Return the endpoint of the parent object: the connection for a
+ * session, the session for a link, and NULL for a connection. */
+static pn_endpoint_t *pn_ep_parent(pn_endpoint_t *endpoint)
+{
+  pn_endpoint_t *parent = NULL;
+
+  switch (endpoint->type) {
+  case CONNECTION:
+    // connections have no parent
+    break;
+  case SESSION:
+    parent = &((pn_session_t *) endpoint)->connection->endpoint;
+    break;
+  case SENDER:
+  case RECEIVER:
+    parent = &((pn_link_t *) endpoint)->session->endpoint;
+    break;
+  default:
+    assert(false);
+    break;
+  }
+
+  return parent;
+}
+
+/* Subtract one from the endpoint's refcount field. When the count
+ * reaches zero, the matching *_FINAL event is put on the owning
+ * connection's collector. */
+void pn_ep_decref(pn_endpoint_t *endpoint)
+{
+  assert(endpoint->refcount > 0);
+
+  endpoint->refcount -= 1;
+  if (endpoint->refcount != 0) {
+    return;
+  }
+
+  pn_connection_t *owner = pni_ep_get_connection(endpoint);
+  pn_collector_put(owner->collector, PN_OBJECT, endpoint, pn_final_type(endpoint->type));
+}
+
+static void pni_endpoint_tini(pn_endpoint_t *endpoint)
+{
+ pn_error_free(endpoint->error);
+ pn_condition_tini(&endpoint->remote_condition);
+ pn_condition_tini(&endpoint->condition);
+}
+
+/* Free every endpoint still held in the `children` list, then every
+ * endpoint in the `freed` list, and finally both lists themselves.
+ * Each endpoint is expected to be unreferenced by the time it is freed. */
+static void pni_free_children(pn_list_t *children, pn_list_t *freed)
+{
+  pn_list_t *pending[2] = { children, freed };
+
+  for (int which = 0; which < 2; which++) {
+    pn_list_t *list = pending[which];
+    // NOTE(review): relies on pn_free() of the head entry shrinking the
+    // list (the session/link finalizers remove themselves) - confirm
+    while (pn_list_size(list) > 0) {
+      pn_endpoint_t *head = (pn_endpoint_t *) pn_list_get(list, 0);
+      assert(!head->referenced);
+      pn_free(head);
+    }
+  }
+
+  pn_free(children);
+  pn_free(freed);
+}
+
+/* Finalizer for pn_connection_t objects. Frees a still-attached
+ * transport first; if the connection's refcount is above zero after
+ * that, finalization stops there. Otherwise all children and owned
+ * members are released. */
+static void pn_connection_finalize(void *object)
+{
+  pn_connection_t *self = (pn_connection_t *) object;
+
+  if (self->transport) {
+    assert(!self->transport->referenced);
+    pn_free(self->transport);
+  }
+
+  // freeing the transport could post events
+  if (pn_refcount(self) > 0) {
+    return;
+  }
+
+  // sessions, and sessions the application has already freed
+  pni_free_children(self->sessions, self->freed);
+  pn_free(self->context);
+  pn_decref(self->collector);
+
+  // string and data members created by pn_connection()
+  pn_free(self->container);
+  pn_free(self->hostname);
+  pn_free(self->auth_user);
+  pn_free(self->auth_password);
+  pn_free(self->offered_capabilities);
+  pn_free(self->desired_capabilities);
+  pn_free(self->properties);
+
+  pni_endpoint_tini(&self->endpoint);
+  pn_free(self->delivery_pool);
+}
+
+// class callbacks the connection class leaves unset
+#define pn_connection_initialize NULL
+#define pn_connection_hashcode NULL
+#define pn_connection_compare NULL
+#define pn_connection_inspect NULL
+
+/* Create a new connection with empty settings, no transport and no
+ * collector. Returns NULL if pn_class_new() fails. */
+pn_connection_t *pn_connection()
+{
+  static const pn_class_t clazz = PN_CLASS(pn_connection);
+  pn_connection_t *fresh = (pn_connection_t *) pn_class_new(&clazz, sizeof(pn_connection_t));
+  if (!fresh) {
+    return NULL;
+  }
+
+  // the endpoint list must be empty before pn_endpoint_init() adds the
+  // connection's own endpoint to it
+  fresh->endpoint_head = fresh->endpoint_tail = NULL;
+  pn_endpoint_init(&fresh->endpoint, CONNECTION, fresh);
+
+  fresh->transport_head = fresh->transport_tail = NULL;
+  fresh->sessions = pn_list(PN_WEAKREF, 0);
+  fresh->freed = pn_list(PN_WEAKREF, 0);
+  fresh->transport = NULL;
+
+  // work and transport-work delivery lists start empty
+  fresh->work_head = fresh->work_tail = NULL;
+  fresh->tpwork_head = fresh->tpwork_tail = NULL;
+
+  fresh->container = pn_string(NULL);
+  fresh->hostname = pn_string(NULL);
+  fresh->auth_user = pn_string(NULL);
+  fresh->auth_password = pn_string(NULL);
+  fresh->offered_capabilities = pn_data(0);
+  fresh->desired_capabilities = pn_data(0);
+  fresh->properties = pn_data(0);
+
+  fresh->collector = NULL;
+  fresh->context = pn_record();
+  fresh->delivery_pool = pn_list(PN_OBJECT, 0);
+  fresh->driver = NULL;
+
+  return fresh;
+}
+
+/* *_INIT event for each endpoint type; pn_connection_collect() indexes
+ * this table by the endpoint's type value. */
+static const pn_event_type_t endpoint_init_event_map[] = {
+  /* CONNECTION */ PN_CONNECTION_INIT,
+  /* SESSION    */ PN_SESSION_INIT,
+  /* SENDER     */ PN_LINK_INIT,
+  /* RECEIVER   */ PN_LINK_INIT
+};
+
+/* Attach a collector to the connection, replacing any previous one.
+ *
+ * A reference is taken on `collector` and the reference on the previous
+ * collector is dropped. An *_INIT event is then put on the new
+ * collector for every endpoint currently on the connection's endpoint
+ * list. */
+void pn_connection_collect(pn_connection_t *connection, pn_collector_t *collector)
+{
+  // Take the new reference before releasing the old one: if the caller
+  // passes the collector that is already attached, dropping the old
+  // reference first could release it while it is still in use.
+  pn_incref(collector);
+  pn_decref(connection->collector);
+  connection->collector = collector;
+
+  pn_endpoint_t *endpoint = connection->endpoint_head;
+  while (endpoint) {
+    pn_collector_put(connection->collector, PN_OBJECT, endpoint, endpoint_init_event_map[endpoint->type]);
+    endpoint = endpoint->endpoint_next;
+  }
+}
+
+/* Return the collector currently attached to the connection. The
+ * field is NULL until pn_connection_collect() sets it. */
+pn_collector_t* pn_connection_collector(pn_connection_t *connection)
+{
+  pn_collector_t *attached = connection->collector;
+  return attached;
+}
+
+/* Return the connection's endpoint state, or 0 for a NULL connection. */
+pn_state_t pn_connection_state(pn_connection_t *connection)
+{
+  if (!connection) {
+    return 0;
+  }
+  return connection->endpoint.state;
+}
+
+pn_error_t *pn_connection_error(pn_connection_t *connection)
+{
+ return connection ? connection->endpoint.error : NULL;
+}
+
+/* Return the locally configured container string, as reported by
+ * pn_string_get(). */
+const char *pn_connection_get_container(pn_connection_t *connection)
+{
+  assert(connection);
+  const char *container = pn_string_get(connection->container);
+  return container;
+}
+
+/* Set the local container string of the connection. */
+void pn_connection_set_container(pn_connection_t *connection, const char *container)
+{
+  assert(connection);
+  pn_string_t *target = connection->container;
+  pn_string_set(target, container);
+}
+
+/* Return the locally configured hostname string, as reported by
+ * pn_string_get(). */
+const char *pn_connection_get_hostname(pn_connection_t *connection)
+{
+  assert(connection);
+  const char *hostname = pn_string_get(connection->hostname);
+  return hostname;
+}
+
+/* Set the hostname string of the connection. */
+void pn_connection_set_hostname(pn_connection_t *connection, const char *hostname)
+{
+  assert(connection);
+  pn_string_t *target = connection->hostname;
+  pn_string_set(target, hostname);
+}
+
+/* Return the configured authentication user name, as reported by
+ * pn_string_get(). */
+const char *pn_connection_get_user(pn_connection_t *connection)
+{
+  assert(connection);
+  const char *user = pn_string_get(connection->auth_user);
+  return user;
+}
+
+/* Set the authentication user name of the connection. */
+void pn_connection_set_user(pn_connection_t *connection, const char *user)
+{
+  assert(connection);
+  pn_string_t *target = connection->auth_user;
+  pn_string_set(target, user);
+}
+
+/* Set the authentication password of the connection. Any previously
+ * stored password bytes are overwritten with zeros before the new
+ * value is stored. */
+void pn_connection_set_password(pn_connection_t *connection, const char *password)
+{
+  assert(connection);
+
+  // Make sure the previous password is erased, if there was one.
+  pn_string_t *stored = connection->auth_password;
+  size_t old_len = pn_string_size(stored);
+  const char *old_bytes = pn_string_get(stored);
+  if (old_len > 0 && old_bytes) {
+    memset((void*)old_bytes, 0, old_len);
+  }
+
+  pn_string_set(stored, password);
+}
+
+/* Return the data object holding the locally offered capabilities. */
+pn_data_t *pn_connection_offered_capabilities(pn_connection_t *connection)
+{
+  assert(connection);
+  pn_data_t *offered = connection->offered_capabilities;
+  return offered;
+}
+
+/* Return the data object holding the locally desired capabilities. */
+pn_data_t *pn_connection_desired_capabilities(pn_connection_t *connection)
+{
+  assert(connection);
+  pn_data_t *desired = connection->desired_capabilities;
+  return desired;
+}
+
+/* Return the data object holding the local connection properties. */
+pn_data_t *pn_connection_properties(pn_connection_t *connection)
+{
+  assert(connection);
+  pn_data_t *props = connection->properties;
+  return props;
+}
+
+/* Return the remote offered capabilities held by the transport, or
+ * NULL when the connection has no transport. */
+pn_data_t *pn_connection_remote_offered_capabilities(pn_connection_t *connection)
+{
+  assert(connection);
+  if (!connection->transport) {
+    return NULL;
+  }
+  return connection->transport->remote_offered_capabilities;
+}
+
+/* Return the remote desired capabilities held by the transport, or
+ * NULL when the connection has no transport. */
+pn_data_t *pn_connection_remote_desired_capabilities(pn_connection_t *connection)
+{
+  assert(connection);
+  if (!connection->transport) {
+    return NULL;
+  }
+  return connection->transport->remote_desired_capabilities;
+}
+
+/* Return the remote properties held by the transport, or NULL when the
+ * connection has no transport. */
+pn_data_t *pn_connection_remote_properties(pn_connection_t *connection)
+{
+  assert(connection);
+  if (!connection->transport) {
+    return NULL;
+  }
+  return connection->transport->remote_properties;
+}
+
+/* Return the remote container string held by the transport, or NULL
+ * when the connection has no transport. */
+const char *pn_connection_remote_container(pn_connection_t *connection)
+{
+  assert(connection);
+  if (!connection->transport) {
+    return NULL;
+  }
+  return connection->transport->remote_container;
+}
+
+/* Return the remote hostname string held by the transport, or NULL
+ * when the connection has no transport. */
+const char *pn_connection_remote_hostname(pn_connection_t *connection)
+{
+  assert(connection);
+  if (!connection->transport) {
+    return NULL;
+  }
+  return connection->transport->remote_hostname;
+}
+
+/* Return the first delivery on the connection's work list (NULL when
+ * the list is empty). */
+pn_delivery_t *pn_work_head(pn_connection_t *connection)
+{
+  assert(connection);
+  pn_delivery_t *first = connection->work_head;
+  return first;
+}
+
+/* Advance along the connection's work list. If `delivery` is flagged
+ * as being on the work list its successor is returned; otherwise
+ * iteration restarts from the head of its connection's work list. */
+pn_delivery_t *pn_work_next(pn_delivery_t *delivery)
+{
+  assert(delivery);
+
+  return delivery->work
+    ? delivery->work_next
+    : pn_work_head(delivery->link->session->connection);
+}
+
+/* Put a delivery on the connection's work list unless it is already
+ * flagged as being on it. */
+static void pni_add_work(pn_connection_t *connection, pn_delivery_t *delivery)
+{
+  if (delivery->work) {
+    return;
+  }
+
+  assert(!delivery->local.settled); // never allow settled deliveries
+  LL_ADD(connection, work, delivery);
+  delivery->work = true;
+}
+
+/* Take a delivery off the connection's work list if it is flagged as
+ * being on it. */
+static void pni_clear_work(pn_connection_t *connection, pn_delivery_t *delivery)
+{
+  if (!delivery->work) {
+    return;
+  }
+
+  LL_REMOVE(connection, work, delivery);
+  delivery->work = false;
+}
+
+/* Re-evaluate whether a delivery belongs on the connection's work list.
+ *
+ * It belongs there when it is updated and not locally settled, or when
+ * it is its link's current delivery and the link is either not a sender
+ * or a sender with positive credit. Otherwise it is removed. */
+void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery)
+{
+  pn_link_t *link = pn_delivery_link(delivery);
+  pn_delivery_t *current = pn_link_current(link);
+  bool wanted;
+
+  if (delivery->updated && !delivery->local.settled) {
+    wanted = true;
+  } else if (delivery != current) {
+    wanted = false;
+  } else if (link->endpoint.type == SENDER) {
+    // a sender's current delivery is only workable while credit remains
+    wanted = pn_link_credit(link) > 0;
+  } else {
+    wanted = true;
+  }
+
+  if (wanted) {
+    pni_add_work(connection, delivery);
+  } else {
+    pni_clear_work(connection, delivery);
+  }
+}
+
+/* Put a delivery on its connection's transport work list (if not
+ * already flagged as on it) and, in every case, flag the connection
+ * endpoint as modified. */
+static void pni_add_tpwork(pn_delivery_t *delivery)
+{
+  pn_connection_t *owner = delivery->link->session->connection;
+
+  if (!delivery->tpwork) {
+    LL_ADD(owner, tpwork, delivery);
+    delivery->tpwork = true;
+  }
+
+  // runs whether or not the delivery was newly added
+  pn_modified(owner, &owner->endpoint, true);
+}
+
+/* Take a delivery off its connection's transport work list if it is
+ * flagged as being on it. Afterwards, if the delivery still has a
+ * positive refcount, it is incref'd and decref'd once. */
+void pn_clear_tpwork(pn_delivery_t *delivery)
+{
+  pn_connection_t *owner = delivery->link->session->connection;
+
+  if (!delivery->tpwork) {
+    return;
+  }
+
+  LL_REMOVE(owner, tpwork, delivery);
+  delivery->tpwork = false;
+
+  // NOTE(review): the incref/decref pair presumably gives the
+  // delivery's finalizer a chance to rerun, as the session/link free
+  // paths do - confirm
+  if (pn_refcount(delivery) > 0) {
+    pn_incref(delivery);
+    pn_decref(delivery);
+  }
+}
+
+/* Debug helper: print the addresses of the endpoints on the
+ * connection's transport (modified) list to stdout, separated by
+ * " -> " and terminated by a newline. */
+void pn_dump(pn_connection_t *conn)
+{
+  for (pn_endpoint_t *ep = conn->transport_head; ep; ep = ep->transport_next) {
+    printf("%p", (void *) ep);
+    // separator only between entries, not after the last one
+    if (ep->transport_next) {
+      printf(" -> ");
+    }
+  }
+  printf("\n");
+}
+
+/* Flag an endpoint as modified, adding it to the connection's
+ * transport list if it is not already flagged. When `emit` is true and
+ * the connection has a transport, a PN_TRANSPORT event is put on the
+ * collector as well. */
+void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint, bool emit)
+{
+  if (!endpoint->modified) {
+    LL_ADD(connection, transport, endpoint);
+    endpoint->modified = true;
+  }
+
+  if (!emit || !connection->transport) {
+    return;
+  }
+
+  pn_collector_put(connection->collector, PN_OBJECT, connection->transport,
+                   PN_TRANSPORT);
+}
+
+/* Clear an endpoint's modified flag: unlink it from the connection's
+ * transport list and reset its transport list pointers. Does nothing
+ * for an endpoint that is not flagged. */
+void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint)
+{
+  if (!endpoint->modified) {
+    return;
+  }
+
+  LL_REMOVE(connection, transport, endpoint);
+  endpoint->transport_next = NULL;
+  endpoint->transport_prev = NULL;
+  endpoint->modified = false;
+}
+
+/* Test whether an endpoint has the given type and matches a state
+ * filter.
+ *
+ * A zero `state` matches any state. If `state` carries bits in both
+ * the local and the remote mask, the endpoint state must equal it
+ * exactly; if it carries bits in only one of them, any overlapping bit
+ * is a match. */
+static bool pni_matches(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state)
+{
+  if (endpoint->type != type) return false;
+  if (!state) return true;
+
+  int current = endpoint->state;
+
+  if ((state & PN_REMOTE_MASK) != 0 && (state & PN_LOCAL_MASK) != 0) {
+    return current == state;
+  } else {
+    return current & state;
+  }
+}
+
+/* Starting at `endpoint` and following endpoint_next, return the first
+ * endpoint that matches the given type and state filter, or NULL when
+ * the list is exhausted. */
+pn_endpoint_t *pn_find(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state)
+{
+  for (pn_endpoint_t *cursor = endpoint; cursor; cursor = cursor->endpoint_next) {
+    if (pni_matches(cursor, type, state)) {
+      return cursor;
+    }
+  }
+  return NULL;
+}
+
+/* Return the first session on the connection's endpoint list that
+ * matches the state filter; NULL for a NULL connection or no match. */
+pn_session_t *pn_session_head(pn_connection_t *conn, pn_state_t state)
+{
+  if (!conn) {
+    return NULL;
+  }
+  return (pn_session_t *) pn_find(conn->endpoint_head, SESSION, state);
+}
+
+/* Return the next session after `ssn` on the endpoint list that
+ * matches the state filter; NULL for a NULL session or no match. */
+pn_session_t *pn_session_next(pn_session_t *ssn, pn_state_t state)
+{
+  if (!ssn) {
+    return NULL;
+  }
+  return (pn_session_t *) pn_find(ssn->endpoint.endpoint_next, SESSION, state);
+}
+
+/* Return the first link (sender or receiver) on the connection's
+ * endpoint list that matches the state filter; NULL for a NULL
+ * connection or no match. */
+pn_link_t *pn_link_head(pn_connection_t *conn, pn_state_t state)
+{
+  if (!conn) return NULL;
+
+  for (pn_endpoint_t *cursor = conn->endpoint_head; cursor; cursor = cursor->endpoint_next) {
+    if (pni_matches(cursor, SENDER, state) || pni_matches(cursor, RECEIVER, state)) {
+      return (pn_link_t *) cursor;
+    }
+  }
+
+  return NULL;
+}
+
+/* Return the next link (sender or receiver) after `link` on the
+ * endpoint list that matches the state filter; NULL for a NULL link or
+ * no match. */
+pn_link_t *pn_link_next(pn_link_t *link, pn_state_t state)
+{
+  if (!link) return NULL;
+
+  for (pn_endpoint_t *cursor = link->endpoint.endpoint_next; cursor; cursor = cursor->endpoint_next) {
+    if (pni_matches(cursor, SENDER, state) || pni_matches(cursor, RECEIVER, state)) {
+      return (pn_link_t *) cursor;
+    }
+  }
+
+  return NULL;
+}
+
+/* Class incref hook for sessions. If the session is currently not
+ * "referenced", the flag is set again and a reference is taken on the
+ * connection; otherwise the session's own object refcount is bumped. */
+static void pn_session_incref(void *object)
+{
+  pn_session_t *self = (pn_session_t *) object;
+
+  if (self->endpoint.referenced) {
+    pn_object_incref(object);
+    return;
+  }
+
+  // pni_preserve_child() clears `referenced` and drops a reference on
+  // the parent; this branch takes that parent reference again
+  self->endpoint.referenced = true;
+  pn_incref(self->connection);
+}
+
+/* Report whether an endpoint still has transport-side state.
+ *
+ * Always false without a transport on the owning connection; true if
+ * the endpoint is flagged modified. Otherwise a session counts as bound
+ * when either channel is non-negative as a signed 16-bit value, and a
+ * link when either handle is non-negative as a signed 32-bit value. */
+static bool pn_ep_bound(pn_endpoint_t *endpoint)
+{
+  pn_connection_t *owner = pni_ep_get_connection(endpoint);
+
+  if (!owner->transport) return false;
+  if (endpoint->modified) return true;
+
+  switch (endpoint->type) {
+  case CONNECTION:
+    return ((pn_connection_t *)endpoint)->transport;
+  case SESSION: {
+    pn_session_t *ssn = (pn_session_t *) endpoint;
+    // (uint16_t)-1 marks an unassigned channel and reads as negative here
+    return (((int16_t) ssn->state.local_channel) >= 0 || ((int16_t) ssn->state.remote_channel) >= 0);
+  }
+  case SENDER:
+  case RECEIVER: {
+    pn_link_t *lnk = (pn_link_t *) endpoint;
+    // -1 marks an unassigned handle
+    return ((int32_t) lnk->state.local_handle) >= 0 || ((int32_t) lnk->state.remote_handle) >= 0;
+  }
+  default:
+    assert(false);
+    return false;
+  }
+}
+
+/* A connection counts as live while its object refcount is above 1. */
+static bool pni_connection_live(pn_connection_t *conn)
+{
+  int refs = pn_refcount(conn);
+  return refs > 1;
+}
+
+/* A session counts as live when its connection is live or its own
+ * object refcount is above 1. */
+static bool pni_session_live(pn_session_t *ssn)
+{
+  if (pni_connection_live(ssn->connection)) {
+    return true;
+  }
+  return pn_refcount(ssn) > 1;
+}
+
+/* A link counts as live when its session is live or its own object
+ * refcount is above 1. */
+static bool pni_link_live(pn_link_t *link)
+{
+  if (pni_session_live(link->session)) {
+    return true;
+  }
+  return pn_refcount(link) > 1;
+}
+
+/* Dispatch to the liveness check that fits the endpoint's type. */
+static bool pni_endpoint_live(pn_endpoint_t *endpoint)
+{
+  bool live = false;
+
+  switch (endpoint->type) {
+  case CONNECTION:
+    live = pni_connection_live((pn_connection_t *)endpoint);
+    break;
+  case SESSION:
+    live = pni_session_live((pn_session_t *) endpoint);
+    break;
+  case SENDER:
+  case RECEIVER:
+    live = pni_link_live((pn_link_t *) endpoint);
+    break;
+  default:
+    assert(false);
+    break;
+  }
+
+  return live;
+}
+
+/* Decide, from a child's finalizer, whether the child must be kept
+ * alive instead of being torn down.
+ *
+ * The child is preserved when its parent is live, it is either not
+ * freed or still bound, and it is currently "referenced". Preserving
+ * it takes an object reference on the child, clears `referenced` and
+ * drops a reference on the parent. Otherwise the child is unlinked
+ * from the connection's transport list.
+ *
+ * Returns true when the child was preserved, false when finalization
+ * should continue. */
+static bool pni_preserve_child(pn_endpoint_t *endpoint)
+{
+  pn_connection_t *owner = pni_ep_get_connection(endpoint);
+  pn_endpoint_t *parent = pn_ep_parent(endpoint);
+
+  // same evaluation order and short-circuiting as a single condition
+  bool keep = pni_endpoint_live(parent)
+    && (!endpoint->freed || (pn_ep_bound(endpoint)))
+    && endpoint->referenced;
+
+  if (!keep) {
+    LL_REMOVE(owner, transport, endpoint);
+    return false;
+  }
+
+  pn_object_incref(endpoint);
+  endpoint->referenced = false;
+  pn_decref(parent);
+  return true;
+}
+
+/* Finalizer for pn_session_t objects. Returns early when
+ * pni_preserve_child() decides to keep the session. Otherwise releases
+ * everything the session owns, detaches it from its connection and
+ * transport, and drops the connection reference if the session is
+ * still "referenced". */
+static void pn_session_finalize(void *object)
+{
+  pn_session_t *self = (pn_session_t *) object;
+  pn_endpoint_t *ep = &self->endpoint;
+
+  if (pni_preserve_child(ep)) {
+    return;
+  }
+
+  pn_free(self->context);
+  pni_free_children(self->links, self->freed);
+  pni_endpoint_tini(ep);
+
+  // transport state created by pn_session()
+  pn_delivery_map_free(&self->state.incoming);
+  pn_delivery_map_free(&self->state.outgoing);
+  pn_free(self->state.local_handles);
+  pn_free(self->state.remote_handles);
+
+  // the session may be on either the live list or the freed list
+  pni_remove_session(self->connection, self);
+  pn_list_remove(self->connection->freed, self);
+
+  pn_transport_t *transport = self->connection->transport;
+  if (transport) {
+    pn_hash_del(transport->local_channels, self->state.local_channel);
+    pn_hash_del(transport->remote_channels, self->state.remote_channel);
+  }
+
+  if (ep->referenced) {
+    pn_decref(self->connection);
+  }
+}
+
+// class callbacks: defaults from pn_object, unset ones are NULL;
+// incref and finalize are the session-specific functions in this file
+#define pn_session_new pn_object_new
+#define pn_session_refcount pn_object_refcount
+#define pn_session_decref pn_object_decref
+#define pn_session_reify pn_object_reify
+#define pn_session_initialize NULL
+#define pn_session_hashcode NULL
+#define pn_session_compare NULL
+#define pn_session_inspect NULL
+
+/* Create a new session on a connection. Returns NULL if pn_class_new()
+ * fails. A PN_SESSION_INIT event is put on the connection's collector. */
+pn_session_t *pn_session(pn_connection_t *conn)
+{
+  assert(conn);
+  // pn_session_free is the public API function, so it is mapped to
+  // pn_object_free only while the class is being defined
+#define pn_session_free pn_object_free
+  static const pn_class_t clazz = PN_METACLASS(pn_session);
+#undef pn_session_free
+  pn_session_t *fresh = (pn_session_t *) pn_class_new(&clazz, sizeof(pn_session_t));
+  if (!fresh) {
+    return NULL;
+  }
+
+  pn_endpoint_init(&fresh->endpoint, SESSION, conn);
+  pni_add_session(conn, fresh);
+  fresh->links = pn_list(PN_WEAKREF, 0);
+  fresh->freed = pn_list(PN_WEAKREF, 0);
+  fresh->context = pn_record();
+
+  // defaults: 1 MiB incoming capacity, maximum outgoing window
+  fresh->incoming_capacity = 1024*1024;
+  fresh->incoming_bytes = 0;
+  fresh->outgoing_bytes = 0;
+  fresh->incoming_deliveries = 0;
+  fresh->outgoing_deliveries = 0;
+  fresh->outgoing_window = 2147483647;
+
+  // begin transport state
+  memset(&fresh->state, 0, sizeof(fresh->state));
+  fresh->state.local_channel = (uint16_t)-1;
+  fresh->state.remote_channel = (uint16_t)-1;
+  pn_delivery_map_init(&fresh->state.incoming, 0);
+  pn_delivery_map_init(&fresh->state.outgoing, 0);
+  fresh->state.local_handles = pn_hash(PN_WEAKREF, 0, 0.75);
+  fresh->state.remote_handles = pn_hash(PN_WEAKREF, 0, 0.75);
+  // end transport state
+
+  pn_collector_put(conn->collector, PN_OBJECT, fresh, PN_SESSION_INIT);
+  if (conn->transport) {
+    pni_session_bound(fresh);
+  }
+
+  // NOTE(review): presumably drops the creation reference so the
+  // session is kept alive through the preserve-child path - confirm
+  pn_decref(fresh);
+  return fresh;
+}
+
+/* Run pni_link_bound() on every link in the session's link list. */
+static void pni_session_bound(pn_session_t *ssn)
+{
+  assert(ssn);
+
+  // the size is sampled once, before the loop starts
+  const size_t count = pn_list_size(ssn->links);
+  size_t idx = 0;
+  while (idx < count) {
+    pn_link_t *link = (pn_link_t *) pn_list_get(ssn->links, idx);
+    pni_link_bound(link);
+    idx++;
+  }
+}
+
+/* Reset a session's per-transport state: both channels go back to the
+ * (uint16_t)-1 value used at creation and the byte and delivery
+ * counters are zeroed. */
+void pn_session_unbound(pn_session_t* ssn)
+{
+  assert(ssn);
+
+  ssn->state.local_channel = (uint16_t)-1;
+  ssn->state.remote_channel = (uint16_t)-1;
+
+  ssn->incoming_bytes = 0;
+  ssn->outgoing_bytes = 0;
+
+  ssn->incoming_deliveries = 0;
+  ssn->outgoing_deliveries = 0;
+}
+
+/* Return the session's configured incoming capacity. */
+size_t pn_session_get_incoming_capacity(pn_session_t *ssn)
+{
+  assert(ssn);
+  size_t capacity = ssn->incoming_capacity;
+  return capacity;
+}
+
+/* Set the session's incoming capacity. Only the stored value is
+ * changed. */
+void pn_session_set_incoming_capacity(pn_session_t *ssn, size_t capacity)
+{
+  assert(ssn);
+  // XXX: should this trigger a flow?
+  ssn->incoming_capacity = capacity;
+}
+
+/* Return the session's configured outgoing window. */
+size_t pn_session_get_outgoing_window(pn_session_t *ssn)
+{
+  assert(ssn);
+  size_t window = ssn->outgoing_window;
+  return window;
+}
+
+/* Set the session's outgoing window. Only the stored value is changed. */
+void pn_session_set_outgoing_window(pn_session_t *ssn, size_t window)
+{
+  assert(ssn);
+  ssn->outgoing_window = window;
+}
+
+/* Return the session's outgoing byte counter. */
+size_t pn_session_outgoing_bytes(pn_session_t *ssn)
+{
+  assert(ssn);
+  size_t pending = ssn->outgoing_bytes;
+  return pending;
+}
+
+/* Return the session's incoming byte counter. */
+size_t pn_session_incoming_bytes(pn_session_t *ssn)
+{
+  assert(ssn);
+  size_t pending = ssn->incoming_bytes;
+  return pending;
+}
+
+/* Return the session's endpoint state. The session must not be NULL. */
+pn_state_t pn_session_state(pn_session_t *session)
+{
+  pn_endpoint_t *ep = &session->endpoint;
+  return ep->state;
+}
+
+pn_error_t *pn_session_error(pn_session_t *session)
+{
+ return session->endpoint.error;
+}
+
+/* Initialize a terminus in place with the given type and default
+ * settings: non-durable, expiring with the session, zero timeout, not
+ * dynamic, unspecified distribution mode, and fresh address and data
+ * members. */
+static void pni_terminus_init(pn_terminus_t *terminus, pn_terminus_type_t type)
+{
+  terminus->type = type;
+  terminus->address = pn_string(NULL);
+
+  // scalar defaults
+  terminus->durability = PN_NONDURABLE;
+  terminus->expiry_policy = PN_EXPIRE_WITH_SESSION;
+  terminus->timeout = 0;
+  terminus->dynamic = false;
+  terminus->distribution_mode = PN_DIST_MODE_UNSPECIFIED;
+
+  // data members, released again by pni_terminus_free()
+  terminus->properties = pn_data(0);
+  terminus->capabilities = pn_data(0);
+  terminus->outcomes = pn_data(0);
+  terminus->filter = pn_data(0);
+}
+
+/* Class incref hook for links. If the link is currently not
+ * "referenced", the flag is set again and a reference is taken on the
+ * session; otherwise the link's own object refcount is bumped. */
+static void pn_link_incref(void *object)
+{
+  pn_link_t *self = (pn_link_t *) object;
+
+  if (self->endpoint.referenced) {
+    pn_object_incref(object);
+    return;
+  }
+
+  // pni_preserve_child() clears `referenced` and drops a reference on
+  // the parent; this branch takes that parent reference again
+  self->endpoint.referenced = true;
+  pn_incref(self->session);
+}
+
+/* Finalizer for pn_link_t objects. Returns early when
+ * pni_preserve_child() decides to keep the link. Otherwise frees the
+ * remaining unsettled deliveries, releases everything the link owns,
+ * detaches it from its session, and drops the session reference if the
+ * link is still "referenced". */
+static void pn_link_finalize(void *object)
+{
+  pn_link_t *self = (pn_link_t *) object;
+  pn_endpoint_t *ep = &self->endpoint;
+
+  if (pni_preserve_child(ep)) {
+    return;
+  }
+
+  // NOTE(review): relies on pn_free() of the head delivery unlinking it
+  // from the unsettled list - confirm in the delivery finalizer
+  while (self->unsettled_head) {
+    assert(!self->unsettled_head->referenced);
+    pn_free(self->unsettled_head);
+  }
+
+  pn_free(self->context);
+
+  // local and remote termini
+  pni_terminus_free(&self->source);
+  pni_terminus_free(&self->target);
+  pni_terminus_free(&self->remote_source);
+  pni_terminus_free(&self->remote_target);
+
+  pn_free(self->name);
+  pni_endpoint_tini(ep);
+
+  // the link may be on either the live list or the freed list
+  pn_session_t *parent = self->session;
+  pni_remove_link(parent, self);
+  pn_hash_del(parent->state.local_handles, self->state.local_handle);
+  pn_hash_del(parent->state.remote_handles, self->state.remote_handle);
+  pn_list_remove(parent->freed, self);
+
+  if (ep->referenced) {
+    pn_decref(parent);
+  }
+}
+
+// class callbacks: defaults from pn_object, unset ones are NULL;
+// incref and finalize are the link-specific functions in this file
+#define pn_link_refcount pn_object_refcount
+#define pn_link_decref pn_object_decref
+#define pn_link_reify pn_object_reify
+#define pn_link_initialize NULL
+#define pn_link_hashcode NULL
+#define pn_link_compare NULL
+#define pn_link_inspect NULL
+
+/* Create a new link on a session.
+ *
+ * type:    endpoint type of the link (SENDER or RECEIVER)
+ * session: session the link is added to
+ * name:    link name; stored via pn_string(name)
+ *
+ * Returns NULL if pn_class_new() fails, in the same way pn_connection()
+ * and pn_session() do. A PN_LINK_INIT event is put on the connection's
+ * collector. */
+pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name)
+{
+  // pn_link_new and pn_link_free are real functions in this file, so
+  // they are mapped to the pn_object defaults only while the class is
+  // being defined
+#define pn_link_new pn_object_new
+#define pn_link_free pn_object_free
+  static const pn_class_t clazz = PN_METACLASS(pn_link);
+#undef pn_link_new
+#undef pn_link_free
+  pn_link_t *link = (pn_link_t *) pn_class_new(&clazz, sizeof(pn_link_t));
+  // allocation can fail; everything below dereferences the link
+  if (!link) return NULL;
+
+  pn_endpoint_init(&link->endpoint, type, session->connection);
+  pni_add_link(session, link);
+  pn_incref(session); // keep session until link finalized
+  link->name = pn_string(name);
+  pni_terminus_init(&link->source, PN_SOURCE);
+  pni_terminus_init(&link->target, PN_TARGET);
+  pni_terminus_init(&link->remote_source, PN_UNSPECIFIED);
+  pni_terminus_init(&link->remote_target, PN_UNSPECIFIED);
+  link->unsettled_head = link->unsettled_tail = link->current = NULL;
+  link->unsettled_count = 0;
+  link->max_message_size = 0;
+  link->remote_max_message_size = 0;
+  link->available = 0;
+  link->credit = 0;
+  link->queued = 0;
+  link->drain = false;
+  link->drain_flag_mode = true;
+  link->drained = 0;
+  link->context = pn_record();
+  link->snd_settle_mode = PN_SND_MIXED;
+  link->rcv_settle_mode = PN_RCV_FIRST;
+  link->remote_snd_settle_mode = PN_SND_MIXED;
+  link->remote_rcv_settle_mode = PN_RCV_FIRST;
+  link->detached = false;
+
+  // begin transport state
+  link->state.local_handle = -1;
+  link->state.remote_handle = -1;
+  link->state.delivery_count = 0;
+  link->state.link_credit = 0;
+  // end transport state
+
+  pn_collector_put(session->connection->collector, PN_OBJECT, link, PN_LINK_INIT);
+  if (session->connection->transport) {
+    pni_link_bound(link);
+  }
+  // NOTE(review): presumably drops the creation reference so the link
+  // is kept alive through the preserve-child path - confirm
+  pn_decref(link);
+  return link;
+}
+
+/* Hook invoked from pn_link_new() when the session's connection already has
+ * a transport. Currently a no-op. */
+static void pni_link_bound(pn_link_t *link)
+{
+}
+
+/* Reset the link's transport state to the same values pn_link_new() starts
+ * with: zero counters and both handles set to -1. */
+void pn_link_unbound(pn_link_t* link)
+{
+  assert(link);
+
+  /* flow-control counters */
+  link->state.link_credit = 0;
+  link->state.delivery_count = 0;
+
+  /* handles */
+  link->state.remote_handle = -1;
+  link->state.local_handle = -1;
+}
+
+/* Return the link's local source terminus, or NULL for a NULL link. */
+pn_terminus_t *pn_link_source(pn_link_t *link)
+{
+  if (!link) {
+    return NULL;
+  }
+  return &link->source;
+}
+
+/* Return the link's local target terminus, or NULL for a NULL link. */
+pn_terminus_t *pn_link_target(pn_link_t *link)
+{
+  if (!link) {
+    return NULL;
+  }
+  return &link->target;
+}
+
+/* Return the link's remote source terminus, or NULL for a NULL link. */
+pn_terminus_t *pn_link_remote_source(pn_link_t *link)
+{
+  if (!link) {
+    return NULL;
+  }
+  return &link->remote_source;
+}
+
+/* Return the link's remote target terminus, or NULL for a NULL link. */
+pn_terminus_t *pn_link_remote_target(pn_link_t *link)
+{
+  if (!link) {
+    return NULL;
+  }
+  return &link->remote_target;
+}
+
+/* Set the terminus type. Returns 0, or PN_ARG_ERR for a NULL terminus. */
+int pn_terminus_set_type(pn_terminus_t *terminus, pn_terminus_type_t type)
+{
+  if (terminus) {
+    terminus->type = type;
+    return 0;
+  }
+  return PN_ARG_ERR;
+}
+
+/* Return the terminus type, or the zero-valued type for a NULL terminus. */
+pn_terminus_type_t pn_terminus_get_type(pn_terminus_t *terminus)
+{
+  if (!terminus) {
+    return (pn_terminus_type_t) 0;
+  }
+  return terminus->type;
+}
+
+/* Return the terminus address as reported by pn_string_get().
+ * The terminus must not be NULL (asserted). */
+const char *pn_terminus_get_address(pn_terminus_t *terminus)
+{
+  const char *address;
+  assert(terminus);
+  address = pn_string_get(terminus->address);
+  return address;
+}
+
+/* Store `address` in the terminus and return pn_string_set()'s result.
+ * The terminus must not be NULL (asserted). */
+int pn_terminus_set_address(pn_terminus_t *terminus, const char *address)
+{
+  int err;
+  assert(terminus);
+  err = pn_string_set(terminus->address, address);
+  return err;
+}
+
+/* Return the terminus durability, or the zero value for a NULL terminus. */
+pn_durability_t pn_terminus_get_durability(pn_terminus_t *terminus)
+{
+  if (!terminus) {
+    return (pn_durability_t) 0;
+  }
+  return terminus->durability;
+}
+
+/* Set the terminus durability. Returns 0, or PN_ARG_ERR for a NULL terminus. */
+int pn_terminus_set_durability(pn_terminus_t *terminus, pn_durability_t durability)
+{
+  if (terminus) {
+    terminus->durability = durability;
+    return 0;
+  }
+  return PN_ARG_ERR;
+}
+
+/* Return the terminus expiry policy, or the zero value for a NULL terminus. */
+pn_expiry_policy_t pn_terminus_get_expiry_policy(pn_terminus_t *terminus)
+{
+  if (!terminus) {
+    return (pn_expiry_policy_t) 0;
+  }
+  return terminus->expiry_policy;
+}
+
+/* Set the terminus expiry policy. Returns 0, or PN_ARG_ERR for a NULL terminus. */
+int pn_terminus_set_expiry_policy(pn_terminus_t *terminus, pn_expiry_policy_t expiry_policy)
+{
+  if (terminus) {
+    terminus->expiry_policy = expiry_policy;
+    return 0;
+  }
+  return PN_ARG_ERR;
+}
+
+/* Return the terminus timeout, or 0 for a NULL terminus. */
+pn_seconds_t pn_terminus_get_timeout(pn_terminus_t *terminus)
+{
+  if (!terminus) {
+    return 0;
+  }
+  return terminus->timeout;
+}
+
+/* Set the terminus timeout. Returns 0, or PN_ARG_ERR for a NULL terminus. */
+int pn_terminus_set_timeout(pn_terminus_t *terminus, pn_seconds_t timeout)
+{
+  if (terminus) {
+    terminus->timeout = timeout;
+    return 0;
+  }
+  return PN_ARG_ERR;
+}
+
+/* Return the terminus' dynamic flag, or false for a NULL terminus. */
+bool pn_terminus_is_dynamic(pn_terminus_t *terminus)
+{
+  if (!terminus) {
+    return false;
+  }
+  return terminus->dynamic;
+}
+
+/* Set the terminus' dynamic flag. Returns 0, or PN_ARG_ERR for a NULL terminus. */
+int pn_terminus_set_dynamic(pn_terminus_t *terminus, bool dynamic)
+{
+  if (terminus) {
+    terminus->dynamic = dynamic;
+    return 0;
+  }
+  return PN_ARG_ERR;
+}
+
+/* Return the terminus' properties data object, or NULL for a NULL terminus. */
+pn_data_t *pn_terminus_properties(pn_terminus_t *terminus)
+{
+  if (!terminus) {
+    return NULL;
+  }
+  return terminus->properties;
+}
+
+/* Return the terminus' capabilities data object, or NULL for a NULL terminus. */
+pn_data_t *pn_terminus_capabilities(pn_terminus_t *terminus)
+{
+  if (!terminus) {
+    return NULL;
+  }
+  return terminus->capabilities;
+}
+
+/* Return the terminus' outcomes data object, or NULL for a NULL terminus. */
+pn_data_t *pn_terminus_outcomes(pn_terminus_t *terminus)
+{
+  if (!terminus) {
+    return NULL;
+  }
+  return terminus->outcomes;
+}
+
+/* Return the terminus' filter data object, or NULL for a NULL terminus. */
+pn_data_t *pn_terminus_filter(pn_terminus_t *terminus)
+{
+  if (!terminus) {
+    return NULL;
+  }
+  return terminus->filter;
+}
+
+/* Return the terminus' distribution mode, or PN_DIST_MODE_UNSPECIFIED for a
+ * NULL terminus. */
+pn_distribution_mode_t pn_terminus_get_distribution_mode(const pn_terminus_t *terminus)
+{
+  if (!terminus) {
+    return PN_DIST_MODE_UNSPECIFIED;
+  }
+  return terminus->distribution_mode;
+}
+
+/* Set the terminus' distribution mode. Returns 0, or PN_ARG_ERR for a NULL
+ * terminus. */
+int pn_terminus_set_distribution_mode(pn_terminus_t *terminus, pn_distribution_mode_t m)
+{
+  if (terminus) {
+    terminus->distribution_mode = m;
+    return 0;
+  }
+  return PN_ARG_ERR;
+}
+
+/*
+ * Copy every attribute of `src` into `terminus`: type, address, durability,
+ * expiry policy, timeout, dynamic flag, distribution mode, and the
+ * properties/capabilities/outcomes/filter data objects.
+ *
+ * Returns 0 on success, PN_ARG_ERR if either argument is NULL, or the first
+ * non-zero error reported while copying the address or a data object (the
+ * fields copied before the failure stay modified).
+ */
+int pn_terminus_copy(pn_terminus_t *terminus, pn_terminus_t *src)
+{
+  if (!terminus || !src) {
+    return PN_ARG_ERR;
+  }
+
+  // Copying a terminus onto itself is a no-op. Guarding here mirrors
+  // pn_condition_copy() and avoids handing the same object to pn_data_copy()
+  // as both destination and source.
+  if (terminus == src) {
+    return 0;
+  }
+
+  terminus->type = src->type;
+  int err = pn_terminus_set_address(terminus, pn_terminus_get_address(src));
+  if (err) return err;
+  terminus->durability = src->durability;
+  terminus->expiry_policy = src->expiry_policy;
+  terminus->timeout = src->timeout;
+  terminus->dynamic = src->dynamic;
+  terminus->distribution_mode = src->distribution_mode;
+  err = pn_data_copy(terminus->properties, src->properties);
+  if (!err) err = pn_data_copy(terminus->capabilities, src->capabilities);
+  if (!err) err = pn_data_copy(terminus->outcomes, src->outcomes);
+  if (!err) err = pn_data_copy(terminus->filter, src->filter);
+  return err;
+}
+
+/* Create a SENDER link named `name` on `session` via pn_link_new(). */
+pn_link_t *pn_sender(pn_session_t *session, const char *name)
+{
+  pn_link_t *link = pn_link_new(SENDER, session, name);
+  return link;
+}
+
+/* Create a RECEIVER link named `name` on `session` via pn_link_new(). */
+pn_link_t *pn_receiver(pn_session_t *session, const char *name)
+{
+  pn_link_t *link = pn_link_new(RECEIVER, session, name);
+  return link;
+}
+
+/* Return the state stored on the link's endpoint. `link` is not NULL-checked. */
+pn_state_t pn_link_state(pn_link_t *link)
+{
+  pn_state_t state = link->endpoint.state;
+  return state;
+}
+
+pn_error_t *pn_link_error(pn_link_t *link)
+{
+ return link->endpoint.error;
+}
+
+/* Return the link's name as reported by pn_string_get().
+ * The link must not be NULL (asserted). */
+const char *pn_link_name(pn_link_t *link)
+{
+  const char *name;
+  assert(link);
+  name = pn_string_get(link->name);
+  return name;
+}
+
+/* True when the link's endpoint type is SENDER. `link` is not NULL-checked. */
+bool pn_link_is_sender(pn_link_t *link)
+{
+  bool sender = (link->endpoint.type == SENDER);
+  return sender;
+}
+
+/* True when the link's endpoint type is RECEIVER. `link` is not NULL-checked. */
+bool pn_link_is_receiver(pn_link_t *link)
+{
+  bool receiver = (link->endpoint.type == RECEIVER);
+  return receiver;
+}
+
+/* Return the session the link belongs to. The link must not be NULL (asserted). */
+pn_session_t *pn_link_session(pn_link_t *link)
+{
+  pn_session_t *session;
+  assert(link);
+  session = link->session;
+  return session;
+}
+
+/* Release what pn_disposition_init() set up: the data and annotations
+ * objects and the embedded condition. */
+static void pn_disposition_finalize(pn_disposition_t *ds)
+{
+ pn_free(ds->data);
+ pn_free(ds->annotations);
+ pn_condition_tini(&ds->condition);
+}
+
+/*
+ * incref hook for deliveries. A delivery that has a link but is not yet
+ * flagged `referenced` takes a reference on its link (and sets the flag)
+ * instead of bumping its own count; every other case is a plain object incref.
+ */
+static void pn_delivery_incref(void *object)
+{
+  pn_delivery_t *delivery = (pn_delivery_t *) object;
+
+  if (!delivery->link || delivery->referenced) {
+    pn_object_incref(object);
+    return;
+  }
+
+  delivery->referenced = true;
+  pn_incref(delivery->link);
+}
+
+/*
+ * True when the delivery is not yet locally settled, or when the connection
+ * has a transport and the delivery's state.init or tpwork flag is set.
+ */
+static bool pni_preserve_delivery(pn_delivery_t *delivery)
+{
+  pn_connection_t *conn = delivery->link->session->connection;
+
+  if (!delivery->local.settled) {
+    return true;
+  }
+  if (!conn->transport) {
+    return false;
+  }
+  return delivery->state.init || delivery->tpwork;
+}
+
+/*
+ * Finalizer for pn_delivery_t objects.
+ *
+ * Depending on state the delivery is either kept alive (its reference is
+ * handed back from the link to the delivery itself), recycled into the
+ * connection's delivery_pool, or has its owned members freed.
+ */
+static void pn_delivery_finalize(void *object)
+{
+ pn_delivery_t *delivery = (pn_delivery_t *) object;
+ pn_link_t *link = delivery->link;
+ // assert(!delivery->state.init);
+
+ bool pooled = false;
+ bool referenced = true;
+ if (link) {
+ // Still needed: swap the link reference for a self reference and bail out
+ // before any teardown happens.
+ if (pni_link_live(link) && pni_preserve_delivery(delivery) && delivery->referenced) {
+ delivery->referenced = false;
+ pn_object_incref(delivery);
+ pn_decref(link);
+ return;
+ }
+ referenced = delivery->referenced;
+
+ // Detach the delivery from the transport work list, the link's unsettled
+ // list and the session's outgoing/incoming delivery map.
+ pn_clear_tpwork(delivery);
+ LL_REMOVE(link, unsettled, delivery);
+ pn_delivery_map_del(pn_link_is_sender(link)
+ ? &link->session->state.outgoing
+ : &link->session->state.incoming,
+ delivery);
+ pn_buffer_clear(delivery->tag);
+ pn_buffer_clear(delivery->bytes);
+ pn_record_clear(delivery->context);
+ delivery->settled = true;
+ pn_connection_t *conn = link->session->connection;
+ assert(pn_refcount(delivery) == 0);
+ // While the connection is live the delivery is parked in its pool (with
+ // link cleared) so pn_delivery() can reuse it.
+ if (pni_connection_live(conn)) {
+ pn_list_t *pool = link->session->connection->delivery_pool;
+ delivery->link = NULL;
+ pn_list_add(pool, delivery);
+ pooled = true;
+ assert(pn_refcount(delivery) == 1);
+ }
+ }
+
+ // Not recycled: free everything pn_delivery() allocated for it.
+ if (!pooled) {
+ pn_free(delivery->context);
+ pn_buffer_free(delivery->tag);
+ pn_buffer_free(delivery->bytes);
+ pn_disposition_finalize(&delivery->local);
+ pn_disposition_finalize(&delivery->remote);
+ }
+
+ // NOTE(review): when link is NULL, `referenced` keeps its initial value of
+ // true, so this calls pn_decref(NULL) -- presumably a no-op; confirm.
+ if (referenced) {
+ pn_decref(link);
+ }
+}
+
+/* One-time setup of a disposition: create its data and annotations objects
+ * and initialise the embedded condition. Scalar fields are left to
+ * pn_disposition_clear(). */
+static void pn_disposition_init(pn_disposition_t *ds)
+{
+ ds->data = pn_data(0);
+ ds->annotations = pn_data(0);
+ pn_condition_init(&ds->condition);
+}
+
+/* Reset a disposition for reuse: empty the data/annotations/condition and
+ * zero the type, section and flag fields. */
+static void pn_disposition_clear(pn_disposition_t *ds)
+{
+  /* owned objects */
+  pn_data_clear(ds->data);
+  pn_data_clear(ds->annotations);
+  pn_condition_clear(&ds->condition);
+
+  /* scalar state */
+  ds->type = 0;
+  ds->section_number = 0;
+  ds->section_offset = 0;
+
+  /* flags */
+  ds->failed = false;
+  ds->undeliverable = false;
+  ds->settled = false;
+}
+
+/* Per-class hook names for pn_delivery; presumably consumed by the
+ * PN_METACLASS(pn_delivery) expansion in pn_delivery() -- TODO confirm.
+ * new/refcount/decref/free/reify reuse the generic pn_object_* versions and
+ * initialize/hashcode/compare are NULL. incref, finalize and inspect are
+ * not aliased here; functions with those names are defined in this file. */
+#define pn_delivery_new pn_object_new
+#define pn_delivery_refcount pn_object_refcount
+#define pn_delivery_decref pn_object_decref
+#define pn_delivery_free pn_object_free
+#define pn_delivery_reify pn_object_reify
+#define pn_delivery_initialize NULL
+#define pn_delivery_hashcode NULL
+#define pn_delivery_compare NULL
+
+/*
+ * Append a human-readable description of a delivery to `dst`:
+ * pn_delivery<ptr>{direction, tag=b"<quoted tag>", local=<type>, remote=<type>}
+ *
+ * Returns 0 on success, otherwise the error code of the first append step
+ * that failed.
+ */
+int pn_delivery_inspect(void *obj, pn_string_t *dst) {
+  pn_delivery_t *d = (pn_delivery_t*)obj;
+  // pn_delivery_finalize() clears d->link before parking a delivery in the
+  // pool, so the link may legitimately be NULL here.
+  const char *dir = "unlinked";
+  if (d->link) {
+    dir = pn_link_is_sender(d->link) ? "sending" : "receiving";
+  }
+  pn_bytes_t bytes = pn_buffer_bytes(d->tag);
+
+  // Chain the steps explicitly so the real error code is returned rather
+  // than the 0/1 result of a logical OR.
+  int err = pn_string_addf(dst, "pn_delivery<%p>{%s, tag=b\"", obj, dir);
+  if (!err) err = pn_quote(dst, bytes.start, bytes.size);
+  if (!err) err = pn_string_addf(dst, "\", local=%s, remote=%s}",
+                                 pn_disposition_type_name(d->local.type),
+                                 pn_disposition_type_name(d->remote.type));
+  return err;
+}
+
+/* Build a delivery tag value that refers to (does not copy) `size` bytes at
+ * `bytes`. */
+pn_delivery_tag_t pn_dtag(const char *bytes, size_t size) {
+  pn_delivery_tag_t dtag = {0};
+  dtag.size = size;
+  dtag.start = bytes;
+  return dtag;
+}
+
+/*
+ * Create a delivery with the given tag on `link`.
+ *
+ * A delivery is taken from the connection's delivery_pool when one is
+ * available; otherwise a new object is allocated together with its tag and
+ * payload buffers, dispositions and attachment record. The delivery is
+ * appended to the link's unsettled list, becomes the link's current delivery
+ * if there is none, and the connection's work list is updated.
+ *
+ * Returns the delivery, or NULL if a new object could not be allocated.
+ */
+pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
+{
+ assert(link);
+ pn_list_t *pool = link->session->connection->delivery_pool;
+ pn_delivery_t *delivery = (pn_delivery_t *) pn_list_pop(pool);
+ if (!delivery) {
+ // Nothing to recycle: allocate the object and everything it owns.
+ static const pn_class_t clazz = PN_METACLASS(pn_delivery);
+ delivery = (pn_delivery_t *) pn_class_new(&clazz, sizeof(pn_delivery_t));
+ if (!delivery) return NULL;
+ delivery->tag = pn_buffer(16);
+ delivery->bytes = pn_buffer(64);
+ pn_disposition_init(&delivery->local);
+ pn_disposition_init(&delivery->remote);
+ delivery->context = pn_record();
+ } else {
+ assert(!delivery->state.init);
+ }
+ delivery->link = link;
+ pn_incref(delivery->link); // keep link until finalized
+ // Reset per-use state; this applies to fresh and recycled deliveries alike.
+ pn_buffer_clear(delivery->tag);
+ pn_buffer_append(delivery->tag, tag.start, tag.size);
+ pn_disposition_clear(&delivery->local);
+ pn_disposition_clear(&delivery->remote);
+ delivery->updated = false;
+ delivery->settled = false;
+ LL_ADD(link, unsettled, delivery);
+ delivery->referenced = true;
+ delivery->work_next = NULL;
+ delivery->work_prev = NULL;
+ delivery->work = false;
+ delivery->tpwork_next = NULL;
+ delivery->tpwork_prev = NULL;
+ delivery->tpwork = false;
+ pn_buffer_clear(delivery->bytes);
+ delivery->done = false;
+ delivery->aborted = false;
+ pn_record_clear(delivery->context);
+
+ // begin delivery state
+ delivery->state.init = false;
+ delivery->state.sending = false; /* True if we have sent at least 1 frame */
+ delivery->state.sent = false; /* True if we have sent the entire delivery */
+ // end delivery state
+
+ if (!link->current)
+ link->current = delivery;
+
+ link->unsettled_count++;
+
+ pn_work_update(link->session->connection, delivery);
+
+ // XXX: could just remove incref above
+ pn_decref(delivery);
+
+ return delivery;
+}
+
+/*
+ * True when an unsettled delivery on a sender link has not been fully sent
+ * yet and is either marked done or still has bytes in its buffer. Always
+ * false for settled deliveries and for receiver links.
+ */
+bool pn_delivery_buffered(pn_delivery_t *delivery)
+{
+  assert(delivery);
+
+  if (delivery->settled) {
+    return false;
+  }
+  if (!pn_link_is_sender(delivery->link)) {
+    return false;
+  }
+  if (delivery->state.sent) {
+    return false;
+  }
+  return delivery->done || (pn_buffer_size(delivery->bytes) > 0);
+}
+
+/* Return the link's unsettled delivery counter. `link` is not NULL-checked. */
+int pn_link_unsettled(pn_link_t *link)
+{
+  int count = link->unsettled_count;
+  return count;
+}
+
+/* Return the first delivery on the link's unsettled list that is not locally
+ * settled, or NULL if there is none. */
+pn_delivery_t *pn_unsettled_head(pn_link_t *link)
+{
+  pn_delivery_t *d;
+  for (d = link->unsettled_head; d && d->local.settled; d = d->unsettled_next) {
+    /* skip locally settled entries */
+  }
+  return d;
+}
+
+/* Return the next delivery after `delivery` on the unsettled list that is
+ * not locally settled, or NULL if there is none. */
+pn_delivery_t *pn_unsettled_next(pn_delivery_t *delivery)
+{
+  pn_delivery_t *d;
+  for (d = delivery->unsettled_next; d && d->local.settled; d = d->unsettled_next) {
+    /* skip locally settled entries */
+  }
+  return d;
+}
+
+/* True when `delivery` is the current delivery of its link. */
+bool pn_delivery_current(pn_delivery_t *delivery)
+{
+  return pn_link_current(delivery->link) == delivery;
+}
+
+/* Print a one-line summary of the delivery (quoted tag, disposition types
+ * and the various status flags) to stdout. */
+void pn_delivery_dump(pn_delivery_t *d)
+{
+  char tag[1024];
+  pn_bytes_t raw = pn_buffer_bytes(d->tag);
+
+  pn_quote_data(tag, sizeof(tag), raw.start, raw.size);
+  printf("{tag=%s, local.type=%" PRIu64 ", remote.type=%" PRIu64 ", local.settled=%u, "
+         "remote.settled=%u, updated=%u, current=%u, writable=%u, readable=%u, "
+         "work=%u}",
+         tag,
+         d->local.type, d->remote.type,
+         d->local.settled, d->remote.settled,
+         d->updated,
+         pn_delivery_current(d),
+         pn_delivery_writable(d),
+         pn_delivery_readable(d),
+         d->work);
+}
+
+/* Return the value stored under the PN_LEGCTX key of the delivery's record.
+ * The delivery must not be NULL (asserted). */
+void *pn_delivery_get_context(pn_delivery_t *delivery)
+{
+  void *context;
+  assert(delivery);
+  context = pn_record_get(delivery->context, PN_LEGCTX);
+  return context;
+}
+
+/* Store `context` under the PN_LEGCTX key of the delivery's record.
+ * The delivery must not be NULL (asserted). */
+void pn_delivery_set_context(pn_delivery_t *delivery, void *context)
+{
+ assert(delivery);
+ pn_record_set(delivery->context, PN_LEGCTX, context);
+}
+
+/* Return the delivery's attachment record. The delivery must not be NULL
+ * (asserted). */
+pn_record_t *pn_delivery_attachments(pn_delivery_t *delivery)
+{
+  pn_record_t *record;
+  assert(delivery);
+  record = delivery->context;
+  return record;
+}
+
+/* Return the disposition's type code. The disposition must not be NULL
+ * (asserted). */
+uint64_t pn_disposition_type(pn_disposition_t *disposition)
+{
+  uint64_t type;
+  assert(disposition);
+  type = disposition->type;
+  return type;
+}
+
+/* Return the disposition's data object. The disposition must not be NULL
+ * (asserted). */
+pn_data_t *pn_disposition_data(pn_disposition_t *disposition)
+{
+  pn_data_t *data;
+  assert(disposition);
+  data = disposition->data;
+  return data;
+}
+
+/* Return the disposition's section number. The disposition must not be NULL
+ * (asserted). */
+uint32_t pn_disposition_get_section_number(pn_disposition_t *disposition)
+{
+  uint32_t number;
+  assert(disposition);
+  number = disposition->section_number;
+  return number;
+}
+
+/* Set the disposition's section number. The disposition must not be NULL
+ * (asserted). */
+void pn_disposition_set_section_number(pn_disposition_t *disposition, uint32_t section_number)
+{
+ assert(disposition);
+ disposition->section_number = section_number;
+}
+
+/* Return the disposition's section offset. The disposition must not be NULL
+ * (asserted). */
+uint64_t pn_disposition_get_section_offset(pn_disposition_t *disposition)
+{
+  uint64_t offset;
+  assert(disposition);
+  offset = disposition->section_offset;
+  return offset;
+}
+
+/* Set the disposition's section offset. The disposition must not be NULL
+ * (asserted). */
+void pn_disposition_set_section_offset(pn_disposition_t *disposition, uint64_t section_offset)
+{
+ assert(disposition);
+ disposition->section_offset = section_offset;
+}
+
+/* Return the disposition's `failed` flag. The disposition must not be NULL
+ * (asserted). */
+bool pn_disposition_is_failed(pn_disposition_t *disposition)
+{
+  bool failed;
+  assert(disposition);
+  failed = disposition->failed;
+  return failed;
+}
+
+/* Set the disposition's `failed` flag. The disposition must not be NULL
+ * (asserted). */
+void pn_disposition_set_failed(pn_disposition_t *disposition, bool failed)
+{
+ assert(disposition);
+ disposition->failed = failed;
+}
+
+/* Return the disposition's `undeliverable` flag. The disposition must not be
+ * NULL (asserted). */
+bool pn_disposition_is_undeliverable(pn_disposition_t *disposition)
+{
+  bool undeliverable;
+  assert(disposition);
+  undeliverable = disposition->undeliverable;
+  return undeliverable;
+}
+
+/* Set the disposition's `undeliverable` flag. The disposition must not be
+ * NULL (asserted). */
+void pn_disposition_set_undeliverable(pn_disposition_t *disposition, bool undeliverable)
+{
+ assert(disposition);
+ disposition->undeliverable = undeliverable;
+}
+
+/* Return the disposition's annotations data object. The disposition must not
+ * be NULL (asserted). */
+pn_data_t *pn_disposition_annotations(pn_disposition_t *disposition)
+{
+  pn_data_t *annotations;
+  assert(disposition);
+  annotations = disposition->annotations;
+  return annotations;
+}
+
+/* Return a pointer to the condition embedded in the disposition. The
+ * disposition must not be NULL (asserted). */
+pn_condition_t *pn_disposition_condition(pn_disposition_t *disposition)
+{
+  pn_condition_t *condition;
+  assert(disposition);
+  condition = &disposition->condition;
+  return condition;
+}
+
+/* Return a tag value referring to the bytes in the delivery's tag buffer, or
+ * an empty tag (pn_dtag(0, 0)) for a NULL delivery. */
+pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery)
+{
+  if (!delivery) {
+    return pn_dtag(0, 0);
+  }
+
+  pn_bytes_t tag = pn_buffer_bytes(delivery->tag);
+  return pn_dtag(tag.start, tag.size);
+}
+
+/* Return the link's current delivery, or NULL for a NULL link. */
+pn_delivery_t *pn_link_current(pn_link_t *link)
+{
+  return link ? link->current : NULL;
+}
+
+/*
+ * Advance a sender link past its current delivery: mark it done, update the
+ * queued/credit/outgoing_deliveries counters (unless skipped, see below),
+ * queue it as transport work and move `current` to the next unsettled
+ * delivery. The caller (pn_link_advance) ensures link->current is non-NULL.
+ */
+static void pni_advance_sender(pn_link_t *link)
+{
+ link->current->done = true;
+ /* Skip accounting if the link is aborted and has not sent any frames.
+ A delivery that was aborted before sending the first frame was not accounted
+ for in pni_process_tpwork_sender() so we don't need to account for it being sent here.
+ */
+ bool skip = link->current->aborted && !link->current->state.sending;
+ if (!skip) {
+ link->queued++;
+ link->credit--;
+ link->session->outgoing_deliveries++;
+ }
+ pni_add_tpwork(link->current);
+ link->current = link->current->unsettled_next;
+}
+
+/*
+ * Advance a receiver link past its current delivery: decrement the link's
+ * credit and queued counters and the session's incoming_deliveries, discard
+ * any unread bytes of the delivery (subtracting them from the session's
+ * incoming_bytes), and move `current` to the next unsettled delivery. The
+ * caller (pn_link_advance) ensures link->current is non-NULL.
+ */
+static void pni_advance_receiver(pn_link_t *link)
+{
+ link->credit--;
+ link->queued--;
+ link->session->incoming_deliveries--;
+
+ pn_delivery_t *current = link->current;
+ link->session->incoming_bytes -= pn_buffer_size(current->bytes);
+ pn_buffer_clear(current->bytes);
+
+ // Queue transport work only while the session's incoming window is zero.
+ if (!link->session->state.incoming_window) {
+ pni_add_tpwork(current);
+ }
+
+ link->current = link->current->unsettled_next;
+}
+
+/*
+ * Move the link on to its next delivery and refresh the connection's work
+ * list for the deliveries involved.
+ *
+ * Returns true when the current delivery changed, false when it did not or
+ * when there is no link / no current delivery.
+ */
+bool pn_link_advance(pn_link_t *link)
+{
+  if (!link || !link->current) {
+    return false;
+  }
+
+  pn_delivery_t *prev = link->current;
+  if (link->endpoint.type == SENDER) {
+    pni_advance_sender(link);
+  } else {
+    pni_advance_receiver(link);
+  }
+  pn_delivery_t *next = link->current;
+
+  pn_work_update(link->session->connection, prev);
+  if (next) {
+    pn_work_update(link->session->connection, next);
+  }
+  return prev != next;
+}
+
+/* Return the link's credit, or 0 for a NULL link. */
+int pn_link_credit(pn_link_t *link)
+{
+  if (!link) {
+    return 0;
+  }
+  return link->credit;
+}
+
+/* Return the link's `available` count, or 0 for a NULL link. */
+int pn_link_available(pn_link_t *link)
+{
+  if (!link) {
+    return 0;
+  }
+  return link->available;
+}
+
+/* Return the link's queued delivery count, or 0 for a NULL link. */
+int pn_link_queued(pn_link_t *link)
+{
+  if (!link) {
+    return 0;
+  }
+  return link->queued;
+}
+
+/* Return the link's credit minus its queued count. The link must not be NULL
+ * (asserted). */
+int pn_link_remote_credit(pn_link_t *link)
+{
+  int remote;
+  assert(link);
+  remote = link->credit - link->queued;
+  return remote;
+}
+
+/* Return the link's drain flag. The link must not be NULL (asserted). */
+bool pn_link_get_drain(pn_link_t *link)
+{
+  bool drain;
+  assert(link);
+  drain = link->drain;
+  return drain;
+}
+
+/* Return the link's local sender settle mode, or PN_SND_MIXED for a NULL link. */
+pn_snd_settle_mode_t pn_link_snd_settle_mode(pn_link_t *link)
+{
+  if (!link) {
+    return PN_SND_MIXED;
+  }
+  return (pn_snd_settle_mode_t)link->snd_settle_mode;
+}
+
+/* Return the link's local receiver settle mode, or PN_RCV_FIRST for a NULL link. */
+pn_rcv_settle_mode_t pn_link_rcv_settle_mode(pn_link_t *link)
+{
+  if (!link) {
+    return PN_RCV_FIRST;
+  }
+  return (pn_rcv_settle_mode_t)link->rcv_settle_mode;
+}
+
+/* Return the remote sender settle mode, or PN_SND_MIXED for a NULL link. */
+pn_snd_settle_mode_t pn_link_remote_snd_settle_mode(pn_link_t *link)
+{
+  if (!link) {
+    return PN_SND_MIXED;
+  }
+  return (pn_snd_settle_mode_t)link->remote_snd_settle_mode;
+}
+
+/* Return the remote receiver settle mode, or PN_RCV_FIRST for a NULL link. */
+pn_rcv_settle_mode_t pn_link_remote_rcv_settle_mode(pn_link_t *link)
+{
+  if (!link) {
+    return PN_RCV_FIRST;
+  }
+  return (pn_rcv_settle_mode_t)link->remote_rcv_settle_mode;
+}
+/* Store the local sender settle mode (as a uint8_t); a NULL link is ignored. */
+void pn_link_set_snd_settle_mode(pn_link_t *link, pn_snd_settle_mode_t mode)
+{
+  if (!link) {
+    return;
+  }
+  link->snd_settle_mode = (uint8_t)mode;
+}
+/* Store the local receiver settle mode (as a uint8_t); a NULL link is ignored. */
+void pn_link_set_rcv_settle_mode(pn_link_t *link, pn_rcv_settle_mode_t mode)
+{
+  if (!link) {
+    return;
+  }
+  link->rcv_settle_mode = (uint8_t)mode;
+}
+
+/*
+ * Settle a delivery locally. Does nothing if it is already locally settled.
+ * If the delivery is the link's current one the link is advanced first; the
+ * link's unsettled count is then decremented and the delivery is queued as
+ * transport work and re-evaluated for the connection's work list.
+ */
+void pn_delivery_settle(pn_delivery_t *delivery)
+{
+  assert(delivery);
+  if (delivery->local.settled) {
+    return;
+  }
+
+  pn_link_t *link = delivery->link;
+  if (pn_delivery_current(delivery)) {
+    pn_link_advance(link);
+  }
+
+  link->unsettled_count--;
+  delivery->local.settled = true;
+  pni_add_tpwork(delivery);
+  pn_work_update(delivery->link->session->connection, delivery);
+
+  // incref immediately followed by decref: presumably lets the refcount
+  // hooks re-evaluate whether the delivery can be finalized -- TODO confirm.
+  pn_incref(delivery);
+  pn_decref(delivery);
+}
+
+/* Record `credit` as the link's `available` value (the one returned by
+ * pn_link_available()). `sender` is not NULL-checked. */
+void pn_link_offered(pn_link_t *sender, int credit)
+{
+ sender->available = credit;
+}
+
+/*
+ * Append `n` bytes from `bytes` to the payload buffer of the link's current
+ * delivery and queue the delivery as transport work.
+ *
+ * Returns the number of bytes buffered (n), 0 when there is nothing to send
+ * (NULL `bytes` or n == 0), PN_EOS when the link has no current delivery, or
+ * the non-zero error code from pn_buffer_append() if the data could not be
+ * buffered.
+ */
+ssize_t pn_link_send(pn_link_t *sender, const char *bytes, size_t n)
+{
+  pn_delivery_t *current = pn_link_current(sender);
+  if (!current) return PN_EOS;
+  if (!bytes || !n) return 0;
+
+  // Only account for the bytes and schedule transport work once the data
+  // has actually been buffered.
+  int err = pn_buffer_append(current->bytes, bytes, n);
+  if (err) return err;
+
+  sender->session->outgoing_bytes += n;
+  pni_add_tpwork(current);
+  return n;
+}
+
+/*
+ * Sender: if the link is in drain mode with credit left, move all of that
+ * credit into `drained`, zero the credit, mark the endpoint modified and
+ * return the amount; otherwise return 0.
+ * Receiver: return the recorded `drained` amount and reset it to 0.
+ */
+int pn_link_drained(pn_link_t *link)
+{
+  assert(link);
+
+  if (!pn_link_is_sender(link)) {
+    int drained = link->drained;
+    link->drained = 0;
+    return drained;
+  }
+
+  if (!link->drain || !(link->credit > 0)) {
+    return 0;
+  }
+
+  link->drained = link->credit;
+  link->credit = 0;
+  pn_modified(link->session->connection, &link->endpoint, true);
+  return link->drained;
+}
+
+/*
+ * Copy up to `n` bytes of the current delivery's payload into `bytes` and
+ * remove them from the delivery's buffer.
+ *
+ * Returns the number of bytes copied; 0 when nothing is buffered yet and the
+ * delivery is not done; PN_EOS when nothing is buffered and the delivery is
+ * done; PN_ARG_ERR for a NULL receiver; PN_STATE_ERR when there is no
+ * current delivery; PN_ABORTED when the current delivery was aborted.
+ */
+ssize_t pn_link_recv(pn_link_t *receiver, char *bytes, size_t n)
+{
+  if (!receiver) return PN_ARG_ERR;
+
+  pn_delivery_t *delivery = receiver->current;
+  if (!delivery) return PN_STATE_ERR;
+  if (delivery->aborted) return PN_ABORTED;
+
+  size_t size = pn_buffer_get(delivery->bytes, 0, n, bytes);
+  pn_buffer_trim(delivery->bytes, size, 0);
+  if (!size) {
+    return delivery->done ? PN_EOS : 0;
+  }
+
+  receiver->session->incoming_bytes -= size;
+  if (!receiver->session->state.incoming_window) {
+    pni_add_tpwork(delivery);
+  }
+  return size;
+}
+
+
+/* Add `credit` to a receiver link's credit and mark the link's endpoint as
+ * modified on its connection. The link must be a non-NULL receiver (asserted). */
+void pn_link_flow(pn_link_t *receiver, int credit)
+{
+ assert(receiver);
+ assert(pn_link_is_receiver(receiver));
+ receiver->credit += credit;
+ pn_modified(receiver->session->connection, &receiver->endpoint, true);
+ // drain_flag_mode is false after pn_link_drain(); in that mode a plain
+ // flow call switches drain off again. pn_link_set_drain() sets
+ // drain_flag_mode to true as a side effect, so it is put back to false.
+ if (!receiver->drain_flag_mode) {
+ pn_link_set_drain(receiver, false);
+ receiver->drain_flag_mode = false;
+ }
+}
+
+/* Turn drain on for a receiver link and grant `credit` in one call. The link
+ * must be a non-NULL receiver (asserted). */
+void pn_link_drain(pn_link_t *receiver, int credit)
+{
+ assert(receiver);
+ assert(pn_link_is_receiver(receiver));
+ pn_link_set_drain(receiver, true);
+ pn_link_flow(receiver, credit);
+ // pn_link_set_drain() left drain_flag_mode true; clear it so the next
+ // pn_link_flow() call turns drain off again.
+ receiver->drain_flag_mode = false;
+}
+
+/* Set the receiver link's drain flag explicitly, mark the endpoint modified
+ * and switch drain_flag_mode on (so pn_link_flow() leaves the flag alone).
+ * The link must be a non-NULL receiver (asserted). */
+void pn_link_set_drain(pn_link_t *receiver, bool drain)
+{
+ assert(receiver);
+ assert(pn_link_is_receiver(receiver));
+ receiver->drain = drain;
+ pn_modified(receiver->session->connection, &receiver->endpoint, true);
+ receiver->drain_flag_mode = true;
+}
+
+/* True when the receiver link has drain set and its credit exceeds its
+ * queued count. The link must be a non-NULL receiver (asserted). */
+bool pn_link_draining(pn_link_t *receiver)
+{
+  assert(receiver);
+  assert(pn_link_is_receiver(receiver));
+
+  if (!receiver->drain) {
+    return false;
+  }
+  return pn_link_credit(receiver) > pn_link_queued(receiver);
+}
+
+/* Return the link's local max-message-size. `link` is not NULL-checked. */
+uint64_t pn_link_max_message_size(pn_link_t *link)
+{
+  uint64_t size = link->max_message_size;
+  return size;
+}
+
+/* Set the link's local max-message-size. `link` is not NULL-checked. */
+void pn_link_set_max_message_size(pn_link_t *link, uint64_t size)
+{
+ link->max_message_size = size;
+}
+
+/* Return the link's remote max-message-size. `link` is not NULL-checked. */
+uint64_t pn_link_remote_max_message_size(pn_link_t *link)
+{
+  uint64_t size = link->remote_max_message_size;
+  return size;
+}
+
+/* Return the link the delivery belongs to. The delivery must not be NULL
+ * (asserted). */
+pn_link_t *pn_delivery_link(pn_delivery_t *delivery)
+{
+  pn_link_t *link;
+  assert(delivery);
+  link = delivery->link;
+  return link;
+}
+
+/* Return a pointer to the delivery's local disposition. The delivery must
+ * not be NULL (asserted). */
+pn_disposition_t *pn_delivery_local(pn_delivery_t *delivery)
+{
+  pn_disposition_t *local;
+  assert(delivery);
+  local = &delivery->local;
+  return local;
+}
+
+/* Return the type code of the delivery's local disposition. The delivery
+ * must not be NULL (asserted). */
+uint64_t pn_delivery_local_state(pn_delivery_t *delivery)
+{
+  uint64_t state;
+  assert(delivery);
+  state = delivery->local.type;
+  return state;
+}
+
+/* Return a pointer to the delivery's remote disposition. The delivery must
+ * not be NULL (asserted). */
+pn_disposition_t *pn_delivery_remote(pn_delivery_t *delivery)
+{
+  pn_disposition_t *remote;
+  assert(delivery);
+  remote = &delivery->remote;
+  return remote;
+}
+
+/* Return the type code of the delivery's remote disposition. The delivery
+ * must not be NULL (asserted). */
+uint64_t pn_delivery_remote_state(pn_delivery_t *delivery)
+{
+  uint64_t state;
+  assert(delivery);
+  state = delivery->remote.type;
+  return state;
+}
+
+/* Return the `settled` flag of the delivery's remote disposition, or false
+ * for a NULL delivery. */
+bool pn_delivery_settled(pn_delivery_t *delivery)
+{
+  if (!delivery) {
+    return false;
+  }
+  return delivery->remote.settled;
+}
+
+/* Return the delivery's `updated` flag, or false for a NULL delivery. */
+bool pn_delivery_updated(pn_delivery_t *delivery)
+{
+  if (!delivery) {
+    return false;
+  }
+  return delivery->updated;
+}
+
+/* Clear the delivery's `updated` flag and re-evaluate it for the
+ * connection's work list. `delivery` and its link are not NULL-checked. */
+void pn_delivery_clear(pn_delivery_t *delivery)
+{
+ delivery->updated = false;
+ pn_work_update(delivery->link->session->connection, delivery);
+}
+
+/* Set the type of the delivery's local disposition to `state` and queue the
+ * delivery as transport work. A NULL delivery is ignored. */
+void pn_delivery_update(pn_delivery_t *delivery, uint64_t state)
+{
+  if (delivery) {
+    delivery->local.type = state;
+    pni_add_tpwork(delivery);
+  }
+}
+
+/* True when the delivery is the current delivery of a sender link that has
+ * credit left. False for a NULL delivery. */
+bool pn_delivery_writable(pn_delivery_t *delivery)
+{
+  if (!delivery) {
+    return false;
+  }
+
+  pn_link_t *link = delivery->link;
+  if (!pn_link_is_sender(link)) {
+    return false;
+  }
+  if (!pn_delivery_current(delivery)) {
+    return false;
+  }
+  return pn_link_credit(link) > 0;
+}
+
+/* True when the delivery is the current delivery of a receiver link. False
+ * for a NULL delivery. */
+bool pn_delivery_readable(pn_delivery_t *delivery)
+{
+  if (!delivery) {
+    return false;
+  }
+
+  pn_link_t *link = delivery->link;
+  return pn_link_is_receiver(link) && pn_delivery_current(delivery);
+}
+
+/*
+ * Return the number of payload bytes buffered on the delivery.
+ *
+ * An aborted delivery reports 1 instead of 0 so that clients which never
+ * check pn_delivery_aborted() still call pn_link_recv() and see PN_ABORTED.
+ */
+size_t pn_delivery_pending(pn_delivery_t *delivery)
+{
+  return delivery->aborted ? 1 : pn_buffer_size(delivery->bytes);
+}
+
+/* True while the delivery's `done` flag is not set. `delivery` is not
+ * NULL-checked. */
+bool pn_delivery_partial(pn_delivery_t *delivery)
+{
+  bool partial = !delivery->done;
+  return partial;
+}
+
+/* Mark the delivery as aborted and settle it. A delivery that is already
+ * locally settled cannot be aborted and is left untouched. */
+void pn_delivery_abort(pn_delivery_t *delivery) {
+  if (delivery->local.settled) {
+    return; /* Can't abort a settled delivery */
+  }
+  delivery->aborted = true;
+  pn_delivery_settle(delivery);
+}
+
+/* Return the delivery's `aborted` flag. `delivery` is not NULL-checked. */
+bool pn_delivery_aborted(pn_delivery_t *delivery) {
+  bool aborted = delivery->aborted;
+  return aborted;
+}
+
+/* Return the local condition of the connection's endpoint. The connection
+ * must not be NULL (asserted). */
+pn_condition_t *pn_connection_condition(pn_connection_t *connection)
+{
+  pn_condition_t *condition;
+  assert(connection);
+  condition = &connection->endpoint.condition;
+  return condition;
+}
+
+/* Return the remote condition held by the connection's transport, or NULL
+ * when the connection has no transport. The connection must not be NULL
+ * (asserted). */
+pn_condition_t *pn_connection_remote_condition(pn_connection_t *connection)
+{
+  assert(connection);
+
+  pn_transport_t *transport = connection->transport;
+  if (!transport) {
+    return NULL;
+  }
+  return &transport->remote_condition;
+}
+
+/* Return the local condition of the session's endpoint. The session must
+ * not be NULL (asserted). */
+pn_condition_t *pn_session_condition(pn_session_t *session)
+{
+  pn_condition_t *condition;
+  assert(session);
+  condition = &session->endpoint.condition;
+  return condition;
+}
+
+/* Return the remote condition of the session's endpoint. The session must
+ * not be NULL (asserted). */
+pn_condition_t *pn_session_remote_condition(pn_session_t *session)
+{
+  pn_condition_t *condition;
+  assert(session);
+  condition = &session->endpoint.remote_condition;
+  return condition;
+}
+
+/* Return the local condition of the link's endpoint. The link must not be
+ * NULL (asserted). */
+pn_condition_t *pn_link_condition(pn_link_t *link)
+{
+  pn_condition_t *condition;
+  assert(link);
+  condition = &link->endpoint.condition;
+  return condition;
+}
+
+/* Return the remote condition of the link's endpoint. The link must not be
+ * NULL (asserted). */
+pn_condition_t *pn_link_remote_condition(pn_link_t *link)
+{
+  pn_condition_t *condition;
+  assert(link);
+  condition = &link->endpoint.remote_condition;
+  return condition;
+}
+
+/* True when `condition` is non-NULL and pn_string_get() on its name yields a
+ * non-NULL string. */
+bool pn_condition_is_set(pn_condition_t *condition)
+{
+  if (!condition) {
+    return false;
+  }
+  return pn_string_get(condition->name) != NULL;
+}
+
+/* Clear the condition's name, description and info. The condition must not
+ * be NULL (asserted). */
+void pn_condition_clear(pn_condition_t *condition)
+{
+ assert(condition);
+ pn_string_clear(condition->name);
+ pn_string_clear(condition->description);
+ pn_data_clear(condition->info);
+}
+
+/* Return the condition's name as reported by pn_string_get(). The condition
+ * must not be NULL (asserted). */
+const char *pn_condition_get_name(pn_condition_t *condition)
+{
+  const char *name;
+  assert(condition);
+  name = pn_string_get(condition->name);
+  return name;
+}
+
+/* Set the condition's name and return pn_string_set()'s result. The
+ * condition must not be NULL (asserted). */
+int pn_condition_set_name(pn_condition_t *condition, const char *name)
+{
+  int err;
+  assert(condition);
+  err = pn_string_set(condition->name, name);
+  return err;
+}
+
+/* Return the condition's description as reported by pn_string_get(). The
+ * condition must not be NULL (asserted). */
+const char *pn_condition_get_description(pn_condition_t *condition)
+{
+  const char *description;
+  assert(condition);
+  description = pn_string_get(condition->description);
+  return description;
+}
+
+/* Set the condition's description and return pn_string_set()'s result. The
+ * condition must not be NULL (asserted). */
+int pn_condition_set_description(pn_condition_t *condition, const char *description)
+{
+  int err;
+  assert(condition);
+  err = pn_string_set(condition->description, description);
+  return err;
+}
+
+/*
+ * Set the condition's name to `name` and its description to the
+ * printf-style expansion of `fmt`/`ap`, truncated to fit a 1024-byte buffer.
+ * If formatting itself fails the description is set to the empty string.
+ *
+ * Returns 0 on success, or the error from setting the name or description.
+ * The condition must not be NULL (asserted).
+ */
+int pn_condition_vformat(pn_condition_t *condition, const char *name, const char *fmt, va_list ap)
+{
+  assert(condition);
+  int err = pn_condition_set_name(condition, name);
+  if (err)
+    return err;
+
+  char text[1024];
+  // Keep the result signed: a negative return means formatting failed and
+  // the buffer contents cannot be trusted.
+  int n = pni_vsnprintf(text, sizeof(text), fmt, ap);
+  if (n < 0) {
+    text[0] = '\0';
+  } else if ((size_t) n >= sizeof(text)) {
+    text[sizeof(text)-1] = '\0';
+  }
+  err = pn_condition_set_description(condition, text);
+  return err;
+}
+
+/* Variadic front end for pn_condition_vformat(); returns its result. The
+ * condition must not be NULL (asserted). */
+int pn_condition_format(pn_condition_t *condition, const char *name, const char *fmt, ...)
+{
+  int err;
+  va_list ap;
+
+  assert(condition);
+  va_start(ap, fmt);
+  err = pn_condition_vformat(condition, name, fmt, ap);
+  va_end(ap);
+
+  return err;
+}
+
+/* Return the condition's info data object. The condition must not be NULL
+ * (asserted). */
+pn_data_t *pn_condition_info(pn_condition_t *condition)
+{
+  pn_data_t *info;
+  assert(condition);
+  info = condition->info;
+  return info;
+}
+
+/* True when the condition's name is "amqp:connection:redirect" or
+ * "amqp:link:redirect"; false when the name is unset or anything else. */
+bool pn_condition_is_redirect(pn_condition_t *condition)
+{
+  const char *name = pn_condition_get_name(condition);
+
+  if (!name) {
+    return false;
+  }
+  if (strcmp(name, "amqp:connection:redirect") == 0) {
+    return true;
+  }
+  return strcmp(name, "amqp:link:redirect") == 0;
+}
+
+/*
+ * Look up the "network-host" entry in the condition's info and return a
+ * pointer to its bytes, or NULL when the info has no such entry.
+ *
+ * The info cursor is rewound before returning. The returned pointer refers
+ * to storage inside the info data object.
+ */
+const char *pn_condition_redirect_host(pn_condition_t *condition)
+{
+  pn_data_t *data = pn_condition_info(condition);
+  const char *host = NULL;
+
+  pn_data_rewind(data);
+  pn_data_next(data);
+  pn_data_enter(data);
+  // Only read the value when the key was actually found; otherwise the
+  // cursor sits on some unrelated node.
+  if (pn_data_lookup(data, "network-host")) {
+    pn_bytes_t bytes = pn_data_get_bytes(data);
+    host = bytes.start;
+  }
+  pn_data_rewind(data);
+  return host;
+}
+
+/*
+ * Look up the "port" entry in the condition's info and return its integer
+ * value, or 0 when the info has no such entry.
+ *
+ * The info cursor is rewound before returning.
+ */
+int pn_condition_redirect_port(pn_condition_t *condition)
+{
+  pn_data_t *data = pn_condition_info(condition);
+  int port = 0;
+
+  pn_data_rewind(data);
+  pn_data_next(data);
+  pn_data_enter(data);
+  // Only read the value when the key was actually found; otherwise the
+  // cursor sits on some unrelated node.
+  if (pn_data_lookup(data, "port")) {
+    port = pn_data_get_int(data);
+  }
+  pn_data_rewind(data);
+  return port;
+}
+
+/*
+ * Resolve the connection an event relates to.
+ * - connection events: the event context itself;
+ * - transport events: the transport's connection (NULL without a transport);
+ * - anything else: the connection of the event's session, if it has one.
+ */
+pn_connection_t *pn_event_connection(pn_event_t *event)
+{
+  switch (pn_class_id(pn_event_class(event))) {
+  case CID_pn_connection:
+    return (pn_connection_t *) pn_event_context(event);
+  case CID_pn_transport: {
+    pn_transport_t *transport = pn_event_transport(event);
+    return transport ? transport->connection : NULL;
+  }
+  default:
+    break;
+  }
+
+  pn_session_t *ssn = pn_event_session(event);
+  return ssn ? pn_session_connection(ssn) : NULL;
+}
+
+/* Resolve the session an event relates to: the context itself for session
+ * events, otherwise the session of the event's link (NULL if no link). */
+pn_session_t *pn_event_session(pn_event_t *event)
+{
+  if (pn_class_id(pn_event_class(event)) == CID_pn_session) {
+    return (pn_session_t *) pn_event_context(event);
+  }
+
+  pn_link_t *link = pn_event_link(event);
+  return link ? pn_link_session(link) : NULL;
+}
+
+/* Resolve the link an event relates to: the context itself for link events,
+ * otherwise the link of the event's delivery (NULL if no delivery). */
+pn_link_t *pn_event_link(pn_event_t *event)
+{
+  if (pn_class_id(pn_event_class(event)) == CID_pn_link) {
+    return (pn_link_t *) pn_event_context(event);
+  }
+
+  pn_delivery_t *dlv = pn_event_delivery(event);
+  return dlv ? pn_delivery_link(dlv) : NULL;
+}
+
+/* Return the event context as a delivery for delivery events, NULL for every
+ * other event class. */
+pn_delivery_t *pn_event_delivery(pn_event_t *event)
+{
+  if (pn_class_id(pn_event_class(event)) != CID_pn_delivery) {
+    return NULL;
+  }
+  return (pn_delivery_t *) pn_event_context(event);
+}
+
+/* Resolve the transport an event relates to: the context itself for
+ * transport events, otherwise the transport of the event's connection (NULL
+ * if no connection can be resolved). */
+pn_transport_t *pn_event_transport(pn_event_t *event)
+{
+  if (pn_class_id(pn_event_class(event)) == CID_pn_transport) {
+    return (pn_transport_t *) pn_event_context(event);
+  }
+
+  pn_connection_t *conn = pn_event_connection(event);
+  return conn ? pn_connection_transport(conn) : NULL;
+}
+
+/*
+ * Copy name, description and info from `src` into `dest`. Copying a
+ * condition onto itself is a no-op. Returns 0 on success or the first
+ * non-zero error from a copy step. Both arguments must be non-NULL (asserted).
+ */
+int pn_condition_copy(pn_condition_t *dest, pn_condition_t *src) {
+  assert(dest);
+  assert(src);
+
+  if (src == dest) {
+    return 0;
+  }
+
+  int err = pn_string_copy(dest->name, src->name);
+  if (err) {
+    return err;
+  }
+  err = pn_string_copy(dest->description, src->description);
+  if (err) {
+    return err;
+  }
+  return pn_data_copy(dest->info, src->info);
+}
+
+
+/* Return `cond` if it is non-NULL and set, otherwise NULL. */
+static pn_condition_t *cond_set(pn_condition_t *cond) {
+  if (!cond || !pn_condition_is_set(cond)) {
+    return NULL;
+  }
+  return cond;
+}
+
+/* Return the first of `cond1`, `cond2` that is non-NULL and set, or NULL if
+ * neither is. */
+static pn_condition_t *cond2_set(pn_condition_t *cond1, pn_condition_t *cond2) {
+  pn_condition_t *first = cond_set(cond1);
+  return first ? first : cond_set(cond2);
+}
+
+/*
+ * Return the condition that is set on the object an event refers to.
+ * For connections, sessions and links the remote condition is preferred
+ * over the local one; for transports only the transport condition is
+ * considered. Returns NULL when nothing is set or for other event classes.
+ */
+pn_condition_t *pn_event_condition(pn_event_t *e) {
+  void *ctx = pn_event_context(e);
+
+  switch (pn_class_id(pn_event_class(e))) {
+  case CID_pn_connection: {
+    pn_connection_t *conn = (pn_connection_t*)ctx;
+    return cond2_set(pn_connection_remote_condition(conn), pn_connection_condition(conn));
+  }
+  case CID_pn_session: {
+    pn_session_t *ssn = (pn_session_t*)ctx;
+    return cond2_set(pn_session_remote_condition(ssn), pn_session_condition(ssn));
+  }
+  case CID_pn_link: {
+    pn_link_t *link = (pn_link_t*)ctx;
+    return cond2_set(pn_link_remote_condition(link), pn_link_condition(link));
+  }
+  case CID_pn_transport: {
+    pn_transport_t *transport = (pn_transport_t*)ctx;
+    return cond_set(pn_transport_condition(transport));
+  }
+  default:
+    break;
+  }
+  return NULL;
+}
+
+/* Map a disposition type code to a lower-case name; codes other than the
+ * five listed here map to "unknown". */
+const char *pn_disposition_type_name(uint64_t d) {
+  if (d == PN_RECEIVED) return "received";
+  if (d == PN_ACCEPTED) return "accepted";
+  if (d == PN_REJECTED) return "rejected";
+  if (d == PN_RELEASED) return "released";
+  if (d == PN_MODIFIED) return "modified";
+  return "unknown";
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/core/error.c
----------------------------------------------------------------------
diff --git a/c/src/core/error.c b/c/src/core/error.c
new file mode 100644
index 0000000..070144a
--- /dev/null
+++ b/c/src/core/error.c
@@ -0,0 +1,138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "platform/platform.h"
+#include "util.h"
+
+#include <proton/error.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+
+/* Error object: a numeric code plus an optional message. */
+struct pn_error_t {
+ char *text; // message; set from pn_strdup() and released with free(), NULL when unset
+ pn_error_t *root; // only ever reset to NULL in the functions visible here
+ int code; // 0 means no error (pn_code() renders it as "<ok>")
+};
+
+pn_error_t *pn_error()
+{
+ pn_error_t *error = (pn_error_t *) malloc(sizeof(pn_error_t));
+ if (error != NULL) {
+ error->code = 0;
+ error->text = NULL;
+ error->root = NULL;
+ }
+ return error;
+}
+
+void pn_error_free(pn_error_t *error)
+{
+ if (error) {
+ free(error->text);
+ free(error);
+ }
+}
+
+void pn_error_clear(pn_error_t *error)
+{
+ if (error) {
+ error->code = 0;
+ free(error->text);
+ error->text = NULL;
+ error->root = NULL;
+ }
+}
+
+/*
+ * Replace the error's contents with `code` and a private copy of `text`.
+ * A `code` of 0 simply clears the error. Returns `code`.
+ * The error must not be NULL (asserted).
+ */
+int pn_error_set(pn_error_t *error, int code, const char *text)
+{
+  assert(error);
+  // Duplicate before clearing: `text` may point at error->text itself (as
+  // happens when an error is copied onto itself), which pn_error_clear()
+  // frees.
+  char *copy = code ? pn_strdup(text) : NULL;
+  pn_error_clear(error);
+  if (code) {
+    error->code = code;
+    error->text = copy;
+  }
+  return code;
+}
+
+/*
+ * Set the error to `code` with a message produced from the printf-style
+ * `fmt`/`ap`, truncated to fit a 1024-byte buffer. If formatting itself
+ * fails the message is the empty string. Returns `code`.
+ * The error must not be NULL (asserted).
+ */
+int pn_error_vformat(pn_error_t *error, int code, const char *fmt, va_list ap)
+{
+  assert(error);
+  char text[1024];
+  int n = pni_vsnprintf(text, sizeof(text), fmt, ap);
+  if (n < 0) {
+    // Formatting failed: the buffer contents are indeterminate, so do not
+    // hand them to pn_error_set().
+    text[0] = '\0';
+  } else if ((size_t) n >= sizeof(text)) {
+    text[sizeof(text) - 1] = '\0';
+  }
+  return pn_error_set(error, code, text);
+}
+
+int pn_error_format(pn_error_t *error, int code, const char *fmt, ...)
+{
+ assert(error);
+ va_list ap;
+ va_start(ap, fmt);
+ int rcode = pn_error_vformat(error, code, fmt, ap);
+ va_end(ap);
+ return rcode;
+}
+
+int pn_error_code(pn_error_t *error)
+{
+ assert(error);
+ return error->code;
+}
+
+const char *pn_error_text(pn_error_t *error)
+{
+ assert(error);
+ return error->text;
+}
+
+/*
+ * Make `error` a copy of `src` (code and text). A NULL `src` clears `error`
+ * and returns 0; otherwise the copied code is returned.
+ * `error` must not be NULL (asserted).
+ */
+int pn_error_copy(pn_error_t *error, pn_error_t *src)
+{
+  assert(error);
+  if (!src) {
+    pn_error_clear(error);
+    return 0;
+  }
+  // Copying an error onto itself must not go through pn_error_set(): that
+  // would clear (and free) the very text being copied.
+  if (src == error) {
+    return pn_error_code(error);
+  }
+  return pn_error_set(error, pn_error_code(src), pn_error_text(src));
+}
+
+const char *pn_code(int code)
+{
+ switch (code)
+ {
+ case 0: return "<ok>";
+ case PN_EOS: return "PN_EOS";
+ case PN_ERR: return "PN_ERR";
+ case PN_OVERFLOW: return "PN_OVERFLOW";
+ case PN_UNDERFLOW: return "PN_UNDERFLOW";
+ case PN_STATE_ERR: return "PN_STATE_ERR";
+ case PN_ARG_ERR: return "PN_ARG_ERR";
+ case PN_TIMEOUT: return "PN_TIMEOUT";
+ case PN_INTR: return "PN_INTR";
+ case PN_INPROGRESS: return "PN_INPROGRESS";
+ case PN_OUT_OF_MEMORY: return "PN_OUT_OF_MEMORY";
+ case PN_ABORTED: return "PN_ABORTED";
+ default: return "<unknown>";
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/core/event.c
----------------------------------------------------------------------
diff --git a/c/src/core/event.c b/c/src/core/event.c
new file mode 100644
index 0000000..c24131d
--- /dev/null
+++ b/c/src/core/event.c
@@ -0,0 +1,410 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <stdio.h>
+#include <proton/object.h>
+#include <proton/event.h>
+#include <proton/reactor.h>
+#include <assert.h>
+
+/* A FIFO queue of events plus a pool of recycled event objects. */
+struct pn_collector_t {
+ pn_list_t *pool; /* recycled events: pn_collector_put() pops from it, pn_event_finalize() adds to it */
+ pn_event_t *head; /* oldest queued event (removed first), NULL when the queue is empty */
+ pn_event_t *tail; /* newest queued event (new events are linked after it), NULL when empty */
+ pn_event_t *prev; /* event returned by previous call to pn_collector_next() */
+ bool freed; /* set by pn_collector_release(); pn_collector_put() then returns NULL */
+};
+
+/* One queued event: a type plus the object (context) it refers to. */
+struct pn_event_t {
+ pn_list_t *pool; /* pool of the collector that issued the event; a reference is held while set */
+ const pn_class_t *clazz; /* class of context, as returned by clazz->reify(context) in pn_collector_put() */
+ void *context; // depends on clazz
+ pn_record_t *attachments; /* created in pn_event_initialize(), cleared when the event is recycled */
+ pn_event_t *next; /* next event in the collector's queue */
+ pn_event_type_t type; /* PN_EVENT_NONE while the event is unused */
+};
+
+/* Put a newly allocated collector into its empty state: no queued events,
+   no previously returned event, not released, and a fresh empty pool. */
+static void pn_collector_initialize(pn_collector_t *collector)
+{
+  collector->freed = false;
+  collector->prev = NULL;
+  collector->head = collector->tail = NULL;
+  collector->pool = pn_list(PN_OBJECT, 0);
+}
+
+/* Discard every queued event by stepping through the queue with
+   pn_collector_next() until it reports that nothing is left. */
+void pn_collector_drain(pn_collector_t *collector)
+{
+  assert(collector);
+  pn_event_t *event;
+  do {
+    event = pn_collector_next(collector);
+  } while (event);
+  assert(!collector->head);
+  assert(!collector->tail);
+}
+
+/* Empty the pool of recycled events held by the collector. */
+static void pn_collector_shrink(pn_collector_t *collector)
+{
+ assert(collector);
+ pn_list_clear(collector->pool);
+}
+
+/* Class finalizer: discard any events still queued, then drop the
+   collector's reference to its event pool. */
+static void pn_collector_finalize(pn_collector_t *collector)
+{
+ pn_collector_drain(collector);
+ pn_decref(collector->pool);
+}
+
+/*
+ * Append a description of the queue to dst in the form
+ * "EVENTS[<event>, <event>, ...]", oldest event first.
+ * Returns 0 on success or the first non-zero error from the string/inspect
+ * calls.
+ */
+static int pn_collector_inspect(pn_collector_t *collector, pn_string_t *dst)
+{
+  assert(collector);
+  int rc = pn_string_addf(dst, "EVENTS[");
+  if (rc) return rc;
+  for (pn_event_t *ev = collector->head; ev; ev = ev->next) {
+    /* Every entry except the first is preceded by a separator. */
+    if (ev != collector->head) {
+      rc = pn_string_addf(dst, ", ");
+      if (rc) return rc;
+    }
+    rc = pn_inspect(ev, dst);
+    if (rc) return rc;
+  }
+  return pn_string_addf(dst, "]");
+}
+
+/* The collector class supplies no hashcode or compare callback. */
+#define pn_collector_hashcode NULL
+#define pn_collector_compare NULL
+
+/* NOTE(review): PN_CLASSDEF comes from proton/object.h; presumably it builds
+   the class descriptor from the pn_collector_* callbacks above and defines
+   pn_collector_new(), which pn_collector() calls - confirm against object.h. */
+PN_CLASSDEF(pn_collector)
+
+/* Public constructor: returns whatever pn_collector_new() produces. */
+pn_collector_t *pn_collector(void)
+{
+ return pn_collector_new();
+}
+
+/* Release the collector (discarding queued and pooled events) and then drop
+   the caller's reference to it. */
+void pn_collector_free(pn_collector_t *collector)
+{
+ assert(collector);
+ pn_collector_release(collector);
+ pn_decref(collector);
+}
+
+/*
+ * Mark the collector as released and throw away its queued and pooled
+ * events. Only the first call does any work; once released,
+ * pn_collector_put() refuses new events.
+ */
+void pn_collector_release(pn_collector_t *collector)
+{
+  assert(collector);
+  if (collector->freed) {
+    return;
+  }
+  collector->freed = true;
+  pn_collector_drain(collector);
+  pn_collector_shrink(collector);
+}
+
+/* Defined further down; needed here to allocate events when the pool is empty. */
+pn_event_t *pn_event(void);
+
+/*
+ * Append an event of the given type for context to the collector's queue.
+ *
+ * Returns the queued event, or NULL when nothing was queued: the collector
+ * is NULL or already released, or the newest queued event has the same type
+ * and context (consecutive duplicates are collapsed).
+ *
+ * The event object is taken from the collector's pool when one is available
+ * and allocated otherwise. It holds a reference to the pool and to context
+ * until it is finalized.
+ */
+pn_event_t *pn_collector_put(pn_collector_t *collector,
+                             const pn_class_t *clazz, void *context,
+                             pn_event_type_t type)
+{
+  if (!collector) {
+    return NULL;
+  }
+
+  assert(context);
+
+  if (collector->freed) {
+    return NULL;
+  }
+
+  pn_event_t *last = collector->tail;
+  const bool duplicate = last && last->type == type && last->context == context;
+  if (duplicate) {
+    return NULL;
+  }
+
+  /* Resolve the concrete class of context before recording it. */
+  clazz = clazz->reify(context);
+
+  pn_event_t *event = (pn_event_t *) pn_list_pop(collector->pool);
+  if (!event) {
+    event = pn_event();
+  }
+
+  event->pool = collector->pool;
+  pn_incref(event->pool);
+
+  /* Link at the end of the queue; an empty queue also gets a new head. */
+  if (last) {
+    last->next = event;
+  } else {
+    collector->head = event;
+  }
+  collector->tail = event;
+
+  event->type = type;
+  event->clazz = clazz;
+  event->context = context;
+  pn_class_incref(clazz, event->context);
+
+  return event;
+}
+
+/* Return the oldest queued event without removing it, or NULL if the queue
+   is empty. The collector itself is not checked for NULL. */
+pn_event_t *pn_collector_peek(pn_collector_t *collector)
+{
+ return collector->head;
+}
+
+// Unlink and return the event at the front of the queue (NULL if the queue
+// is empty). Shared by pn_collector_pop() and pn_collector_next(); the
+// caller decides what happens to the returned event's reference.
+static pn_event_t *pop_internal(pn_collector_t *collector) {
+  pn_event_t *front = collector->head;
+  if (!front) {
+    return NULL;
+  }
+  collector->head = front->next;
+  if (collector->head == NULL) {
+    // That was the last event, so the queue has no tail either.
+    collector->tail = NULL;
+  }
+  return front;
+}
+
+// Remove the oldest queued event and drop the queue's reference to it.
+// Returns true if an event was removed, false if the queue was empty.
+bool pn_collector_pop(pn_collector_t *collector) {
+  pn_event_t *event = pop_internal(collector);
+  if (!event) {
+    return false;
+  }
+  pn_decref(event);
+  return true;
+}
+
+// Step to the next event: drop the reference to the event handed out by the
+// previous call (if any), remove the oldest queued event and return it. The
+// returned event stays referenced by the collector until the following call.
+// Returns NULL when the queue is empty.
+pn_event_t *pn_collector_next(pn_collector_t *collector) {
+  pn_event_t *done = collector->prev;
+  if (done) {
+    pn_decref(done);
+  }
+  pn_event_t *current = pop_internal(collector);
+  collector->prev = current;
+  return current;
+}
+
+// Return the event most recently handed out by pn_collector_next(), or NULL
+// if there is none.
+pn_event_t *pn_collector_prev(pn_collector_t *collector) {
+ return collector->prev;
+}
+
+/* True when at least two events are queued, i.e. another event follows the
+   one currently at the head. */
+bool pn_collector_more(pn_collector_t *collector)
+{
+  assert(collector);
+  const pn_event_t *first = collector->head;
+  return first != NULL && first->next != NULL;
+}
+
+/* Put a newly allocated event into its unused state (no pool, class,
+   context or successor, type PN_EVENT_NONE) with a fresh attachments record. */
+static void pn_event_initialize(pn_event_t *event)
+{
+  event->type = PN_EVENT_NONE;
+  event->next = NULL;
+  event->pool = NULL;
+  event->context = NULL;
+  event->clazz = NULL;
+  event->attachments = pn_record();
+}
+
+// Class finalizer for events. Releases the context and then either recycles
+// the event into its collector's pool or tears it down.
+static void pn_event_finalize(pn_event_t *event) {
+ // decref before adding to the free list
+ if (event->clazz && event->context) {
+ pn_class_decref(event->clazz, event->context);
+ }
+
+ pn_list_t *pool = event->pool;
+
+ // Recycle only while something besides this event still references the
+ // pool (presumably the owning collector - confirm); otherwise nobody could
+ // ever pop the event back out.
+ if (pool && pn_refcount(pool) > 1) {
+ // Reset to the same state pn_event_initialize() leaves, keeping the
+ // attachments record but emptying it.
+ event->pool = NULL;
+ event->type = PN_EVENT_NONE;
+ event->clazz = NULL;
+ event->context = NULL;
+ event->next = NULL;
+ pn_record_clear(event->attachments);
+ // NOTE(review): adding to a PN_OBJECT list presumably takes a new
+ // reference on the event, keeping it alive in the pool - confirm.
+ pn_list_add(pool, event);
+ } else {
+ pn_decref(event->attachments);
+ }
+
+ // Drop the pool reference taken in pn_collector_put(). Uses the local copy
+ // because event->pool was reset above on the recycle path.
+ pn_decref(pool);
+}
+
+/*
+ * Append a description of event to dst: "(<type name>)" or, when the event
+ * has a context, "(<type name>, <context>)". A type without a name is shown
+ * as "<N>" with N its numeric value.
+ * Returns 0 on success or the first non-zero error from the string/inspect
+ * calls.
+ */
+static int pn_event_inspect(pn_event_t *event, pn_string_t *dst)
+{
+  assert(event);
+  assert(dst);
+  const char *name = pn_event_type_name(event->type);
+  int err = name
+    ? pn_string_addf(dst, "(%s", name)
+    : pn_string_addf(dst, "(<%u>", (unsigned int) event->type);
+  if (err) return err;
+
+  if (!event->context) {
+    return pn_string_addf(dst, ")");
+  }
+
+  err = pn_string_addf(dst, ", ");
+  if (err) return err;
+  err = pn_class_inspect(event->clazz, event->context, dst);
+  if (err) return err;
+  return pn_string_addf(dst, ")");
+}
+
+/* The event class supplies no hashcode or compare callback. */
+#define pn_event_hashcode NULL
+#define pn_event_compare NULL
+
+/* NOTE(review): PN_CLASSDEF comes from proton/object.h; presumably it builds
+   the class descriptor from the pn_event_* callbacks above and defines
+   pn_event_new(), which pn_event() calls - confirm against object.h. */
+PN_CLASSDEF(pn_event)
+
+/* Allocate a new event; returns whatever pn_event_new() produces. */
+pn_event_t *pn_event(void)
+{
+ return pn_event_new();
+}
+
+/* Return the type of event; a NULL event is reported as PN_EVENT_NONE. */
+pn_event_type_t pn_event_type(pn_event_t *event)
+{
+  if (!event) {
+    return PN_EVENT_NONE;
+  }
+  return event->type;
+}
+
+/* Return the class recorded for the event's context. */
+const pn_class_t *pn_event_class(pn_event_t *event)
+{
+ assert(event);
+ return event->clazz;
+}
+
+/* Return the object the event refers to; its concrete type is described by
+   pn_event_class(). */
+void *pn_event_context(pn_event_t *event)
+{
+ assert(event);
+ return event->context;
+}
+
+/* Return the event's attachments record. The record is emptied when the
+   event is recycled into its collector's pool. */
+pn_record_t *pn_event_attachments(pn_event_t *event)
+{
+ assert(event);
+ return event->attachments;
+}
+
+const char *pn_event_type_name(pn_event_type_t type)
+{
+ switch (type) {
+ case PN_EVENT_NONE:
+ return "PN_EVENT_NONE";
+ case PN_REACTOR_INIT:
+ return "PN_REACTOR_INIT";
+ case PN_REACTOR_QUIESCED:
+ return "PN_REACTOR_QUIESCED";
+ case PN_REACTOR_FINAL:
+ return "PN_REACTOR_FINAL";
+ case PN_TIMER_TASK:
+ return "PN_TIMER_TASK";
+ case PN_CONNECTION_INIT:
+ return "PN_CONNECTION_INIT";
+ case PN_CONNECTION_BOUND:
+ return "PN_CONNECTION_BOUND";
+ case PN_CONNECTION_UNBOUND:
+ return "PN_CONNECTION_UNBOUND";
+ case PN_CONNECTION_REMOTE_OPEN:
+ return "PN_CONNECTION_REMOTE_OPEN";
+ case PN_CONNECTION_LOCAL_OPEN:
+ return "PN_CONNECTION_LOCAL_OPEN";
+ case PN_CONNECTION_REMOTE_CLOSE:
+ return "PN_CONNECTION_REMOTE_CLOSE";
+ case PN_CONNECTION_LOCAL_CLOSE:
+ return "PN_CONNECTION_LOCAL_CLOSE";
+ case PN_CONNECTION_FINAL:
+ return "PN_CONNECTION_FINAL";
+ case PN_SESSION_INIT:
+ return "PN_SESSION_INIT";
+ case PN_SESSION_REMOTE_OPEN:
+ return "PN_SESSION_REMOTE_OPEN";
+ case PN_SESSION_LOCAL_OPEN:
+ return "PN_SESSION_LOCAL_OPEN";
+ case PN_SESSION_REMOTE_CLOSE:
+ return "PN_SESSION_REMOTE_CLOSE";
+ case PN_SESSION_LOCAL_CLOSE:
+ return "PN_SESSION_LOCAL_CLOSE";
+ case PN_SESSION_FINAL:
+ return "PN_SESSION_FINAL";
+ case PN_LINK_INIT:
+ return "PN_LINK_INIT";
+ case PN_LINK_REMOTE_OPEN:
+ return "PN_LINK_REMOTE_OPEN";
+ case PN_LINK_LOCAL_OPEN:
+ return "PN_LINK_LOCAL_OPEN";
+ case PN_LINK_REMOTE_CLOSE:
+ return "PN_LINK_REMOTE_CLOSE";
+ case PN_LINK_LOCAL_DETACH:
+ return "PN_LINK_LOCAL_DETACH";
+ case PN_LINK_REMOTE_DETACH:
+ return "PN_LINK_REMOTE_DETACH";
+ case PN_LINK_LOCAL_CLOSE:
+ return "PN_LINK_LOCAL_CLOSE";
+ case PN_LINK_FLOW:
+ return "PN_LINK_FLOW";
+ case PN_LINK_FINAL:
+ return "PN_LINK_FINAL";
+ case PN_DELIVERY:
+ return "PN_DELIVERY";
+ case PN_TRANSPORT:
+ return "PN_TRANSPORT";
+ case PN_TRANSPORT_AUTHENTICATED:
+ return "PN_TRANSPORT_AUTHENTICATED";
+ case PN_TRANSPORT_ERROR:
+ return "PN_TRANSPORT_ERROR";
+ case PN_TRANSPORT_HEAD_CLOSED:
+ return "PN_TRANSPORT_HEAD_CLOSED";
+ case PN_TRANSPORT_TAIL_CLOSED:
+ return "PN_TRANSPORT_TAIL_CLOSED";
+ case PN_TRANSPORT_CLOSED:
+ return "PN_TRANSPORT_CLOSED";
+ case PN_SELECTABLE_INIT:
+ return "PN_SELECTABLE_INIT";
+ case PN_SELECTABLE_UPDATED:
+ return "PN_SELECTABLE_UPDATED";
+ case PN_SELECTABLE_READABLE:
+ return "PN_SELECTABLE_READABLE";
+ case PN_SELECTABLE_WRITABLE:
+ return "PN_SELECTABLE_WRITABLE";
+ case PN_SELECTABLE_ERROR:
+ return "PN_SELECTABLE_ERROR";
+ case PN_SELECTABLE_EXPIRED:
+ return "PN_SELECTABLE_EXPIRED";
+ case PN_SELECTABLE_FINAL:
+ return "PN_SELECTABLE_FINAL";
+ case PN_CONNECTION_WAKE:
+ return "PN_CONNECTION_WAKE";
+ case PN_LISTENER_ACCEPT:
+ return "PN_LISTENER_ACCEPT";
+ case PN_LISTENER_CLOSE:
+ return "PN_LISTENER_CLOSE";
+ case PN_PROACTOR_INTERRUPT:
+ return "PN_PROACTOR_INTERRUPT";
+ case PN_PROACTOR_TIMEOUT:
+ return "PN_PROACTOR_TIMEOUT";
+ case PN_PROACTOR_INACTIVE:
+ return "PN_PROACTOR_INACTIVE";
+ case PN_LISTENER_OPEN:
+ return "PN_LISTENER_OPEN";
+ default:
+ return "PN_UNKNOWN";
+ }
+ return NULL;
+}
+
+// Fetch the next event of a batch by delegating to the batch's own
+// next_event callback.
+pn_event_t *pn_event_batch_next(pn_event_batch_t *batch) {
+ return batch->next_event(batch);
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/core/framing.c
----------------------------------------------------------------------
diff --git a/c/src/core/framing.c b/c/src/core/framing.c
new file mode 100644
index 0000000..9f78666
--- /dev/null
+++ b/c/src/core/framing.c
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdio.h>
+#include <string.h>
+
+#include "framing.h"
+
+// TODO: These are near duplicates of code in codec.c - they should be
+// deduplicated.
+/* Store value at bytes[0..1] in big-endian (most significant byte first) order. */
+static inline void pn_i_write16(char *bytes, uint16_t value)
+{
+  const unsigned int high = (value >> 8) & 0xFF;
+  const unsigned int low = value & 0xFF;
+  bytes[0] = high;
+  bytes[1] = low;
+}
+
+
+/* Store value at bytes[0..3] in big-endian (most significant byte first) order. */
+static inline void pn_i_write32(char *bytes, uint32_t value)
+{
+  for (int i = 0; i < 4; i++) {
+    const int shift = 8 * (3 - i);
+    bytes[i] = 0xFF & (value >> shift);
+  }
+}
+
+/* Read a big-endian 16-bit unsigned value from bytes[0..1]. */
+static inline uint16_t pn_i_read16(const char *bytes)
+{
+  const uint8_t high = (uint8_t) bytes[0];
+  const uint8_t low = (uint8_t) bytes[1];
+  return (uint16_t) ((high << 8) | low);
+}
+
+/* Read a big-endian 32-bit unsigned value from bytes[0..3]. */
+static inline uint32_t pn_i_read32(const char *bytes)
+{
+  uint32_t result = 0;
+  for (int i = 0; i < 4; i++) {
+    result = (result << 8) | (uint8_t) bytes[i];
+  }
+  return result;
+}
+
+
+/*
+ * Parse one frame from the start of bytes without copying it.
+ *
+ * Returns 0 when fewer than AMQP_HEADER_SIZE bytes, or fewer bytes than the
+ * frame's declared size, are available. Returns PN_ERR when the declared
+ * size exceeds a non-zero max, or when the data offset lies inside the fixed
+ * header or beyond the end of the frame. Otherwise fills in frame and returns
+ * the frame's total size. The extended and payload pointers refer into bytes.
+ */
+ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available, uint32_t max)
+{
+  if (available < AMQP_HEADER_SIZE) return 0;
+
+  const uint32_t total = pn_i_read32(bytes);
+  if (max != 0 && total > max) return PN_ERR;
+  if (total > available) return 0;
+
+  /* Byte 4 holds the data offset in 4-byte words. */
+  const unsigned int body = 4 * (uint8_t) bytes[4];
+  if (body < AMQP_HEADER_SIZE || body > total) return PN_ERR;
+
+  frame->type = bytes[5];
+  frame->channel = pn_i_read16(bytes + 6);
+  frame->extended = bytes + AMQP_HEADER_SIZE;
+  frame->ex_size = body - AMQP_HEADER_SIZE;
+  frame->payload = bytes + body;
+  frame->size = total - body;
+
+  return total;
+}
+
+/*
+ * Append frame to buffer: the 8-byte header, the extended header (if any)
+ * and the payload.
+ *
+ * The header's data offset is expressed in 4-byte words, so the extended
+ * header is zero-padded up to the next 4-byte boundary and the padding is
+ * counted in the frame size. Without this the data offset would point past
+ * the real start of the payload whenever ex_size is not a multiple of 4.
+ * When ex_size is already a multiple of 4 no padding is added.
+ *
+ * Returns the number of bytes in the frame, or 0 (writing nothing) when the
+ * buffer does not have that much space available.
+ *
+ * NOTE(review): as before, the extended bytes are skipped when
+ * frame.extended is NULL; callers are presumably expected to pass
+ * ex_size == 0 in that case - confirm.
+ */
+size_t pn_write_frame(pn_buffer_t* buffer, pn_frame_t frame)
+{
+  static const char padding[3] = {0, 0, 0};
+
+  /* Header plus extended header, rounded up to whole 4-byte words. */
+  const size_t doff = (AMQP_HEADER_SIZE + frame.ex_size + 3) / 4;
+  const size_t pad = 4 * doff - AMQP_HEADER_SIZE - frame.ex_size;
+  const size_t size = 4 * doff + frame.size;
+  if (size > pn_buffer_available(buffer)) {
+    return 0;
+  }
+
+  // Prepare header
+  char bytes[AMQP_HEADER_SIZE];
+  pn_i_write32(&bytes[0], (uint32_t) size);
+  bytes[4] = (char) doff;
+  bytes[5] = frame.type;
+  pn_i_write16(&bytes[6], frame.channel);
+
+  // Write header then rest of frame
+  pn_buffer_append(buffer, bytes, AMQP_HEADER_SIZE);
+  if (frame.extended)
+    pn_buffer_append(buffer, frame.extended, frame.ex_size);
+  if (pad)
+    pn_buffer_append(buffer, padding, pad);
+  pn_buffer_append(buffer, frame.payload, frame.size);
+  return size;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/core/framing.h
----------------------------------------------------------------------
diff --git a/c/src/core/framing.h b/c/src/core/framing.h
new file mode 100644
index 0000000..792d664
--- /dev/null
+++ b/c/src/core/framing.h
@@ -0,0 +1,46 @@
+#ifndef PROTON_FRAMING_H
+#define PROTON_FRAMING_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "buffer.h"
+
+#include <proton/import_export.h>
+#include <proton/type_compat.h>
+#include <proton/error.h>
+
+/* Size in bytes of the fixed frame header: 4-byte size, 1-byte data offset,
+   1-byte type, 2-byte channel. */
+#define AMQP_HEADER_SIZE (8)
+#define AMQP_MIN_MAX_FRAME_SIZE ((uint32_t)512) // minimum allowable max-frame
+
+/* Decoded view of one frame. The pointers refer to memory owned by whoever
+   built the frame (pn_read_frame() points them into its input buffer). */
+typedef struct {
+ uint8_t type; /* frame type, byte 5 of the header */
+ uint16_t channel; /* channel number, bytes 6-7 of the header (big-endian) */
+ size_t ex_size; /* length in bytes of the extended header */
+ const char *extended; /* extended header, located directly after the fixed header */
+ size_t size; /* length in bytes of the payload */
+ const char *payload; /* payload, located at the frame's data offset */
+} pn_frame_t;
+
+/* Parse one frame from bytes. Returns the frame's total size, 0 if more
+   input is needed, or PN_ERR if the frame is larger than a non-zero max or
+   its data offset is invalid. */
+ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available, uint32_t max);
+/* Append frame to buffer. Returns the number of bytes in the written frame,
+   or 0 if the buffer does not have enough space available. */
+size_t pn_write_frame(pn_buffer_t* buffer, pn_frame_t frame);
+
+#endif /* framing.h */
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/core/log.c
----------------------------------------------------------------------
diff --git a/c/src/core/log.c b/c/src/core/log.c
new file mode 100644
index 0000000..754eed3
--- /dev/null
+++ b/c/src/core/log.c
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/log.h>
+#include <proton/object.h>
+#include <stdio.h>
+#include "log_private.h"
+#include "util.h"
+
+
+/* Default log sink: write the message and a newline to stderr. */
+static void stderr_logger(const char *message) {
+  fputs(message, stderr);
+  fputc('\n', stderr);
+}
+
+/* Current log sink; replaced by pn_log_logger(). */
+static pn_logger_t logger = stderr_logger;
+/* Both flags are tri-state: -1 means "not decided yet", otherwise 0 or 1. */
+static int enabled_env = -1; /* Set from environment variable. */
+static int enabled_call = -1; /* set by pn_log_enable */
+
+/* Explicitly switch logging on or off. Once called, this setting overrides
+   the PN_TRACE_LOG environment variable (see pni_log_enabled()). */
+void pn_log_enable(bool value) {
+ enabled_call = value;
+}
+
+/*
+ * Report whether logging is enabled. An explicit pn_log_enable() call wins;
+ * otherwise the PN_TRACE_LOG environment variable decides, read once on
+ * first use and cached.
+ */
+bool pni_log_enabled(void) {
+  if (enabled_call == -1) {
+    /* No explicit setting yet: fall back to the (lazily read) environment. */
+    if (enabled_env == -1) {
+      enabled_env = pn_env_bool("PN_TRACE_LOG");
+    }
+    return enabled_env;
+  }
+  return enabled_call;
+}
+
+/* Install new_logger as the log sink. Passing NULL also disables logging. */
+void pn_log_logger(pn_logger_t new_logger) {
+  logger = new_logger;
+  if (new_logger == NULL) {
+    pn_log_enable(false);
+  }
+}
+
+/*
+ * Format a log message and hand it to the current log sink.
+ *
+ * Previously the message was always printed to stderr, so a sink installed
+ * with pn_log_logger() was never called. Now:
+ *  - with no sink (pn_log_logger(NULL)) the message is dropped;
+ *  - with the default sink the message is streamed to stderr exactly as
+ *    before, with no length limit;
+ *  - with a custom sink the message is formatted into a local buffer
+ *    (truncated if it does not fit) and passed to the sink without a
+ *    trailing newline, matching what stderr_logger() expects.
+ */
+void pni_vlogf_impl(const char *fmt, va_list ap) {
+  if (!logger) {
+    return;
+  }
+  if (logger == stderr_logger) {
+    vfprintf(stderr, fmt, ap);
+    fprintf(stderr, "\n");
+    return;
+  }
+  char message[1024];
+  int n = vsnprintf(message, sizeof(message), fmt, ap);
+  if (n < 0) {
+    /* Formatting failed; there is nothing meaningful to report. */
+    return;
+  }
+  logger(message);
+}
+
+/**@internal
+ *
+ * Variadic front end for pni_vlogf_impl().
+ *
+ * Note: this function does not test pni_log_enabled() itself. The pn_logf
+ * macro performs that test *before* calling here, because merely evaluating
+ * the arguments of the call can be costly (e.g. calling functions to build a
+ * complicated message). A disabled log statement must cost nothing more than
+ * one call to pni_log_enabled().
+ */
+void pni_logf_impl(const char *fmt, ...) {
+  va_list args;
+  va_start(args, fmt);
+  pni_vlogf_impl(fmt, args);
+  va_end(args);
+}
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/core/log_private.h
----------------------------------------------------------------------
diff --git a/c/src/core/log_private.h b/c/src/core/log_private.h
new file mode 100644
index 0000000..75044ed
--- /dev/null
+++ b/c/src/core/log_private.h
@@ -0,0 +1,54 @@
+#ifndef LOG_PRIVATE_H
+#define LOG_PRIVATE_H
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**@file
+ *
+ * Log messages that are not associated with a transport.
+ */
+
+#include <proton/log.h>
+#include <stdarg.h>
+
+/** Log a printf style message.
+ *
+ * The arguments are only evaluated when pni_log_enabled() returns true. The
+ * do/while(0) wrapper makes the macro behave as a single statement.
+ */
+#define pn_logf(...) \
+ do { \
+ if (pni_log_enabled()) \
+ pni_logf_impl(__VA_ARGS__); \
+ } while(0)
+
+/** va_list version of pn_logf.
+ *
+ * As with pn_logf, nothing is evaluated or written unless pni_log_enabled()
+ * returns true.
+ */
+#define pn_vlogf(fmt, ap) \
+ do { \
+ if (pni_log_enabled()) \
+ pni_vlogf_impl(fmt, ap); \
+ } while(0)
+
+/** Return true if logging is enabled. */
+PN_EXTERN bool pni_log_enabled(void);
+
+/**@internal Implementation behind pn_logf; does not check pni_log_enabled(). */
+PN_EXTERN void pni_logf_impl(const char* fmt, ...);
+/**@internal Implementation behind pn_vlogf; does not check pni_log_enabled(). */
+PN_EXTERN void pni_vlogf_impl(const char *fmt, va_list ap);
+
+
+
+#endif
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/core/max_align.h
----------------------------------------------------------------------
diff --git a/c/src/core/max_align.h b/c/src/core/max_align.h
new file mode 100644
index 0000000..ff9a6e5
--- /dev/null
+++ b/c/src/core/max_align.h
@@ -0,0 +1,47 @@
+#ifndef MAX_ALIGN_H
+#define MAX_ALIGN_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/type_compat.h>
+#include <stddef.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* pn_max_align_t: a type whose alignment is meant to be suitable for any
+   object. If __STDC_VERSION__ is not defined (e.g. when compiled as C++) the
+   preprocessor treats it as 0, so the fallback union is used. */
+#if __STDC_VERSION__ >= 201112
+/* Use standard max_align_t for alignment on c11 */
+typedef max_align_t pn_max_align_t;
+#else
+/* Align on a union of likely largest types for older compilers */
+typedef union pn_max_align_t {
+ uint64_t i;
+ long double d;
+ void *v;
+ void (*fp)(void);
+} pn_max_align_t;
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // MAX_ALIGN_H
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org