You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2012/03/22 15:45:39 UTC

svn commit: r1303812 - in /qpid/proton/trunk/proton-c: CMakeLists.txt cproton.i include/proton/engine.h include/proton/sasl.h include/proton/value.h proton.i src/driver.c src/engine/engine-internal.h src/engine/engine.c src/proton.c src/types/binary.c

Author: rhs
Date: Thu Mar 22 14:45:38 2012
New Revision: 1303812

URL: http://svn.apache.org/viewvc?rev=1303812&view=rev
Log:
API updates based on swig experience

Added:
    qpid/proton/trunk/proton-c/cproton.i
      - copied, changed from r1303285, qpid/proton/trunk/proton-c/proton.i
Removed:
    qpid/proton/trunk/proton-c/proton.i
Modified:
    qpid/proton/trunk/proton-c/CMakeLists.txt
    qpid/proton/trunk/proton-c/include/proton/engine.h
    qpid/proton/trunk/proton-c/include/proton/sasl.h
    qpid/proton/trunk/proton-c/include/proton/value.h
    qpid/proton/trunk/proton-c/src/driver.c
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-c/src/proton.c
    qpid/proton/trunk/proton-c/src/types/binary.c

Modified: qpid/proton/trunk/proton-c/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/CMakeLists.txt?rev=1303812&r1=1303811&r2=1303812&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/CMakeLists.txt (original)
+++ qpid/proton/trunk/proton-c/CMakeLists.txt Thu Mar 22 14:45:38 2012
@@ -61,8 +61,8 @@ find_package (PythonLibs)
 include_directories (${PYTHON_INCLUDE_PATH})
 include_directories (${PROJECT_SOURCE_DIR})
 
-swig_add_module(proton python proton.i)
-swig_link_libraries(proton qpidproton ${PYTHON_LIBRARIES})
+swig_add_module(cproton python cproton.i)
+swig_link_libraries(cproton qpidproton ${PYTHON_LIBRARIES})
 
 add_executable (proton src/proton.c)
 target_link_libraries (proton qpidproton)

Copied: qpid/proton/trunk/proton-c/cproton.i (from r1303285, qpid/proton/trunk/proton-c/proton.i)
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/cproton.i?p2=qpid/proton/trunk/proton-c/cproton.i&p1=qpid/proton/trunk/proton-c/proton.i&r1=1303285&r2=1303812&rev=1303812&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/proton.i (original)
+++ qpid/proton/trunk/proton-c/cproton.i Thu Mar 22 14:45:38 2012
@@ -1,8 +1,62 @@
-%module proton
+%module cproton
 %{
 /* Includes the header in the wrapper code */
 #include <proton/engine.h>
 %}
 
+typedef unsigned int size_t;
+typedef signed int ssize_t;
+
+%include <cwstring.i>
+%include <cstring.i>
+
+%cstring_output_withsize(char *OUTPUT, size_t *OUTPUT_SIZE)
+
+int wrap_pn_output(pn_transport_t *transport, char *OUTPUT, size_t *OUTPUT_SIZE);
+
+%rename(pn_output) wrap_pn_output;
+
+%inline %{
+  int wrap_pn_output(pn_transport_t *transport, char *OUTPUT, size_t *OUTPUT_SIZE) {
+    ssize_t sz = pn_output(transport, OUTPUT, *OUTPUT_SIZE);
+    if (sz >= 0) {
+      *OUTPUT_SIZE = sz;
+      return 0;
+    } else {
+      *OUTPUT_SIZE = 0;
+      return sz;
+    }
+  }
+%}
+
+%ignore pn_output;
+
+pn_delivery_t *wrap_pn_delivery(pn_link_t *link, char *STRING, size_t LENGTH);
+
+%rename(pn_delivery) wrap_pn_delivery;
+
+%inline %{
+  pn_delivery_t *wrap_pn_delivery(pn_link_t *link, char *STRING, size_t LENGTH) {
+    return pn_delivery(link, pn_dtag(STRING, LENGTH));
+  }
+%}
+
+%ignore pn_delivery;
+
+%cstring_output_allocate_size(char **ALLOC_OUTPUT, size_t *ALLOC_SIZE, free(*$1));
+
+%rename(pn_delivery_tag) wrap_pn_delivery_tag;
+
+%inline %{
+  void wrap_pn_delivery_tag(pn_delivery_t *delivery, char **ALLOC_OUTPUT, size_t *ALLOC_SIZE) {
+    pn_delivery_tag_t tag = pn_delivery_tag(delivery);
+    *ALLOC_OUTPUT = malloc(tag.size);
+    *ALLOC_SIZE = tag.size;
+    memcpy(*ALLOC_OUTPUT, tag.bytes, tag.size);
+  }
+%}
+
+%ignore pn_delivery_tag;
+
 /* Parse the header file to generate wrappers */
 %include "proton/engine.h"

Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1303812&r1=1303811&r2=1303812&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Thu Mar 22 14:45:38 2012
@@ -25,93 +25,119 @@
 #include <stdbool.h>
 #include <stddef.h>
 #include <sys/types.h>
-#include <proton/value.h>
 
 typedef struct pn_error_t pn_error_t;
-typedef struct pn_endpoint_t pn_endpoint_t;
 typedef struct pn_transport_t pn_transport_t;
 typedef struct pn_connection_t pn_connection_t;
 typedef struct pn_session_t pn_session_t;
 typedef struct pn_link_t pn_link_t;
-typedef struct pn_sender_t pn_sender_t;
-typedef struct pn_receiver_t pn_receiver_t;
 typedef struct pn_delivery_t pn_delivery_t;
 
-typedef enum pn_endpoint_state_t {UNINIT=1, ACTIVE=2, CLOSED=4} pn_endpoint_state_t;
-typedef enum pn_endpoint_type_t {CONNECTION=1, TRANSPORT=2, SESSION=3, SENDER=4, RECEIVER=5} pn_endpoint_type_t;
-typedef enum pn_disposition_t {PN_RECEIVED=1, PN_ACCEPTED=2, PN_REJECTED=3, PN_RELEASED=4, PN_MODIFIED=5} pn_disposition_t;
+typedef struct pn_delivery_tag_t {
+  size_t size;
+  const char *bytes;
+} pn_delivery_tag_t;
+
+#define pn_dtag(BYTES, SIZE) ((pn_delivery_tag_t) {(SIZE), (BYTES)})
+
+typedef enum pn_state_t {
+  PN_LOCAL_UNINIT=1,
+  PN_LOCAL_ACTIVE=2,
+  PN_LOCAL_CLOSED=4,
+  PN_REMOTE_UNINIT=8,
+  PN_REMOTE_ACTIVE=16,
+  PN_REMOTE_CLOSED=32
+} pn_state_t;
+
+typedef enum pn_disposition_t {
+  PN_RECEIVED=1,
+  PN_ACCEPTED=2,
+  PN_REJECTED=3,
+  PN_RELEASED=4,
+  PN_MODIFIED=5
+} pn_disposition_t;
 
 typedef enum pn_trace_t {PN_TRACE_OFF=0, PN_TRACE_RAW=1, PN_TRACE_FRM=2} pn_trace_t;
 
-/* Currently the way inheritence is done it is safe to "upcast" from
-   pn_{transport,connection,session,link,sender,or receiver}_t to
-   pn_endpoint_t and to "downcast" based on the endpoint type. I'm
-   not sure if this should be part of the ABI or not. */
-
-// endpoint
-pn_endpoint_type_t pn_endpoint_type(pn_endpoint_t *endpoint);
-pn_endpoint_state_t pn_local_state(pn_endpoint_t *endpoint);
-pn_endpoint_state_t pn_remote_state(pn_endpoint_t *endpoint);
-pn_error_t *pn_local_error(pn_endpoint_t *endpoint);
-pn_error_t *pn_remote_error(pn_endpoint_t *endpoint);
-void pn_destroy(pn_endpoint_t *endpoint);
-void pn_open(pn_endpoint_t *endpoint);
-void pn_close(pn_endpoint_t *endpoint);
-
 // connection
 pn_connection_t *pn_connection();
+
+pn_state_t pn_connection_state(pn_connection_t *connection);
+pn_error_t *pn_connection_error(pn_connection_t *connection);
 void pn_connection_set_container(pn_connection_t *connection, const wchar_t *container);
 void pn_connection_set_hostname(pn_connection_t *connection, const wchar_t *hostname);
+
 pn_delivery_t *pn_work_head(pn_connection_t *connection);
 pn_delivery_t *pn_work_next(pn_delivery_t *delivery);
 
 pn_session_t *pn_session(pn_connection_t *connection);
 pn_transport_t *pn_transport(pn_connection_t *connection);
 
-pn_endpoint_t *pn_endpoint_head(pn_connection_t *connection,
-                                pn_endpoint_state_t local,
-                                pn_endpoint_state_t remote);
-pn_endpoint_t *pn_endpoint_next(pn_endpoint_t *endpoint,
-                                pn_endpoint_state_t local,
-                                pn_endpoint_state_t remote);
+pn_session_t *pn_session_head(pn_connection_t *connection, pn_state_t state);
+pn_session_t *pn_session_next(pn_session_t *session, pn_state_t state);
+
+pn_link_t *pn_link_head(pn_connection_t *connection, pn_state_t state);
+pn_link_t *pn_link_next(pn_link_t *link, pn_state_t state);
+
+void pn_connection_open(pn_connection_t *connection);
+void pn_connection_close(pn_connection_t *connection);
+void pn_connection_destroy(pn_connection_t *connection);
 
 // transport
 #define PN_EOS (-1)
 #define PN_ERR (-2)
+pn_state_t pn_transport_state(pn_transport_t *transport);
+pn_error_t *pn_transport_error(pn_transport_t *transport);
 ssize_t pn_input(pn_transport_t *transport, char *bytes, size_t available);
 ssize_t pn_output(pn_transport_t *transport, char *bytes, size_t size);
 time_t pn_tick(pn_transport_t *transport, time_t now);
 void pn_trace(pn_transport_t *transport, pn_trace_t trace);
+void pn_transport_open(pn_transport_t *transport);
+void pn_transport_close(pn_transport_t *transport);
+void pn_transport_destroy(pn_transport_t *transport);
 
 // session
-pn_sender_t *pn_sender(pn_session_t *session, const wchar_t *name);
-pn_receiver_t *pn_receiver(pn_session_t *session, const wchar_t *name);
+pn_state_t pn_session_state(pn_session_t *session);
+pn_error_t *pn_session_error(pn_session_t *session);
+pn_link_t *pn_sender(pn_session_t *session, const wchar_t *name);
+pn_link_t *pn_receiver(pn_session_t *session, const wchar_t *name);
+void pn_session_open(pn_session_t *session);
+void pn_session_close(pn_session_t *session);
+void pn_session_destroy(pn_session_t *session);
 
 // link
+bool pn_is_sender(pn_link_t *link);
+bool pn_is_receiver(pn_link_t *link);
+pn_state_t pn_link_state(pn_link_t *link);
+pn_error_t *pn_link_error(pn_link_t *link);
 pn_session_t *pn_get_session(pn_link_t *link);
 void pn_set_source(pn_link_t *link, const wchar_t *source);
 void pn_set_target(pn_link_t *link, const wchar_t *target);
 wchar_t *pn_remote_source(pn_link_t *link);
 wchar_t *pn_remote_target(pn_link_t *link);
-pn_delivery_t *pn_delivery(pn_link_t *link, pn_binary_t *tag);
+pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag);
 pn_delivery_t *pn_current(pn_link_t *link);
 bool pn_advance(pn_link_t *link);
 
 pn_delivery_t *pn_unsettled_head(pn_link_t *link);
 pn_delivery_t *pn_unsettled_next(pn_delivery_t *delivery);
 
+void pn_link_open(pn_link_t *sender);
+void pn_link_close(pn_link_t *sender);
+void pn_link_destroy(pn_link_t *sender);
+
 // sender
 //void pn_offer(pn_sender_t *sender, int credits);
-ssize_t pn_send(pn_sender_t *sender, const char *bytes, size_t n);
+ssize_t pn_send(pn_link_t *sender, const char *bytes, size_t n);
 //void pn_abort(pn_sender_t *sender);
 
 // receiver
 #define PN_EOM (-1)
-void pn_flow(pn_receiver_t *receiver, int credits);
-ssize_t pn_recv(pn_receiver_t *receiver, char *bytes, size_t n);
+void pn_flow(pn_link_t *receiver, int credits);
+ssize_t pn_recv(pn_link_t *receiver, char *bytes, size_t n);
 
 // delivery
-pn_binary_t *pn_delivery_tag(pn_delivery_t *delivery);
+pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery);
 pn_link_t *pn_link(pn_delivery_t *delivery);
 // how do we do delivery state?
 int pn_local_disp(pn_delivery_t *delivery);

Modified: qpid/proton/trunk/proton-c/include/proton/sasl.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/sasl.h?rev=1303812&r1=1303811&r2=1303812&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/sasl.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/sasl.h Thu Mar 22 14:45:38 2012
@@ -24,6 +24,7 @@
 
 #include <sys/types.h>
 #include <stdbool.h>
+#include <proton/value.h>
 
 typedef struct pn_sasl_t pn_sasl_t;
 

Modified: qpid/proton/trunk/proton-c/include/proton/value.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/value.h?rev=1303812&r1=1303811&r2=1303812&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/value.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/value.h Thu Mar 22 14:45:38 2012
@@ -189,7 +189,7 @@ wchar_t *pn_string_wcs(pn_string_t *str)
 
 /* binary */
 
-pn_binary_t *pn_binary(char *bytes, size_t size);
+pn_binary_t *pn_binary(const char *bytes, size_t size);
 size_t pn_binary_size(pn_binary_t *b);
 const char *pn_binary_bytes(pn_binary_t *b);
 pn_binary_t *pn_binary_dup(pn_binary_t *b);

Modified: qpid/proton/trunk/proton-c/src/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1303812&r1=1303811&r2=1303812&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/driver.c Thu Mar 22 14:45:38 2012
@@ -153,9 +153,11 @@ void *pn_selectable_context(pn_selectabl
 
 void pn_selectable_destroy(pn_selectable_t *sel)
 {
+  if (!sel) return;
+
   if (sel->driver) pn_driver_remove(sel->driver, sel);
-  if (sel->connection) pn_destroy((pn_endpoint_t *) sel->connection);
-  if (sel->sasl) pn_sasl_destroy(sel->sasl);
+  pn_connection_destroy(sel->connection);
+  pn_sasl_destroy(sel->sasl);
   free(sel);
 }
 
@@ -322,7 +324,7 @@ static ssize_t pn_selectable_write_amqp_
   fprintf(stderr, "    -> AMQP 1.0\n");
   memmove(pn_selectable_output(sel), "AMQP\x00\x01\x00\x00", 8);
   sel->process_output = pn_selectable_write_amqp;
-  pn_open((pn_endpoint_t *) sel->transport);
+  pn_transport_open(sel->transport);
   return 8;
 }
 

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1303812&r1=1303811&r2=1303812&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Thu Mar 22 14:45:38 2012
@@ -35,10 +35,14 @@ struct pn_error_t {
   pn_map_t *info;
 };
 
+typedef enum pn_endpoint_type_t {CONNECTION, SESSION, SENDER, RECEIVER, TRANSPORT} pn_endpoint_type_t;
+
+typedef struct pn_endpoint_t pn_endpoint_t;
+
 struct pn_endpoint_t {
   pn_endpoint_type_t type;
-  pn_endpoint_state_t local_state, remote_state;
-  pn_error_t local_error, remote_error;
+  pn_state_t state;
+  pn_error_t error;
   pn_endpoint_t *endpoint_next;
   pn_endpoint_t *endpoint_prev;
   pn_endpoint_t *transport_next;
@@ -140,17 +144,10 @@ struct pn_link_t {
   pn_delivery_t *current;
   pn_delivery_t *settled_head;
   pn_delivery_t *settled_tail;
+  // XXX
   pn_sequence_t credit;
-  size_t id;
-};
-
-struct pn_sender_t {
-  pn_link_t link;
-};
-
-struct pn_receiver_t {
-  pn_link_t link;
   pn_sequence_t credits;
+  size_t id;
 };
 
 struct pn_delivery_t {
@@ -175,11 +172,11 @@ struct pn_delivery_t {
   void *context;
 };
 
-void pn_destroy_connection(pn_connection_t *connection);
-void pn_destroy_transport(pn_transport_t *transport);
-void pn_destroy_session(pn_session_t *session);
-void pn_destroy_sender(pn_sender_t *sender);
-void pn_destroy_receiver(pn_receiver_t *receiver);
+#define PN_SET_LOCAL(OLD, NEW)                                          \
+  (OLD) = ((OLD) & (PN_REMOTE_UNINIT | PN_REMOTE_ACTIVE | PN_REMOTE_CLOSED)) | (NEW)
+
+#define PN_SET_REMOTE(OLD, NEW)                                         \
+  (OLD) = ((OLD) & (PN_LOCAL_UNINIT | PN_LOCAL_ACTIVE | PN_LOCAL_CLOSED)) | (NEW)
 
 void pn_link_dump(pn_link_t *link);
 

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1303812&r1=1303811&r2=1303812&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Thu Mar 22 14:45:38 2012
@@ -135,35 +135,37 @@ void pn_delivery_buffer_gc(pn_delivery_b
 
 // endpoints
 
-pn_endpoint_type_t pn_endpoint_type(pn_endpoint_t *endpoint)
+pn_connection_t *pn_get_connection(pn_endpoint_t *endpoint)
 {
-  return endpoint->type;
-}
+  switch (endpoint->type) {
+  case CONNECTION:
+    return (pn_connection_t *) endpoint;
+  case SESSION:
+    return ((pn_session_t *) endpoint)->connection;
+  case SENDER:
+  case RECEIVER:
+    return ((pn_link_t *) endpoint)->session->connection;
+  case TRANSPORT:
+    return ((pn_transport_t *) endpoint)->connection;
+  }
 
-pn_endpoint_state_t pn_local_state(pn_endpoint_t *endpoint)
-{
-  return endpoint->local_state;
+  return NULL;
 }
 
-pn_endpoint_state_t pn_remote_state(pn_endpoint_t *endpoint)
-{
-  return endpoint->remote_state;
-}
+void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint);
 
-pn_error_t *pn_local_error(pn_endpoint_t *endpoint)
+void pn_open(pn_endpoint_t *endpoint)
 {
-  if (endpoint->local_error.condition)
-    return &endpoint->local_error;
-  else
-    return NULL;
+  // TODO: do we care about the current state?
+  PN_SET_LOCAL(endpoint->state, PN_LOCAL_ACTIVE);
+  pn_modified(pn_get_connection(endpoint), endpoint);
 }
 
-pn_error_t *pn_remote_error(pn_endpoint_t *endpoint)
+void pn_close(pn_endpoint_t *endpoint)
 {
-  if (endpoint->remote_error.condition)
-    return &endpoint->remote_error;
-  else
-    return NULL;
+  // TODO: do we care about the current state?
+  PN_SET_LOCAL(endpoint->state, PN_LOCAL_CLOSED);
+  pn_modified(pn_get_connection(endpoint), endpoint);
 }
 
 void pn_destroy(pn_endpoint_t *endpoint)
@@ -171,36 +173,60 @@ void pn_destroy(pn_endpoint_t *endpoint)
   switch (endpoint->type)
   {
   case CONNECTION:
-    pn_destroy_connection((pn_connection_t *)endpoint);
+    pn_connection_destroy((pn_connection_t *)endpoint);
     break;
   case TRANSPORT:
-    pn_destroy_transport((pn_transport_t *)endpoint);
+    pn_transport_destroy((pn_transport_t *)endpoint);
     break;
   case SESSION:
-    pn_destroy_session((pn_session_t *)endpoint);
+    pn_session_destroy((pn_session_t *)endpoint);
     break;
   case SENDER:
-    pn_destroy_sender((pn_sender_t *)endpoint);
+    pn_link_destroy((pn_link_t *)endpoint);
     break;
   case RECEIVER:
-    pn_destroy_receiver((pn_receiver_t *)endpoint);
+    pn_link_destroy((pn_link_t *)endpoint);
     break;
   }
 }
 
-void pn_destroy_connection(pn_connection_t *connection)
+void pn_connection_open(pn_connection_t *connection)
 {
-  pn_destroy_transport(connection->transport);
+  pn_open((pn_endpoint_t *) connection);
+}
+
+void pn_connection_close(pn_connection_t *connection)
+{
+  pn_close((pn_endpoint_t *) connection);
+}
+
+void pn_connection_destroy(pn_connection_t *connection)
+{
+  if (!connection) return;
+
+  pn_transport_destroy(connection->transport);
   while (connection->session_count)
-    pn_destroy_session(connection->sessions[connection->session_count - 1]);
+    pn_session_destroy(connection->sessions[connection->session_count - 1]);
   free(connection->sessions);
   free(connection->container);
   free(connection->hostname);
   free(connection);
 }
 
-void pn_destroy_transport(pn_transport_t *transport)
+void pn_transport_open(pn_transport_t *transport)
 {
+  pn_open((pn_endpoint_t *) transport);
+}
+
+void pn_transport_close(pn_transport_t *transport)
+{
+  pn_close((pn_endpoint_t *) transport);
+}
+
+void pn_transport_destroy(pn_transport_t *transport)
+{
+  if (!transport) return;
+
   pn_dispatcher_destroy(transport->disp);
   for (int i = 0; i < transport->session_capacity; i++) {
     pn_delivery_buffer_destroy(&transport->sessions[i].incoming);
@@ -235,10 +261,22 @@ void pn_remove_session(pn_connection_t *
   ssn->connection = NULL;
 }
 
-void pn_destroy_session(pn_session_t *session)
+void pn_session_open(pn_session_t *session)
+{
+  pn_open((pn_endpoint_t *) session);
+}
+
+void pn_session_close(pn_session_t *session)
 {
+  pn_close((pn_endpoint_t *) session);
+}
+
+void pn_session_destroy(pn_session_t *session)
+{
+  if (!session) return;
+
   while (session->link_count)
-    pn_destroy(&session->links[session->link_count - 1]->endpoint);
+    pn_link_destroy(session->links[session->link_count - 1]);
   pn_remove_session(session->connection, session);
   free(session->links);
   free(session);
@@ -318,36 +356,36 @@ void pn_link_dump(pn_link_t *link)
   printf("\n");
 }
 
-void pn_link_uninit(pn_link_t *link)
+void pn_link_open(pn_link_t *link)
 {
-  if (link->local_source) free(link->local_source);
-  if (link->local_target) free(link->local_target);
-  if (link->remote_source) free(link->remote_source);
-  if (link->remote_target) free(link->remote_target);
-  pn_remove_link(link->session, link);
-  pn_free_deliveries(link->settled_head);
-  pn_free_deliveries(link->head);
-  free(link->name);
+  pn_open((pn_endpoint_t *) link);
 }
 
-void pn_destroy_sender(pn_sender_t *sender)
+void pn_link_close(pn_link_t *link)
 {
-  pn_link_uninit(&sender->link);
-  free(sender);
+  pn_close((pn_endpoint_t *) link);
 }
-void pn_destroy_receiver(pn_receiver_t *receiver)
+
+void pn_link_destroy(pn_link_t *link)
 {
-  pn_link_uninit(&receiver->link);
-  free(receiver);
+  if (!link) return;
+
+  free(link->local_source);
+  free(link->local_target);
+  free(link->remote_source);
+  free(link->remote_target);
+  pn_remove_link(link->session, link);
+  pn_free_deliveries(link->settled_head);
+  pn_free_deliveries(link->head);
+  free(link->name);
+  free(link);
 }
 
 void pn_endpoint_init(pn_endpoint_t *endpoint, int type, pn_connection_t *conn)
 {
   endpoint->type = type;
-  endpoint->local_state = UNINIT;
-  endpoint->remote_state = UNINIT;
-  endpoint->local_error = (pn_error_t) {.condition = NULL};
-  endpoint->remote_error = (pn_error_t) {.condition = NULL};
+  endpoint->state = PN_LOCAL_UNINIT | PN_REMOTE_UNINIT;
+  endpoint->error = (pn_error_t) {.condition = NULL};
   endpoint->endpoint_next = NULL;
   endpoint->endpoint_prev = NULL;
   endpoint->transport_next = NULL;
@@ -357,39 +395,6 @@ void pn_endpoint_init(pn_endpoint_t *end
   LL_ADD_PFX(conn->endpoint_head, conn->endpoint_tail, endpoint, endpoint_);
 }
 
-pn_connection_t *pn_get_connection(pn_endpoint_t *endpoint)
-{
-  switch (endpoint->type) {
-  case CONNECTION:
-    return (pn_connection_t *) endpoint;
-  case SESSION:
-    return ((pn_session_t *) endpoint)->connection;
-  case SENDER:
-  case RECEIVER:
-    return ((pn_link_t *) endpoint)->session->connection;
-  case TRANSPORT:
-    return ((pn_transport_t *) endpoint)->connection;
-  }
-
-  return NULL;
-}
-
-void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint);
-
-void pn_open(pn_endpoint_t *endpoint)
-{
-  // TODO: do we care about the current state?
-  endpoint->local_state = ACTIVE;
-  pn_modified(pn_get_connection(endpoint), endpoint);
-}
-
-void pn_close(pn_endpoint_t *endpoint)
-{
-  // TODO: do we care about the current state?
-  endpoint->local_state = CLOSED;
-  pn_modified(pn_get_connection(endpoint), endpoint);
-}
-
 pn_connection_t *pn_connection()
 {
   pn_connection_t *conn = malloc(sizeof(pn_connection_t));
@@ -414,6 +419,16 @@ pn_connection_t *pn_connection()
   return conn;
 }
 
+pn_state_t pn_connection_state(pn_connection_t *connection)
+{
+  return connection->endpoint.state;
+}
+
+pn_error_t *pn_connection_error(pn_connection_t *connection)
+{
+  return &connection->endpoint.error;
+}
+
 void pn_connection_set_container(pn_connection_t *connection, const wchar_t *container)
 {
   if (connection->container) free(connection->container);
@@ -530,36 +545,58 @@ void pn_clear_modified(pn_connection_t *
   }
 }
 
-bool pn_matches(pn_endpoint_t *endpoint, pn_endpoint_state_t local,
-                pn_endpoint_state_t remote)
+bool pn_matches(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state)
 {
-  return (endpoint->local_state & local) && (endpoint->remote_state & remote);
+  return (endpoint->type == type) && (endpoint->state == state);
 }
 
-pn_endpoint_t *pn_find(pn_endpoint_t *endpoint, pn_endpoint_state_t local,
-                       pn_endpoint_state_t remote)
+pn_endpoint_t *pn_find(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state)
 {
   while (endpoint)
   {
-    if (pn_matches(endpoint, local, remote))
+    if (pn_matches(endpoint, type, state))
       return endpoint;
     endpoint = endpoint->endpoint_next;
   }
   return NULL;
 }
 
-pn_endpoint_t *pn_endpoint_head(pn_connection_t *conn,
-                                pn_endpoint_state_t local,
-                                pn_endpoint_state_t remote)
+pn_session_t *pn_session_head(pn_connection_t *conn, pn_state_t state)
 {
-  return pn_find(conn->endpoint_head, local, remote);
+  return (pn_session_t *) pn_find(conn->endpoint_head, SESSION, state);
 }
 
-pn_endpoint_t *pn_endpoint_next(pn_endpoint_t *endpoint,
-                                pn_endpoint_state_t local,
-                                pn_endpoint_state_t remote)
+pn_session_t *pn_session_next(pn_session_t *ssn, pn_state_t state)
 {
-  return pn_find(endpoint->endpoint_next, local, remote);
+  return (pn_session_t *) pn_find(ssn->endpoint.endpoint_next, SESSION, state);
+}
+
+pn_link_t *pn_link_head(pn_connection_t *conn, pn_state_t state)
+{
+  pn_endpoint_t *endpoint = conn->endpoint_head;
+
+  while (endpoint)
+  {
+    if (pn_matches(endpoint, SENDER, state) || pn_matches(endpoint, RECEIVER, state))
+      return (pn_link_t *) endpoint;
+    endpoint = endpoint->endpoint_next;
+  }
+
+  return NULL;
+}
+
+pn_link_t *pn_link_next(pn_link_t *ssn, pn_state_t state)
+{
+  pn_endpoint_t *endpoint = ssn->endpoint.endpoint_next;
+
+  while (endpoint)
+  {
+    if (pn_matches(endpoint, SENDER, state) || pn_matches(endpoint, RECEIVER, state))
+      return (pn_link_t *) endpoint;
+    endpoint = endpoint->endpoint_next;
+  }
+
+  return NULL;
 }
 
 pn_session_t *pn_session(pn_connection_t *conn)
@@ -576,6 +613,16 @@ pn_session_t *pn_session(pn_connection_t
   return ssn;
 }
 
+pn_state_t pn_session_state(pn_session_t *session)
+{
+  return session->endpoint.state;
+}
+
+pn_error_t *pn_session_error(pn_session_t *session)
+{
+  return &session->endpoint.error;
+}
+
 void pn_do_open(pn_dispatcher_t *disp);
 void pn_do_begin(pn_dispatcher_t *disp);
 void pn_do_attach(pn_dispatcher_t *disp);
@@ -612,7 +659,7 @@ void pn_transport_init(pn_transport_t *t
   transport->channel_capacity = 0;
 }
 
-pn_session_state_t *pn_session_state(pn_transport_t *transport, pn_session_t *ssn)
+pn_session_state_t *pn_session_get_state(pn_transport_t *transport, pn_session_t *ssn)
 {
   int old_capacity = transport->session_capacity;
   PN_ENSURE(transport->sessions, transport->session_capacity, ssn->id + 1);
@@ -658,6 +705,16 @@ pn_transport_t *pn_transport(pn_connecti
   }
 }
 
+pn_state_t pn_transport_state(pn_transport_t *transport)
+{
+  return transport->endpoint.state;
+}
+
+pn_error_t *pn_transport_error(pn_transport_t *transport)
+{
+  return &transport->endpoint.error;
+}
+
 void pn_link_init(pn_link_t *link, int type, pn_session_t *session, const wchar_t *name)
 {
   pn_endpoint_init(&link->endpoint, type, session->connection);
@@ -670,6 +727,7 @@ void pn_link_init(pn_link_t *link, int t
   link->settled_head = link->settled_tail = NULL;
   link->head = link->tail = link->current = NULL;
   link->credit = 0;
+  link->credits = 0;
 }
 
 void pn_set_source(pn_link_t *link, const wchar_t *source)
@@ -694,7 +752,7 @@ wchar_t *pn_remote_target(pn_link_t *lin
   return link->remote_target;
 }
 
-pn_link_state_t *pn_link_state(pn_session_state_t *ssn_state, pn_link_t *link)
+pn_link_state_t *pn_link_get_state(pn_session_state_t *ssn_state, pn_link_t *link)
 {
   int old_capacity = ssn_state->link_capacity;
   PN_ENSURE(ssn_state->links, ssn_state->link_capacity, link->id + 1);
@@ -721,36 +779,55 @@ pn_link_state_t *pn_handle_state(pn_sess
   return ssn_state->handles[handle];
 }
 
-pn_sender_t *pn_sender(pn_session_t *session, const wchar_t *name)
+pn_link_t *pn_sender(pn_session_t *session, const wchar_t *name)
 {
-  pn_sender_t *snd = malloc(sizeof(pn_sender_t));
+  pn_link_t *snd = malloc(sizeof(pn_link_t));
   if (!snd) return NULL;
-  pn_link_init(&snd->link, SENDER, session, name);
+  pn_link_init(snd, SENDER, session, name);
   return snd;
 }
 
-pn_receiver_t *pn_receiver(pn_session_t *session, const wchar_t *name)
+pn_link_t *pn_receiver(pn_session_t *session, const wchar_t *name)
 {
-  pn_receiver_t *rcv = malloc(sizeof(pn_receiver_t));
+  pn_link_t *rcv = malloc(sizeof(pn_link_t));
   if (!rcv) return NULL;
-  pn_link_init(&rcv->link, RECEIVER, session, name);
-  rcv->credits = 0;
+  pn_link_init(rcv, RECEIVER, session, name);
   return rcv;
 }
 
+pn_state_t pn_link_state(pn_link_t *link)
+{
+  return link->endpoint.state;
+}
+
+pn_error_t *pn_link_error(pn_link_t *link)
+{
+  return &link->endpoint.error;
+}
+
+bool pn_is_sender(pn_link_t *link)
+{
+  return link->endpoint.type == SENDER;
+}
+
+bool pn_is_receiver(pn_link_t *link)
+{
+  return link->endpoint.type == RECEIVER;
+}
+
 pn_session_t *pn_get_session(pn_link_t *link)
 {
   return link->session;
 }
 
-pn_delivery_t *pn_delivery(pn_link_t *link, pn_binary_t *tag)
+pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
 {
   pn_delivery_t *delivery = link->settled_head;
   LL_POP_PFX(link->settled_head, link->settled_tail, link_);
   if (!delivery) delivery = malloc(sizeof(pn_delivery_t));
   if (!delivery) return NULL;
   delivery->link = link;
-  delivery->tag = pn_binary_dup(tag);
+  delivery->tag = pn_binary(tag.bytes, tag.size);
   delivery->local_state = 0;
   delivery->remote_state = 0;
   delivery->local_settled = false;
@@ -804,9 +881,9 @@ void pn_delivery_dump(pn_delivery_t *d)
          pn_readable(d), d->work);
 }
 
-pn_binary_t *pn_delivery_tag(pn_delivery_t *delivery)
+pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery)
 {
-  return delivery->tag;
+  return pn_dtag(pn_binary_bytes(delivery->tag), pn_binary_size(delivery->tag));
 }
 
 pn_delivery_t *pn_current(pn_link_t *link)
@@ -814,9 +891,8 @@ pn_delivery_t *pn_current(pn_link_t *lin
   return link->current;
 }
 
-void pn_advance_sender(pn_sender_t *sender)
+void pn_advance_sender(pn_link_t *link)
 {
-  pn_link_t *link = &sender->link;
   if (link->credit > 0) {
     link->credit--;
     pn_add_tpwork(link->current);
@@ -824,9 +900,8 @@ void pn_advance_sender(pn_sender_t *send
   }
 }
 
-void pn_advance_receiver(pn_receiver_t *receiver)
+void pn_advance_receiver(pn_link_t *link)
 {
-  pn_link_t *link = &receiver->link;
   link->current = link->current->link_next;
 }
 
@@ -835,9 +910,9 @@ bool pn_advance(pn_link_t *link)
   if (link->current) {
     pn_delivery_t *prev = link->current;
     if (link->endpoint.type == SENDER) {
-      pn_advance_sender((pn_sender_t *)link);
+      pn_advance_sender(link);
     } else {
-      pn_advance_receiver((pn_receiver_t *)link);
+      pn_advance_receiver(link);
     }
     pn_delivery_t *next = link->current;
     pn_work_update(link->session->connection, prev);
@@ -876,13 +951,13 @@ void pn_settle(pn_delivery_t *delivery)
 void pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...)
 {
   va_list ap;
-  transport->endpoint.local_error.condition = condition;
+  transport->endpoint.error.condition = condition;
   va_start(ap, fmt);
   // XXX: result
-  vsnprintf(transport->endpoint.local_error.description, DESCRIPTION, fmt, ap);
+  vsnprintf(transport->endpoint.error.description, DESCRIPTION, fmt, ap);
   va_end(ap);
-  transport->endpoint.local_state = CLOSED;
-  fprintf(stderr, "ERROR %s %s\n", condition, transport->endpoint.local_error.description);
+  PN_SET_LOCAL(transport->endpoint.state, PN_LOCAL_CLOSED);
+  fprintf(stderr, "ERROR %s %s\n", condition, transport->endpoint.error.description);
   // XXX: need to write close frame if appropriate
 }
 
@@ -890,8 +965,7 @@ void pn_do_open(pn_dispatcher_t *disp)
 {
   pn_transport_t *transport = disp->context;
   pn_connection_t *conn = transport->connection;
-  // TODO: store the state
-  conn->endpoint.remote_state = ACTIVE;
+  PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE);
 }
 
 void pn_do_begin(pn_dispatcher_t *disp)
@@ -904,10 +978,10 @@ void pn_do_begin(pn_dispatcher_t *disp)
     state = &transport->sessions[pn_to_uint16(remote_channel)];
   } else {
     pn_session_t *ssn = pn_session(transport->connection);
-    state = pn_session_state(transport, ssn);
+    state = pn_session_get_state(transport, ssn);
   }
   pn_map_channel(transport, disp->channel, state);
-  state->session->endpoint.remote_state = ACTIVE;
+  PN_SET_REMOTE(state->session->endpoint.state, PN_REMOTE_ACTIVE);
 }
 
 pn_link_state_t *pn_find_link(pn_session_state_t *ssn_state, pn_string_t *name)
@@ -917,7 +991,7 @@ pn_link_state_t *pn_find_link(pn_session
     pn_link_t *link = ssn_state->session->links[i];
     if (!wcsncmp(pn_string_wcs(name), link->name, pn_string_size(name)))
     {
-      return pn_link_state(ssn_state, link);
+      return pn_link_get_state(ssn_state, link);
     }
   }
   return NULL;
@@ -939,11 +1013,11 @@ void pn_do_attach(pn_dispatcher_t *disp)
     } else {
       link = (pn_link_t *) pn_receiver(ssn_state->session, pn_string_wcs(name));
     }
-    link_state = pn_link_state(ssn_state, link);
+    link_state = pn_link_get_state(ssn_state, link);
   }
 
   pn_map_handle(ssn_state, handle, link_state);
-  link_state->link->endpoint.remote_state = ACTIVE;
+  PN_SET_REMOTE(link_state->link->endpoint.state, PN_REMOTE_ACTIVE);
   pn_value_t remote_source = pn_list_get(args, ATTACH_SOURCE);
   if (remote_source.type == TAG)
     remote_source = pn_tag_value(pn_to_tag(remote_source));
@@ -972,7 +1046,7 @@ void pn_do_transfer(pn_dispatcher_t *dis
   pn_link_state_t *link_state = pn_handle_state(ssn_state, handle);
   pn_link_t *link = link_state->link;
   pn_binary_t *tag = pn_to_binary(pn_list_get(args, TRANSFER_DELIVERY_TAG));
-  pn_delivery_t *delivery = pn_delivery(link, tag);
+  pn_delivery_t *delivery = pn_delivery(link, pn_dtag(pn_binary_bytes(tag), pn_binary_size(tag)));
   pn_delivery_state_t *state = pn_delivery_buffer_push(&ssn_state->incoming, delivery);
   delivery->context = state;
   // XXX: need to check that state is not null (i.e. we haven't hit the limit)
@@ -1080,7 +1154,7 @@ void pn_do_detach(pn_dispatcher_t *disp)
 
   if (closed)
   {
-    link->endpoint.remote_state = CLOSED;
+    PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_CLOSED);
   } else {
     // TODO: implement
   }
@@ -1093,27 +1167,27 @@ void pn_do_end(pn_dispatcher_t *disp)
   pn_session_t *session = ssn_state->session;
 
   ssn_state->remote_channel = -1;
-  session->endpoint.remote_state = CLOSED;
+  PN_SET_REMOTE(session->endpoint.state, PN_REMOTE_CLOSED);
 }
 
 void pn_do_close(pn_dispatcher_t *disp)
 {
   pn_transport_t *transport = disp->context;
-  transport->connection->endpoint.remote_state = CLOSED;
-  transport->endpoint.remote_state = CLOSED;
+  PN_SET_REMOTE(transport->connection->endpoint.state, PN_REMOTE_CLOSED);
+  PN_SET_REMOTE(transport->endpoint.state, PN_REMOTE_CLOSED);
 }
 
 ssize_t pn_input(pn_transport_t *transport, char *bytes, size_t available)
 {
-  if (transport->endpoint.local_state == UNINIT) {
+  if (transport->endpoint.state & PN_LOCAL_UNINIT) {
     return 0;
   }
 
-  if (transport->endpoint.local_state == CLOSED) {
+  if (transport->endpoint.state & PN_LOCAL_CLOSED) {
     return PN_EOS;
   }
 
-  if (transport->endpoint.remote_state == CLOSED) {
+  if (transport->endpoint.state & PN_REMOTE_CLOSED) {
     pn_do_error(transport, "amqp:connection:framing-error", "data after close");
     return PN_ERR;
   }
@@ -1125,7 +1199,7 @@ void pn_process_conn_setup(pn_transport_
 {
   if (endpoint->type == CONNECTION)
   {
-    if (endpoint->local_state != UNINIT && !transport->open_sent)
+    if (!(endpoint->state & PN_LOCAL_UNINIT) && !transport->open_sent)
     {
       pn_connection_t *connection = (pn_connection_t *) endpoint;
       pn_init_frame(transport->disp);
@@ -1144,8 +1218,8 @@ void pn_process_ssn_setup(pn_transport_t
   if (endpoint->type == SESSION)
   {
     pn_session_t *ssn = (pn_session_t *) endpoint;
-    pn_session_state_t *state = pn_session_state(transport, ssn);
-    if (endpoint->local_state != UNINIT && state->local_channel == (uint16_t) -1)
+    pn_session_state_t *state = pn_session_get_state(transport, ssn);
+    if (!(endpoint->state & PN_LOCAL_UNINIT) && state->local_channel == (uint16_t) -1)
     {
       pn_init_frame(transport->disp);
       if ((int16_t) state->remote_channel >= 0)
@@ -1167,9 +1241,9 @@ void pn_process_link_setup(pn_transport_
   if (endpoint->type == SENDER || endpoint->type == RECEIVER)
   {
     pn_link_t *link = (pn_link_t *) endpoint;
-    pn_session_state_t *ssn_state = pn_session_state(transport, link->session);
-    pn_link_state_t *state = pn_link_state(ssn_state, link);
-    if (endpoint->local_state != UNINIT && state->local_handle == (uint32_t) -1)
+    pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
+    pn_link_state_t *state = pn_link_get_state(ssn_state, link);
+    if (!(endpoint->state & PN_LOCAL_UNINIT) && state->local_handle == (uint32_t) -1)
     {
       pn_init_frame(transport->disp);
       pn_field(transport->disp, ATTACH_ROLE, pn_boolean(endpoint->type == RECEIVER));
@@ -1192,12 +1266,12 @@ void pn_process_link_setup(pn_transport_
 
 void pn_process_flow_receiver(pn_transport_t *transport, pn_endpoint_t *endpoint)
 {
-  if (endpoint->type == RECEIVER && endpoint->local_state == ACTIVE)
+  if (endpoint->type == RECEIVER && endpoint->state & PN_LOCAL_ACTIVE)
   {
-    pn_receiver_t *rcv = (pn_receiver_t *) endpoint;
+    pn_link_t *rcv = (pn_link_t *) endpoint;
     if (rcv->credits) {
-      pn_session_state_t *ssn_state = pn_session_state(transport, rcv->link.session);
-      pn_link_state_t *state = pn_link_state(ssn_state, &rcv->link);
+      pn_session_state_t *ssn_state = pn_session_get_state(transport, rcv->session);
+      pn_link_state_t *state = pn_link_get_state(ssn_state, rcv);
       state->link_credit += rcv->credits;
       rcv->credits = 0;
 
@@ -1217,7 +1291,7 @@ void pn_process_flow_receiver(pn_transpo
 void pn_post_disp(pn_transport_t *transport, pn_delivery_t *delivery)
 {
   pn_link_t *link = delivery->link;
-  pn_session_state_t *ssn_state = pn_session_state(transport, link->session);
+  pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
   // XXX: check for null state
   pn_delivery_state_t *state = delivery->context;
   pn_init_frame(transport->disp);
@@ -1255,7 +1329,7 @@ void pn_process_disp_receiver(pn_transpo
       pn_link_t *link = delivery->link;
       if (link->endpoint.type == RECEIVER) {
         // XXX: need to prevent duplicate disposition sending
-        pn_session_state_t *ssn_state = pn_session_state(transport, link->session);
+        pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
         if ((int16_t) ssn_state->local_channel >= 0) {
           pn_post_disp(transport, delivery);
         }
@@ -1279,8 +1353,8 @@ void pn_process_msg_data(pn_transport_t 
     {
       pn_link_t *link = delivery->link;
       if (link->endpoint.type == SENDER) {
-        pn_session_state_t *ssn_state = pn_session_state(transport, link->session);
-        pn_link_state_t *link_state = pn_link_state(ssn_state, link);
+        pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
+        pn_link_state_t *link_state = pn_link_get_state(ssn_state, link);
         pn_delivery_state_t *state = delivery->context;
         if (!state) {
           state = pn_delivery_buffer_push(&ssn_state->outgoing, delivery);
@@ -1316,7 +1390,7 @@ void pn_process_disp_sender(pn_transport
       pn_link_t *link = delivery->link;
       if (link->endpoint.type == SENDER) {
         // XXX: need to prevent duplicate disposition sending
-        pn_session_state_t *ssn_state = pn_session_state(transport, link->session);
+        pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
         /*if ((int16_t) ssn_state->local_channel >= 0) {
           pn_post_disp(transport, delivery);
           }*/
@@ -1341,9 +1415,9 @@ void pn_process_link_teardown(pn_transpo
   {
     pn_link_t *link = (pn_link_t *) endpoint;
     pn_session_t *session = link->session;
-    pn_session_state_t *ssn_state = pn_session_state(transport, session);
-    pn_link_state_t *state = pn_link_state(ssn_state, link);
-    if (endpoint->local_state == CLOSED && (int32_t) state->local_handle >= 0) {
+    pn_session_state_t *ssn_state = pn_session_get_state(transport, session);
+    pn_link_state_t *state = pn_link_get_state(ssn_state, link);
+    if (endpoint->state & PN_LOCAL_CLOSED && (int32_t) state->local_handle >= 0) {
       pn_init_frame(transport->disp);
       pn_field(transport->disp, DETACH_HANDLE, pn_value("I", state->local_handle));
       pn_field(transport->disp, DETACH_CLOSED, pn_boolean(true));
@@ -1362,8 +1436,8 @@ void pn_process_ssn_teardown(pn_transpor
   if (endpoint->type == SESSION)
   {
     pn_session_t *session = (pn_session_t *) endpoint;
-    pn_session_state_t *state = pn_session_state(transport, session);
-    if (endpoint->local_state == CLOSED && (int16_t) state->local_channel >= 0)
+    pn_session_state_t *state = pn_session_get_state(transport, session);
+    if (endpoint->state & PN_LOCAL_CLOSED && (int16_t) state->local_channel >= 0)
     {
       pn_init_frame(transport->disp);
       /*if (condition)
@@ -1379,7 +1453,7 @@ void pn_process_conn_teardown(pn_transpo
 {
   if (endpoint->type == CONNECTION)
   {
-    if (endpoint->local_state == CLOSED && !transport->close_sent) {
+    if (endpoint->state & PN_LOCAL_CLOSED && !transport->close_sent) {
       pn_init_frame(transport->disp);
       /*if (condition)
       // XXX: symbol
@@ -1430,11 +1504,11 @@ void pn_process(pn_transport_t *transpor
 
 ssize_t pn_output(pn_transport_t *transport, char *bytes, size_t size)
 {
-  if (transport->endpoint.local_state != UNINIT) {
+  if (!(transport->endpoint.state & PN_LOCAL_UNINIT)) {
     pn_process(transport);
   }
 
-  if (!transport->disp->available && transport->endpoint.local_state == CLOSED) {
+  if (!transport->disp->available && transport->endpoint.state & PN_LOCAL_CLOSED) {
     return PN_EOS;
   }
 
@@ -1448,9 +1522,9 @@ void pn_trace(pn_transport_t *transport,
   transport->disp->trace = trace;
 }
 
-ssize_t pn_send(pn_sender_t *sender, const char *bytes, size_t n)
+ssize_t pn_send(pn_link_t *sender, const char *bytes, size_t n)
 {
-  pn_delivery_t *current = pn_current(&sender->link);
+  pn_delivery_t *current = pn_current(sender);
   if (!current) return -1;
   if (current->bytes) return 0;
   PN_ENSURE(current->bytes, current->capacity, current->size + n);
@@ -1460,10 +1534,9 @@ ssize_t pn_send(pn_sender_t *sender, con
   return n;
 }
 
-ssize_t pn_recv(pn_receiver_t *receiver, char *bytes, size_t n)
+ssize_t pn_recv(pn_link_t *receiver, char *bytes, size_t n)
 {
-  pn_link_t *link = &receiver->link;
-  pn_delivery_t *delivery = link->current;
+  pn_delivery_t *delivery = receiver->current;
   if (delivery) {
     if (delivery->size) {
       size_t size = n > delivery->size ? delivery->size : n;
@@ -1480,10 +1553,10 @@ ssize_t pn_recv(pn_receiver_t *receiver,
   }
 }
 
-void pn_flow(pn_receiver_t *receiver, int credits)
+void pn_flow(pn_link_t *receiver, int credits)
 {
   receiver->credits += credits;
-  pn_modified(receiver->link.session->connection, &receiver->link.endpoint);
+  pn_modified(receiver->session->connection, &receiver->endpoint);
 }
 
 time_t pn_tick(pn_transport_t *engine, time_t now)

Modified: qpid/proton/trunk/proton-c/src/proton.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/proton.c?rev=1303812&r1=1303811&r2=1303812&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/proton.c (original)
+++ qpid/proton/trunk/proton-c/src/proton.c Thu Mar 22 14:45:38 2012
@@ -116,54 +116,42 @@ void server_callback(pn_selectable_t *se
   char msg[1024];
   char data[1024];
 
-  pn_endpoint_t *endpoint = pn_endpoint_head(conn, UNINIT, ACTIVE);
-  while (endpoint)
-  {
-    switch (pn_endpoint_type(endpoint))
-    {
-    case CONNECTION:
-    case SESSION:
-      if (pn_remote_state(endpoint) != UNINIT)
-        pn_open(endpoint);
-      break;
-    case SENDER:
-    case RECEIVER:
-      {
-        pn_link_t *link = (pn_link_t *) endpoint;
-        if (pn_remote_state(endpoint) != UNINIT) {
-          printf("%ls, %ls\n", pn_remote_source(link), pn_remote_target(link));
-          pn_set_source(link, pn_remote_source(link));
-          pn_set_target(link, pn_remote_target(link));
-          pn_open(endpoint);
-          if (pn_endpoint_type(endpoint) == RECEIVER) {
-            pn_flow((pn_receiver_t *) endpoint, 100);
-          } else {
-            pn_binary_t *tag = pn_binary("blah", 4);
-            pn_delivery(link, tag);
-            pn_free_binary(tag);
-          }
-        }
-      }
-      break;
-    case TRANSPORT:
-      break;
+  if (pn_connection_state(conn) == (PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE)) {
+    pn_connection_open(conn);
+  }
+
+  pn_session_t *ssn = pn_session_head(conn, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE);
+  while (ssn) {
+    pn_session_open(ssn);
+    ssn = pn_session_next(ssn, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE);
+  }
+
+  pn_link_t *link = pn_link_head(conn, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE);
+  while (link) {
+    printf("%ls, %ls\n", pn_remote_source(link), pn_remote_target(link));
+    pn_set_source(link, pn_remote_source(link));
+    pn_set_target(link, pn_remote_target(link));
+    pn_link_open(link);
+    if (pn_is_receiver(link)) {
+      pn_flow(link, 100);
+    } else {
+      pn_delivery(link, pn_dtag("blah", 4));
     }
 
-    endpoint = pn_endpoint_next(endpoint, UNINIT, ACTIVE);
+    link = pn_link_next(link, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE);
   }
 
   pn_delivery_t *delivery = pn_work_head(conn);
   while (delivery)
   {
-    pn_binary_t *tag = pn_delivery_tag(delivery);
-    pn_format(tagstr, 1024, pn_from_binary(tag));
+    pn_delivery_tag_t tag = pn_delivery_tag(delivery);
+    pn_quote_data(tagstr, 1024, tag.bytes, tag.size);
     pn_link_t *link = pn_link(delivery);
     if (pn_readable(delivery)) {
       printf("received delivery: %s\n", tagstr);
-      pn_receiver_t *receiver = (pn_receiver_t *) link;
       printf("  payload = \"");
       while (true) {
-        ssize_t n = pn_recv(receiver, msg, 1024);
+        ssize_t n = pn_recv(link, msg, 1024);
         if (n == PN_EOM) {
           pn_advance(link);
           pn_disposition(delivery, PN_ACCEPTED);
@@ -174,17 +162,14 @@ void server_callback(pn_selectable_t *se
       }
       printf("\"\n");
     } else if (pn_writable(delivery)) {
-      pn_sender_t *sender = (pn_sender_t *) link;
       sprintf(msg, "message body for %s", tagstr);
       size_t n = pn_message_data(data, 1024, msg, strlen(msg));
-      pn_send(sender, data, n);
+      pn_send(link, data, n);
       if (pn_advance(link)) {
         printf("sent delivery: %s\n", tagstr);
         char tagbuf[16];
         sprintf(tagbuf, "%i", ctx->count++);
-        pn_binary_t *tag = pn_binary(tagbuf, strlen(tagbuf));
-        pn_delivery(link, tag);
-        pn_free_binary(tag);
+        pn_delivery(link, pn_dtag(tagbuf, strlen(tagbuf)));
       }
     }
 
@@ -196,24 +181,20 @@ void server_callback(pn_selectable_t *se
     delivery = pn_work_next(delivery);
   }
 
-  endpoint = pn_endpoint_head(conn, ACTIVE, CLOSED);
-  while (endpoint)
-  {
-    switch (pn_endpoint_type(endpoint))
-    {
-    case CONNECTION:
-    case SESSION:
-    case SENDER:
-    case RECEIVER:
-      if (pn_remote_state(endpoint) == CLOSED) {
-        pn_close(endpoint);
-      }
-      break;
-    case TRANSPORT:
-      break;
-    }
+  if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
+    pn_connection_close(conn);
+  }
+
+  ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+  while (ssn) {
+    pn_session_close(ssn);
+    ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+  }
 
-    endpoint = pn_endpoint_next(endpoint, ACTIVE, CLOSED);
+  link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+  while (link) {
+    pn_link_close(link);
+    link = pn_link_next(link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
   }
 }
 
@@ -266,27 +247,25 @@ void client_callback(pn_selectable_t *se
     pn_connection_set_hostname(connection, ctx->hostname);
 
     pn_session_t *ssn = pn_session(connection);
-    pn_open((pn_endpoint_t *) connection);
-    pn_open((pn_endpoint_t *) ssn);
+    pn_connection_open(connection);
+    pn_session_open(ssn);
 
     if (ctx->send_count) {
-      pn_sender_t *snd = pn_sender(ssn, L"sender");
-      pn_set_target((pn_link_t *) snd, ctx->address);
-      pn_open((pn_endpoint_t *) snd);
+      pn_link_t *snd = pn_sender(ssn, L"sender");
+      pn_set_target(snd, ctx->address);
+      pn_link_open(snd);
 
       char buf[16];
       for (int i = 0; i < ctx->send_count; i++) {
         sprintf(buf, "%c", 'a' + i);
-        pn_binary_t *tag = pn_binary(buf, strlen(buf));
-        pn_delivery((pn_link_t *) snd, tag);
-        pn_free_binary(tag);
+        pn_delivery(snd, pn_dtag(buf, strlen(buf)));
       }
     }
 
     if (ctx->recv_count) {
-      pn_receiver_t *rcv = pn_receiver(ssn, L"receiver");
-      pn_set_source((pn_link_t *) rcv, ctx->address);
-      pn_open((pn_endpoint_t *) rcv);
+      pn_link_t *rcv = pn_receiver(ssn, L"receiver");
+      pn_set_source(rcv, ctx->address);
+      pn_link_open(rcv);
       pn_flow(rcv, ctx->recv_count);
     }
   }
@@ -294,27 +273,25 @@ void client_callback(pn_selectable_t *se
   pn_delivery_t *delivery = pn_work_head(connection);
   while (delivery)
   {
-    pn_binary_t *tag = pn_delivery_tag(delivery);
-    pn_format(tagstr, 1024, pn_from_binary(tag));
+    pn_delivery_tag_t tag = pn_delivery_tag(delivery);
+    pn_quote_data(tagstr, 1024, tag.bytes, tag.size);
     pn_link_t *link = pn_link(delivery);
     if (pn_writable(delivery)) {
-      pn_sender_t *snd = (pn_sender_t *) link;
       sprintf(msg, "message body for %s", tagstr);
       ssize_t n = pn_message_data(data, 1024, msg, strlen(msg));
-      pn_send(snd, data, n);
+      pn_send(link, data, n);
       if (pn_advance(link)) printf("sent delivery: %s\n", tagstr);
     } else if (pn_readable(delivery)) {
       printf("received delivery: %s\n", tagstr);
-      pn_receiver_t *rcv = (pn_receiver_t *) link;
       printf("  payload = \"");
       while (true) {
-        size_t n = pn_recv(rcv, msg, 1024);
+        size_t n = pn_recv(link, msg, 1024);
         if (n == PN_EOM) {
           pn_advance(link);
           pn_disposition(delivery, PN_ACCEPTED);
           pn_settle(delivery);
           if (!--ctx->recv_count) {
-            pn_close((pn_endpoint_t *)link);
+            pn_link_close(link);
           }
           break;
         } else {
@@ -329,7 +306,7 @@ void client_callback(pn_selectable_t *se
       pn_clean(delivery);
       pn_settle(delivery);
       if (!--ctx->send_count) {
-        pn_close((pn_endpoint_t *)link);
+        pn_link_close(link);
       }
     }
 
@@ -340,23 +317,11 @@ void client_callback(pn_selectable_t *se
     printf("closing\n");
     // XXX: how do we close the session?
     //pn_close((pn_endpoint_t *) ssn);
-    pn_close((pn_endpoint_t *)connection);
+    pn_connection_close(connection);
   }
 
-  pn_endpoint_t *endpoint = pn_endpoint_head(connection, CLOSED, CLOSED);
-  while (endpoint)
-  {
-    switch (pn_endpoint_type(endpoint)) {
-    case CONNECTION:
-      pn_driver_stop(ctx->driver);
-      break;
-    case SESSION:
-    case SENDER:
-    case RECEIVER:
-    case TRANSPORT:
-      break;
-    }
-    endpoint = pn_endpoint_next(endpoint, CLOSED, CLOSED);
+  if (pn_connection_state(connection) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
+    pn_driver_stop(ctx->driver);
   }
 }
 

Modified: qpid/proton/trunk/proton-c/src/types/binary.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/types/binary.c?rev=1303812&r1=1303811&r2=1303812&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/types/binary.c (original)
+++ qpid/proton/trunk/proton-c/src/types/binary.c Thu Mar 22 14:45:38 2012
@@ -26,7 +26,7 @@
 #include "value-internal.h"
 #include "../util.h"
 
-pn_binary_t *pn_binary(char *bytes, size_t size)
+pn_binary_t *pn_binary(const char *bytes, size_t size)
 {
   pn_binary_t *bin = malloc(sizeof(pn_binary_t) + size);
   bin->size = size;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org