You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2012/03/22 15:45:39 UTC
svn commit: r1303812 - in /qpid/proton/trunk/proton-c: CMakeLists.txt
cproton.i include/proton/engine.h include/proton/sasl.h
include/proton/value.h proton.i src/driver.c src/engine/engine-internal.h
src/engine/engine.c src/proton.c src/types/binary.c
Author: rhs
Date: Thu Mar 22 14:45:38 2012
New Revision: 1303812
URL: http://svn.apache.org/viewvc?rev=1303812&view=rev
Log:
API updates based on SWIG experience
Added:
qpid/proton/trunk/proton-c/cproton.i
- copied, changed from r1303285, qpid/proton/trunk/proton-c/proton.i
Removed:
qpid/proton/trunk/proton-c/proton.i
Modified:
qpid/proton/trunk/proton-c/CMakeLists.txt
qpid/proton/trunk/proton-c/include/proton/engine.h
qpid/proton/trunk/proton-c/include/proton/sasl.h
qpid/proton/trunk/proton-c/include/proton/value.h
qpid/proton/trunk/proton-c/src/driver.c
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/engine/engine.c
qpid/proton/trunk/proton-c/src/proton.c
qpid/proton/trunk/proton-c/src/types/binary.c
Modified: qpid/proton/trunk/proton-c/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/CMakeLists.txt?rev=1303812&r1=1303811&r2=1303812&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/CMakeLists.txt (original)
+++ qpid/proton/trunk/proton-c/CMakeLists.txt Thu Mar 22 14:45:38 2012
@@ -61,8 +61,8 @@ find_package (PythonLibs)
include_directories (${PYTHON_INCLUDE_PATH})
include_directories (${PROJECT_SOURCE_DIR})
-swig_add_module(proton python proton.i)
-swig_link_libraries(proton qpidproton ${PYTHON_LIBRARIES})
+swig_add_module(cproton python cproton.i)
+swig_link_libraries(cproton qpidproton ${PYTHON_LIBRARIES})
add_executable (proton src/proton.c)
target_link_libraries (proton qpidproton)
Copied: qpid/proton/trunk/proton-c/cproton.i (from r1303285, qpid/proton/trunk/proton-c/proton.i)
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/cproton.i?p2=qpid/proton/trunk/proton-c/cproton.i&p1=qpid/proton/trunk/proton-c/proton.i&r1=1303285&r2=1303812&rev=1303812&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/proton.i (original)
+++ qpid/proton/trunk/proton-c/cproton.i Thu Mar 22 14:45:38 2012
@@ -1,8 +1,62 @@
-%module proton
+%module cproton
%{
/* Includes the header in the wrapper code */
#include <proton/engine.h>
%}
+typedef unsigned int size_t;
+typedef signed int ssize_t;
+
+%include <cwstring.i>
+%include <cstring.i>
+
+%cstring_output_withsize(char *OUTPUT, size_t *OUTPUT_SIZE)
+
+int wrap_pn_output(pn_transport_t *transport, char *OUTPUT, size_t *OUTPUT_SIZE);
+
+%rename(pn_output) wrap_pn_output;
+
+%inline %{
+ int wrap_pn_output(pn_transport_t *transport, char *OUTPUT, size_t *OUTPUT_SIZE) {
+ ssize_t sz = pn_output(transport, OUTPUT, *OUTPUT_SIZE);
+ if (sz >= 0) {
+ *OUTPUT_SIZE = sz;
+ return 0;
+ } else {
+ *OUTPUT_SIZE = 0;
+ return sz;
+ }
+ }
+%}
+
+%ignore pn_output;
+
+pn_delivery_t *wrap_pn_delivery(pn_link_t *link, char *STRING, size_t LENGTH);
+
+%rename(pn_delivery) wrap_pn_delivery;
+
+%inline %{
+ pn_delivery_t *wrap_pn_delivery(pn_link_t *link, char *STRING, size_t LENGTH) {
+ return pn_delivery(link, pn_dtag(STRING, LENGTH));
+ }
+%}
+
+%ignore pn_delivery;
+
+%cstring_output_allocate_size(char **ALLOC_OUTPUT, size_t *ALLOC_SIZE, free(*$1));
+
+%rename(pn_delivery_tag) wrap_pn_delivery_tag;
+
+%inline %{
+ void wrap_pn_delivery_tag(pn_delivery_t *delivery, char **ALLOC_OUTPUT, size_t *ALLOC_SIZE) {
+ pn_delivery_tag_t tag = pn_delivery_tag(delivery);
+ *ALLOC_OUTPUT = malloc(tag.size);
+ *ALLOC_SIZE = tag.size;
+ memcpy(*ALLOC_OUTPUT, tag.bytes, tag.size);
+ }
+%}
+
+%ignore pn_delivery_tag;
+
/* Parse the header file to generate wrappers */
%include "proton/engine.h"
Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1303812&r1=1303811&r2=1303812&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Thu Mar 22 14:45:38 2012
@@ -25,93 +25,119 @@
#include <stdbool.h>
#include <stddef.h>
#include <sys/types.h>
-#include <proton/value.h>
typedef struct pn_error_t pn_error_t;
-typedef struct pn_endpoint_t pn_endpoint_t;
typedef struct pn_transport_t pn_transport_t;
typedef struct pn_connection_t pn_connection_t;
typedef struct pn_session_t pn_session_t;
typedef struct pn_link_t pn_link_t;
-typedef struct pn_sender_t pn_sender_t;
-typedef struct pn_receiver_t pn_receiver_t;
typedef struct pn_delivery_t pn_delivery_t;
-typedef enum pn_endpoint_state_t {UNINIT=1, ACTIVE=2, CLOSED=4} pn_endpoint_state_t;
-typedef enum pn_endpoint_type_t {CONNECTION=1, TRANSPORT=2, SESSION=3, SENDER=4, RECEIVER=5} pn_endpoint_type_t;
-typedef enum pn_disposition_t {PN_RECEIVED=1, PN_ACCEPTED=2, PN_REJECTED=3, PN_RELEASED=4, PN_MODIFIED=5} pn_disposition_t;
+typedef struct pn_delivery_tag_t {
+ size_t size;
+ const char *bytes;
+} pn_delivery_tag_t;
+
+#define pn_dtag(BYTES, SIZE) ((pn_delivery_tag_t) {(SIZE), (BYTES)})
+
+typedef enum pn_state_t {
+ PN_LOCAL_UNINIT=1,
+ PN_LOCAL_ACTIVE=2,
+ PN_LOCAL_CLOSED=4,
+ PN_REMOTE_UNINIT=8,
+ PN_REMOTE_ACTIVE=16,
+ PN_REMOTE_CLOSED=32
+} pn_state_t;
+
+typedef enum pn_disposition_t {
+ PN_RECEIVED=1,
+ PN_ACCEPTED=2,
+ PN_REJECTED=3,
+ PN_RELEASED=4,
+ PN_MODIFIED=5
+} pn_disposition_t;
typedef enum pn_trace_t {PN_TRACE_OFF=0, PN_TRACE_RAW=1, PN_TRACE_FRM=2} pn_trace_t;
-/* Currently the way inheritence is done it is safe to "upcast" from
- pn_{transport,connection,session,link,sender,or receiver}_t to
- pn_endpoint_t and to "downcast" based on the endpoint type. I'm
- not sure if this should be part of the ABI or not. */
-
-// endpoint
-pn_endpoint_type_t pn_endpoint_type(pn_endpoint_t *endpoint);
-pn_endpoint_state_t pn_local_state(pn_endpoint_t *endpoint);
-pn_endpoint_state_t pn_remote_state(pn_endpoint_t *endpoint);
-pn_error_t *pn_local_error(pn_endpoint_t *endpoint);
-pn_error_t *pn_remote_error(pn_endpoint_t *endpoint);
-void pn_destroy(pn_endpoint_t *endpoint);
-void pn_open(pn_endpoint_t *endpoint);
-void pn_close(pn_endpoint_t *endpoint);
-
// connection
pn_connection_t *pn_connection();
+
+pn_state_t pn_connection_state(pn_connection_t *connection);
+pn_error_t *pn_connection_error(pn_connection_t *connection);
void pn_connection_set_container(pn_connection_t *connection, const wchar_t *container);
void pn_connection_set_hostname(pn_connection_t *connection, const wchar_t *hostname);
+
pn_delivery_t *pn_work_head(pn_connection_t *connection);
pn_delivery_t *pn_work_next(pn_delivery_t *delivery);
pn_session_t *pn_session(pn_connection_t *connection);
pn_transport_t *pn_transport(pn_connection_t *connection);
-pn_endpoint_t *pn_endpoint_head(pn_connection_t *connection,
- pn_endpoint_state_t local,
- pn_endpoint_state_t remote);
-pn_endpoint_t *pn_endpoint_next(pn_endpoint_t *endpoint,
- pn_endpoint_state_t local,
- pn_endpoint_state_t remote);
+pn_session_t *pn_session_head(pn_connection_t *connection, pn_state_t state);
+pn_session_t *pn_session_next(pn_session_t *session, pn_state_t state);
+
+pn_link_t *pn_link_head(pn_connection_t *connection, pn_state_t state);
+pn_link_t *pn_link_next(pn_link_t *link, pn_state_t state);
+
+void pn_connection_open(pn_connection_t *connection);
+void pn_connection_close(pn_connection_t *connection);
+void pn_connection_destroy(pn_connection_t *connection);
// transport
#define PN_EOS (-1)
#define PN_ERR (-2)
+pn_state_t pn_transport_state(pn_transport_t *transport);
+pn_error_t *pn_transport_error(pn_transport_t *transport);
ssize_t pn_input(pn_transport_t *transport, char *bytes, size_t available);
ssize_t pn_output(pn_transport_t *transport, char *bytes, size_t size);
time_t pn_tick(pn_transport_t *transport, time_t now);
void pn_trace(pn_transport_t *transport, pn_trace_t trace);
+void pn_transport_open(pn_transport_t *transport);
+void pn_transport_close(pn_transport_t *transport);
+void pn_transport_destroy(pn_transport_t *transport);
// session
-pn_sender_t *pn_sender(pn_session_t *session, const wchar_t *name);
-pn_receiver_t *pn_receiver(pn_session_t *session, const wchar_t *name);
+pn_state_t pn_session_state(pn_session_t *session);
+pn_error_t *pn_session_error(pn_session_t *session);
+pn_link_t *pn_sender(pn_session_t *session, const wchar_t *name);
+pn_link_t *pn_receiver(pn_session_t *session, const wchar_t *name);
+void pn_session_open(pn_session_t *session);
+void pn_session_close(pn_session_t *session);
+void pn_session_destroy(pn_session_t *session);
// link
+bool pn_is_sender(pn_link_t *link);
+bool pn_is_receiver(pn_link_t *link);
+pn_state_t pn_link_state(pn_link_t *link);
+pn_error_t *pn_link_error(pn_link_t *link);
pn_session_t *pn_get_session(pn_link_t *link);
void pn_set_source(pn_link_t *link, const wchar_t *source);
void pn_set_target(pn_link_t *link, const wchar_t *target);
wchar_t *pn_remote_source(pn_link_t *link);
wchar_t *pn_remote_target(pn_link_t *link);
-pn_delivery_t *pn_delivery(pn_link_t *link, pn_binary_t *tag);
+pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag);
pn_delivery_t *pn_current(pn_link_t *link);
bool pn_advance(pn_link_t *link);
pn_delivery_t *pn_unsettled_head(pn_link_t *link);
pn_delivery_t *pn_unsettled_next(pn_delivery_t *delivery);
+void pn_link_open(pn_link_t *sender);
+void pn_link_close(pn_link_t *sender);
+void pn_link_destroy(pn_link_t *sender);
+
// sender
//void pn_offer(pn_sender_t *sender, int credits);
-ssize_t pn_send(pn_sender_t *sender, const char *bytes, size_t n);
+ssize_t pn_send(pn_link_t *sender, const char *bytes, size_t n);
//void pn_abort(pn_sender_t *sender);
// receiver
#define PN_EOM (-1)
-void pn_flow(pn_receiver_t *receiver, int credits);
-ssize_t pn_recv(pn_receiver_t *receiver, char *bytes, size_t n);
+void pn_flow(pn_link_t *receiver, int credits);
+ssize_t pn_recv(pn_link_t *receiver, char *bytes, size_t n);
// delivery
-pn_binary_t *pn_delivery_tag(pn_delivery_t *delivery);
+pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery);
pn_link_t *pn_link(pn_delivery_t *delivery);
// how do we do delivery state?
int pn_local_disp(pn_delivery_t *delivery);
Modified: qpid/proton/trunk/proton-c/include/proton/sasl.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/sasl.h?rev=1303812&r1=1303811&r2=1303812&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/sasl.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/sasl.h Thu Mar 22 14:45:38 2012
@@ -24,6 +24,7 @@
#include <sys/types.h>
#include <stdbool.h>
+#include <proton/value.h>
typedef struct pn_sasl_t pn_sasl_t;
Modified: qpid/proton/trunk/proton-c/include/proton/value.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/value.h?rev=1303812&r1=1303811&r2=1303812&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/value.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/value.h Thu Mar 22 14:45:38 2012
@@ -189,7 +189,7 @@ wchar_t *pn_string_wcs(pn_string_t *str)
/* binary */
-pn_binary_t *pn_binary(char *bytes, size_t size);
+pn_binary_t *pn_binary(const char *bytes, size_t size);
size_t pn_binary_size(pn_binary_t *b);
const char *pn_binary_bytes(pn_binary_t *b);
pn_binary_t *pn_binary_dup(pn_binary_t *b);
Modified: qpid/proton/trunk/proton-c/src/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1303812&r1=1303811&r2=1303812&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/driver.c Thu Mar 22 14:45:38 2012
@@ -153,9 +153,11 @@ void *pn_selectable_context(pn_selectabl
void pn_selectable_destroy(pn_selectable_t *sel)
{
+ if (!sel) return;
+
if (sel->driver) pn_driver_remove(sel->driver, sel);
- if (sel->connection) pn_destroy((pn_endpoint_t *) sel->connection);
- if (sel->sasl) pn_sasl_destroy(sel->sasl);
+ pn_connection_destroy(sel->connection);
+ pn_sasl_destroy(sel->sasl);
free(sel);
}
@@ -322,7 +324,7 @@ static ssize_t pn_selectable_write_amqp_
fprintf(stderr, " -> AMQP 1.0\n");
memmove(pn_selectable_output(sel), "AMQP\x00\x01\x00\x00", 8);
sel->process_output = pn_selectable_write_amqp;
- pn_open((pn_endpoint_t *) sel->transport);
+ pn_transport_open(sel->transport);
return 8;
}
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1303812&r1=1303811&r2=1303812&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Thu Mar 22 14:45:38 2012
@@ -35,10 +35,14 @@ struct pn_error_t {
pn_map_t *info;
};
+typedef enum pn_endpoint_type_t {CONNECTION, SESSION, SENDER, RECEIVER, TRANSPORT} pn_endpoint_type_t;
+
+typedef struct pn_endpoint_t pn_endpoint_t;
+
struct pn_endpoint_t {
pn_endpoint_type_t type;
- pn_endpoint_state_t local_state, remote_state;
- pn_error_t local_error, remote_error;
+ pn_state_t state;
+ pn_error_t error;
pn_endpoint_t *endpoint_next;
pn_endpoint_t *endpoint_prev;
pn_endpoint_t *transport_next;
@@ -140,17 +144,10 @@ struct pn_link_t {
pn_delivery_t *current;
pn_delivery_t *settled_head;
pn_delivery_t *settled_tail;
+ // XXX
pn_sequence_t credit;
- size_t id;
-};
-
-struct pn_sender_t {
- pn_link_t link;
-};
-
-struct pn_receiver_t {
- pn_link_t link;
pn_sequence_t credits;
+ size_t id;
};
struct pn_delivery_t {
@@ -175,11 +172,11 @@ struct pn_delivery_t {
void *context;
};
-void pn_destroy_connection(pn_connection_t *connection);
-void pn_destroy_transport(pn_transport_t *transport);
-void pn_destroy_session(pn_session_t *session);
-void pn_destroy_sender(pn_sender_t *sender);
-void pn_destroy_receiver(pn_receiver_t *receiver);
+#define PN_SET_LOCAL(OLD, NEW) \
+ (OLD) = ((OLD) & (PN_REMOTE_UNINIT | PN_REMOTE_ACTIVE | PN_REMOTE_CLOSED)) | (NEW)
+
+#define PN_SET_REMOTE(OLD, NEW) \
+ (OLD) = ((OLD) & (PN_LOCAL_UNINIT | PN_LOCAL_ACTIVE | PN_LOCAL_CLOSED)) | (NEW)
void pn_link_dump(pn_link_t *link);
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1303812&r1=1303811&r2=1303812&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Thu Mar 22 14:45:38 2012
@@ -135,35 +135,37 @@ void pn_delivery_buffer_gc(pn_delivery_b
// endpoints
-pn_endpoint_type_t pn_endpoint_type(pn_endpoint_t *endpoint)
+pn_connection_t *pn_get_connection(pn_endpoint_t *endpoint)
{
- return endpoint->type;
-}
+ switch (endpoint->type) {
+ case CONNECTION:
+ return (pn_connection_t *) endpoint;
+ case SESSION:
+ return ((pn_session_t *) endpoint)->connection;
+ case SENDER:
+ case RECEIVER:
+ return ((pn_link_t *) endpoint)->session->connection;
+ case TRANSPORT:
+ return ((pn_transport_t *) endpoint)->connection;
+ }
-pn_endpoint_state_t pn_local_state(pn_endpoint_t *endpoint)
-{
- return endpoint->local_state;
+ return NULL;
}
-pn_endpoint_state_t pn_remote_state(pn_endpoint_t *endpoint)
-{
- return endpoint->remote_state;
-}
+void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint);
-pn_error_t *pn_local_error(pn_endpoint_t *endpoint)
+void pn_open(pn_endpoint_t *endpoint)
{
- if (endpoint->local_error.condition)
- return &endpoint->local_error;
- else
- return NULL;
+ // TODO: do we care about the current state?
+ PN_SET_LOCAL(endpoint->state, PN_LOCAL_ACTIVE);
+ pn_modified(pn_get_connection(endpoint), endpoint);
}
-pn_error_t *pn_remote_error(pn_endpoint_t *endpoint)
+void pn_close(pn_endpoint_t *endpoint)
{
- if (endpoint->remote_error.condition)
- return &endpoint->remote_error;
- else
- return NULL;
+ // TODO: do we care about the current state?
+ PN_SET_LOCAL(endpoint->state, PN_LOCAL_CLOSED);
+ pn_modified(pn_get_connection(endpoint), endpoint);
}
void pn_destroy(pn_endpoint_t *endpoint)
@@ -171,36 +173,60 @@ void pn_destroy(pn_endpoint_t *endpoint)
switch (endpoint->type)
{
case CONNECTION:
- pn_destroy_connection((pn_connection_t *)endpoint);
+ pn_connection_destroy((pn_connection_t *)endpoint);
break;
case TRANSPORT:
- pn_destroy_transport((pn_transport_t *)endpoint);
+ pn_transport_destroy((pn_transport_t *)endpoint);
break;
case SESSION:
- pn_destroy_session((pn_session_t *)endpoint);
+ pn_session_destroy((pn_session_t *)endpoint);
break;
case SENDER:
- pn_destroy_sender((pn_sender_t *)endpoint);
+ pn_link_destroy((pn_link_t *)endpoint);
break;
case RECEIVER:
- pn_destroy_receiver((pn_receiver_t *)endpoint);
+ pn_link_destroy((pn_link_t *)endpoint);
break;
}
}
-void pn_destroy_connection(pn_connection_t *connection)
+void pn_connection_open(pn_connection_t *connection)
{
- pn_destroy_transport(connection->transport);
+ pn_open((pn_endpoint_t *) connection);
+}
+
+void pn_connection_close(pn_connection_t *connection)
+{
+ pn_close((pn_endpoint_t *) connection);
+}
+
+void pn_connection_destroy(pn_connection_t *connection)
+{
+ if (!connection) return;
+
+ pn_transport_destroy(connection->transport);
while (connection->session_count)
- pn_destroy_session(connection->sessions[connection->session_count - 1]);
+ pn_session_destroy(connection->sessions[connection->session_count - 1]);
free(connection->sessions);
free(connection->container);
free(connection->hostname);
free(connection);
}
-void pn_destroy_transport(pn_transport_t *transport)
+void pn_transport_open(pn_transport_t *transport)
{
+ pn_open((pn_endpoint_t *) transport);
+}
+
+void pn_transport_close(pn_transport_t *transport)
+{
+ pn_close((pn_endpoint_t *) transport);
+}
+
+void pn_transport_destroy(pn_transport_t *transport)
+{
+ if (!transport) return;
+
pn_dispatcher_destroy(transport->disp);
for (int i = 0; i < transport->session_capacity; i++) {
pn_delivery_buffer_destroy(&transport->sessions[i].incoming);
@@ -235,10 +261,22 @@ void pn_remove_session(pn_connection_t *
ssn->connection = NULL;
}
-void pn_destroy_session(pn_session_t *session)
+void pn_session_open(pn_session_t *session)
+{
+ pn_open((pn_endpoint_t *) session);
+}
+
+void pn_session_close(pn_session_t *session)
{
+ pn_close((pn_endpoint_t *) session);
+}
+
+void pn_session_destroy(pn_session_t *session)
+{
+ if (!session) return;
+
while (session->link_count)
- pn_destroy(&session->links[session->link_count - 1]->endpoint);
+ pn_link_destroy(session->links[session->link_count - 1]);
pn_remove_session(session->connection, session);
free(session->links);
free(session);
@@ -318,36 +356,36 @@ void pn_link_dump(pn_link_t *link)
printf("\n");
}
-void pn_link_uninit(pn_link_t *link)
+void pn_link_open(pn_link_t *link)
{
- if (link->local_source) free(link->local_source);
- if (link->local_target) free(link->local_target);
- if (link->remote_source) free(link->remote_source);
- if (link->remote_target) free(link->remote_target);
- pn_remove_link(link->session, link);
- pn_free_deliveries(link->settled_head);
- pn_free_deliveries(link->head);
- free(link->name);
+ pn_open((pn_endpoint_t *) link);
}
-void pn_destroy_sender(pn_sender_t *sender)
+void pn_link_close(pn_link_t *link)
{
- pn_link_uninit(&sender->link);
- free(sender);
+ pn_close((pn_endpoint_t *) link);
}
-void pn_destroy_receiver(pn_receiver_t *receiver)
+
+void pn_link_destroy(pn_link_t *link)
{
- pn_link_uninit(&receiver->link);
- free(receiver);
+ if (!link) return;
+
+ free(link->local_source);
+ free(link->local_target);
+ free(link->remote_source);
+ free(link->remote_target);
+ pn_remove_link(link->session, link);
+ pn_free_deliveries(link->settled_head);
+ pn_free_deliveries(link->head);
+ free(link->name);
+ free(link);
}
void pn_endpoint_init(pn_endpoint_t *endpoint, int type, pn_connection_t *conn)
{
endpoint->type = type;
- endpoint->local_state = UNINIT;
- endpoint->remote_state = UNINIT;
- endpoint->local_error = (pn_error_t) {.condition = NULL};
- endpoint->remote_error = (pn_error_t) {.condition = NULL};
+ endpoint->state = PN_LOCAL_UNINIT | PN_REMOTE_UNINIT;
+ endpoint->error = (pn_error_t) {.condition = NULL};
endpoint->endpoint_next = NULL;
endpoint->endpoint_prev = NULL;
endpoint->transport_next = NULL;
@@ -357,39 +395,6 @@ void pn_endpoint_init(pn_endpoint_t *end
LL_ADD_PFX(conn->endpoint_head, conn->endpoint_tail, endpoint, endpoint_);
}
-pn_connection_t *pn_get_connection(pn_endpoint_t *endpoint)
-{
- switch (endpoint->type) {
- case CONNECTION:
- return (pn_connection_t *) endpoint;
- case SESSION:
- return ((pn_session_t *) endpoint)->connection;
- case SENDER:
- case RECEIVER:
- return ((pn_link_t *) endpoint)->session->connection;
- case TRANSPORT:
- return ((pn_transport_t *) endpoint)->connection;
- }
-
- return NULL;
-}
-
-void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint);
-
-void pn_open(pn_endpoint_t *endpoint)
-{
- // TODO: do we care about the current state?
- endpoint->local_state = ACTIVE;
- pn_modified(pn_get_connection(endpoint), endpoint);
-}
-
-void pn_close(pn_endpoint_t *endpoint)
-{
- // TODO: do we care about the current state?
- endpoint->local_state = CLOSED;
- pn_modified(pn_get_connection(endpoint), endpoint);
-}
-
pn_connection_t *pn_connection()
{
pn_connection_t *conn = malloc(sizeof(pn_connection_t));
@@ -414,6 +419,16 @@ pn_connection_t *pn_connection()
return conn;
}
+pn_state_t pn_connection_state(pn_connection_t *connection)
+{
+ return connection->endpoint.state;
+}
+
+pn_error_t *pn_connection_error(pn_connection_t *connection)
+{
+ return &connection->endpoint.error;
+}
+
void pn_connection_set_container(pn_connection_t *connection, const wchar_t *container)
{
if (connection->container) free(connection->container);
@@ -530,36 +545,58 @@ void pn_clear_modified(pn_connection_t *
}
}
-bool pn_matches(pn_endpoint_t *endpoint, pn_endpoint_state_t local,
- pn_endpoint_state_t remote)
+bool pn_matches(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state)
{
- return (endpoint->local_state & local) && (endpoint->remote_state & remote);
+ return (endpoint->type == type) && (endpoint->state == state);
}
-pn_endpoint_t *pn_find(pn_endpoint_t *endpoint, pn_endpoint_state_t local,
- pn_endpoint_state_t remote)
+pn_endpoint_t *pn_find(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state)
{
while (endpoint)
{
- if (pn_matches(endpoint, local, remote))
+ if (pn_matches(endpoint, type, state))
return endpoint;
endpoint = endpoint->endpoint_next;
}
return NULL;
}
-pn_endpoint_t *pn_endpoint_head(pn_connection_t *conn,
- pn_endpoint_state_t local,
- pn_endpoint_state_t remote)
+pn_session_t *pn_session_head(pn_connection_t *conn, pn_state_t state)
{
- return pn_find(conn->endpoint_head, local, remote);
+ return (pn_session_t *) pn_find(conn->endpoint_head, SESSION, state);
}
-pn_endpoint_t *pn_endpoint_next(pn_endpoint_t *endpoint,
- pn_endpoint_state_t local,
- pn_endpoint_state_t remote)
+pn_session_t *pn_session_next(pn_session_t *ssn, pn_state_t state)
{
- return pn_find(endpoint->endpoint_next, local, remote);
+ return (pn_session_t *) pn_find(ssn->endpoint.endpoint_next, SESSION, state);
+}
+
+pn_link_t *pn_link_head(pn_connection_t *conn, pn_state_t state)
+{
+ pn_endpoint_t *endpoint = conn->endpoint_head;
+
+ while (endpoint)
+ {
+ if (pn_matches(endpoint, SENDER, state) || pn_matches(endpoint, RECEIVER, state))
+ return (pn_link_t *) endpoint;
+ endpoint = endpoint->endpoint_next;
+ }
+
+ return NULL;
+}
+
+pn_link_t *pn_link_next(pn_link_t *ssn, pn_state_t state)
+{
+ pn_endpoint_t *endpoint = ssn->endpoint.endpoint_next;
+
+ while (endpoint)
+ {
+ if (pn_matches(endpoint, SENDER, state) || pn_matches(endpoint, RECEIVER, state))
+ return (pn_link_t *) endpoint;
+ endpoint = endpoint->endpoint_next;
+ }
+
+ return NULL;
}
pn_session_t *pn_session(pn_connection_t *conn)
@@ -576,6 +613,16 @@ pn_session_t *pn_session(pn_connection_t
return ssn;
}
+pn_state_t pn_session_state(pn_session_t *session)
+{
+ return session->endpoint.state;
+}
+
+pn_error_t *pn_session_error(pn_session_t *session)
+{
+ return &session->endpoint.error;
+}
+
void pn_do_open(pn_dispatcher_t *disp);
void pn_do_begin(pn_dispatcher_t *disp);
void pn_do_attach(pn_dispatcher_t *disp);
@@ -612,7 +659,7 @@ void pn_transport_init(pn_transport_t *t
transport->channel_capacity = 0;
}
-pn_session_state_t *pn_session_state(pn_transport_t *transport, pn_session_t *ssn)
+pn_session_state_t *pn_session_get_state(pn_transport_t *transport, pn_session_t *ssn)
{
int old_capacity = transport->session_capacity;
PN_ENSURE(transport->sessions, transport->session_capacity, ssn->id + 1);
@@ -658,6 +705,16 @@ pn_transport_t *pn_transport(pn_connecti
}
}
+pn_state_t pn_transport_state(pn_transport_t *transport)
+{
+ return transport->endpoint.state;
+}
+
+pn_error_t *pn_transport_error(pn_transport_t *transport)
+{
+ return &transport->endpoint.error;
+}
+
void pn_link_init(pn_link_t *link, int type, pn_session_t *session, const wchar_t *name)
{
pn_endpoint_init(&link->endpoint, type, session->connection);
@@ -670,6 +727,7 @@ void pn_link_init(pn_link_t *link, int t
link->settled_head = link->settled_tail = NULL;
link->head = link->tail = link->current = NULL;
link->credit = 0;
+ link->credits = 0;
}
void pn_set_source(pn_link_t *link, const wchar_t *source)
@@ -694,7 +752,7 @@ wchar_t *pn_remote_target(pn_link_t *lin
return link->remote_target;
}
-pn_link_state_t *pn_link_state(pn_session_state_t *ssn_state, pn_link_t *link)
+pn_link_state_t *pn_link_get_state(pn_session_state_t *ssn_state, pn_link_t *link)
{
int old_capacity = ssn_state->link_capacity;
PN_ENSURE(ssn_state->links, ssn_state->link_capacity, link->id + 1);
@@ -721,36 +779,55 @@ pn_link_state_t *pn_handle_state(pn_sess
return ssn_state->handles[handle];
}
-pn_sender_t *pn_sender(pn_session_t *session, const wchar_t *name)
+pn_link_t *pn_sender(pn_session_t *session, const wchar_t *name)
{
- pn_sender_t *snd = malloc(sizeof(pn_sender_t));
+ pn_link_t *snd = malloc(sizeof(pn_link_t));
if (!snd) return NULL;
- pn_link_init(&snd->link, SENDER, session, name);
+ pn_link_init(snd, SENDER, session, name);
return snd;
}
-pn_receiver_t *pn_receiver(pn_session_t *session, const wchar_t *name)
+pn_link_t *pn_receiver(pn_session_t *session, const wchar_t *name)
{
- pn_receiver_t *rcv = malloc(sizeof(pn_receiver_t));
+ pn_link_t *rcv = malloc(sizeof(pn_link_t));
if (!rcv) return NULL;
- pn_link_init(&rcv->link, RECEIVER, session, name);
- rcv->credits = 0;
+ pn_link_init(rcv, RECEIVER, session, name);
return rcv;
}
+pn_state_t pn_link_state(pn_link_t *link)
+{
+ return link->endpoint.state;
+}
+
+pn_error_t *pn_link_error(pn_link_t *link)
+{
+ return &link->endpoint.error;
+}
+
+bool pn_is_sender(pn_link_t *link)
+{
+ return link->endpoint.type == SENDER;
+}
+
+bool pn_is_receiver(pn_link_t *link)
+{
+ return link->endpoint.type == RECEIVER;
+}
+
pn_session_t *pn_get_session(pn_link_t *link)
{
return link->session;
}
-pn_delivery_t *pn_delivery(pn_link_t *link, pn_binary_t *tag)
+pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
{
pn_delivery_t *delivery = link->settled_head;
LL_POP_PFX(link->settled_head, link->settled_tail, link_);
if (!delivery) delivery = malloc(sizeof(pn_delivery_t));
if (!delivery) return NULL;
delivery->link = link;
- delivery->tag = pn_binary_dup(tag);
+ delivery->tag = pn_binary(tag.bytes, tag.size);
delivery->local_state = 0;
delivery->remote_state = 0;
delivery->local_settled = false;
@@ -804,9 +881,9 @@ void pn_delivery_dump(pn_delivery_t *d)
pn_readable(d), d->work);
}
-pn_binary_t *pn_delivery_tag(pn_delivery_t *delivery)
+pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery)
{
- return delivery->tag;
+ return pn_dtag(pn_binary_bytes(delivery->tag), pn_binary_size(delivery->tag));
}
pn_delivery_t *pn_current(pn_link_t *link)
@@ -814,9 +891,8 @@ pn_delivery_t *pn_current(pn_link_t *lin
return link->current;
}
-void pn_advance_sender(pn_sender_t *sender)
+void pn_advance_sender(pn_link_t *link)
{
- pn_link_t *link = &sender->link;
if (link->credit > 0) {
link->credit--;
pn_add_tpwork(link->current);
@@ -824,9 +900,8 @@ void pn_advance_sender(pn_sender_t *send
}
}
-void pn_advance_receiver(pn_receiver_t *receiver)
+void pn_advance_receiver(pn_link_t *link)
{
- pn_link_t *link = &receiver->link;
link->current = link->current->link_next;
}
@@ -835,9 +910,9 @@ bool pn_advance(pn_link_t *link)
if (link->current) {
pn_delivery_t *prev = link->current;
if (link->endpoint.type == SENDER) {
- pn_advance_sender((pn_sender_t *)link);
+ pn_advance_sender(link);
} else {
- pn_advance_receiver((pn_receiver_t *)link);
+ pn_advance_receiver(link);
}
pn_delivery_t *next = link->current;
pn_work_update(link->session->connection, prev);
@@ -876,13 +951,13 @@ void pn_settle(pn_delivery_t *delivery)
void pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...)
{
va_list ap;
- transport->endpoint.local_error.condition = condition;
+ transport->endpoint.error.condition = condition;
va_start(ap, fmt);
// XXX: result
- vsnprintf(transport->endpoint.local_error.description, DESCRIPTION, fmt, ap);
+ vsnprintf(transport->endpoint.error.description, DESCRIPTION, fmt, ap);
va_end(ap);
- transport->endpoint.local_state = CLOSED;
- fprintf(stderr, "ERROR %s %s\n", condition, transport->endpoint.local_error.description);
+ PN_SET_LOCAL(transport->endpoint.state, PN_LOCAL_CLOSED);
+ fprintf(stderr, "ERROR %s %s\n", condition, transport->endpoint.error.description);
// XXX: need to write close frame if appropriate
}
@@ -890,8 +965,7 @@ void pn_do_open(pn_dispatcher_t *disp)
{
pn_transport_t *transport = disp->context;
pn_connection_t *conn = transport->connection;
- // TODO: store the state
- conn->endpoint.remote_state = ACTIVE;
+ PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE);
}
void pn_do_begin(pn_dispatcher_t *disp)
@@ -904,10 +978,10 @@ void pn_do_begin(pn_dispatcher_t *disp)
state = &transport->sessions[pn_to_uint16(remote_channel)];
} else {
pn_session_t *ssn = pn_session(transport->connection);
- state = pn_session_state(transport, ssn);
+ state = pn_session_get_state(transport, ssn);
}
pn_map_channel(transport, disp->channel, state);
- state->session->endpoint.remote_state = ACTIVE;
+ PN_SET_REMOTE(state->session->endpoint.state, PN_REMOTE_ACTIVE);
}
pn_link_state_t *pn_find_link(pn_session_state_t *ssn_state, pn_string_t *name)
@@ -917,7 +991,7 @@ pn_link_state_t *pn_find_link(pn_session
pn_link_t *link = ssn_state->session->links[i];
if (!wcsncmp(pn_string_wcs(name), link->name, pn_string_size(name)))
{
- return pn_link_state(ssn_state, link);
+ return pn_link_get_state(ssn_state, link);
}
}
return NULL;
@@ -939,11 +1013,11 @@ void pn_do_attach(pn_dispatcher_t *disp)
} else {
link = (pn_link_t *) pn_receiver(ssn_state->session, pn_string_wcs(name));
}
- link_state = pn_link_state(ssn_state, link);
+ link_state = pn_link_get_state(ssn_state, link);
}
pn_map_handle(ssn_state, handle, link_state);
- link_state->link->endpoint.remote_state = ACTIVE;
+ PN_SET_REMOTE(link_state->link->endpoint.state, PN_REMOTE_ACTIVE);
pn_value_t remote_source = pn_list_get(args, ATTACH_SOURCE);
if (remote_source.type == TAG)
remote_source = pn_tag_value(pn_to_tag(remote_source));
@@ -972,7 +1046,7 @@ void pn_do_transfer(pn_dispatcher_t *dis
pn_link_state_t *link_state = pn_handle_state(ssn_state, handle);
pn_link_t *link = link_state->link;
pn_binary_t *tag = pn_to_binary(pn_list_get(args, TRANSFER_DELIVERY_TAG));
- pn_delivery_t *delivery = pn_delivery(link, tag);
+ pn_delivery_t *delivery = pn_delivery(link, pn_dtag(pn_binary_bytes(tag), pn_binary_size(tag)));
pn_delivery_state_t *state = pn_delivery_buffer_push(&ssn_state->incoming, delivery);
delivery->context = state;
// XXX: need to check that state is not null (i.e. we haven't hit the limit)
@@ -1080,7 +1154,7 @@ void pn_do_detach(pn_dispatcher_t *disp)
if (closed)
{
- link->endpoint.remote_state = CLOSED;
+ PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_CLOSED);
} else {
// TODO: implement
}
@@ -1093,27 +1167,27 @@ void pn_do_end(pn_dispatcher_t *disp)
pn_session_t *session = ssn_state->session;
ssn_state->remote_channel = -1;
- session->endpoint.remote_state = CLOSED;
+ PN_SET_REMOTE(session->endpoint.state, PN_REMOTE_CLOSED);
}
void pn_do_close(pn_dispatcher_t *disp)
{
pn_transport_t *transport = disp->context;
- transport->connection->endpoint.remote_state = CLOSED;
- transport->endpoint.remote_state = CLOSED;
+ PN_SET_REMOTE(transport->connection->endpoint.state, PN_REMOTE_CLOSED);
+ PN_SET_REMOTE(transport->endpoint.state, PN_REMOTE_CLOSED);
}
ssize_t pn_input(pn_transport_t *transport, char *bytes, size_t available)
{
- if (transport->endpoint.local_state == UNINIT) {
+ if (transport->endpoint.state & PN_LOCAL_UNINIT) {
return 0;
}
- if (transport->endpoint.local_state == CLOSED) {
+ if (transport->endpoint.state & PN_LOCAL_CLOSED) {
return PN_EOS;
}
- if (transport->endpoint.remote_state == CLOSED) {
+ if (transport->endpoint.state & PN_REMOTE_CLOSED) {
pn_do_error(transport, "amqp:connection:framing-error", "data after close");
return PN_ERR;
}
@@ -1125,7 +1199,7 @@ void pn_process_conn_setup(pn_transport_
{
if (endpoint->type == CONNECTION)
{
- if (endpoint->local_state != UNINIT && !transport->open_sent)
+ if (!(endpoint->state & PN_LOCAL_UNINIT) && !transport->open_sent)
{
pn_connection_t *connection = (pn_connection_t *) endpoint;
pn_init_frame(transport->disp);
@@ -1144,8 +1218,8 @@ void pn_process_ssn_setup(pn_transport_t
if (endpoint->type == SESSION)
{
pn_session_t *ssn = (pn_session_t *) endpoint;
- pn_session_state_t *state = pn_session_state(transport, ssn);
- if (endpoint->local_state != UNINIT && state->local_channel == (uint16_t) -1)
+ pn_session_state_t *state = pn_session_get_state(transport, ssn);
+ if (!(endpoint->state & PN_LOCAL_UNINIT) && state->local_channel == (uint16_t) -1)
{
pn_init_frame(transport->disp);
if ((int16_t) state->remote_channel >= 0)
@@ -1167,9 +1241,9 @@ void pn_process_link_setup(pn_transport_
if (endpoint->type == SENDER || endpoint->type == RECEIVER)
{
pn_link_t *link = (pn_link_t *) endpoint;
- pn_session_state_t *ssn_state = pn_session_state(transport, link->session);
- pn_link_state_t *state = pn_link_state(ssn_state, link);
- if (endpoint->local_state != UNINIT && state->local_handle == (uint32_t) -1)
+ pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
+ pn_link_state_t *state = pn_link_get_state(ssn_state, link);
+ if (!(endpoint->state & PN_LOCAL_UNINIT) && state->local_handle == (uint32_t) -1)
{
pn_init_frame(transport->disp);
pn_field(transport->disp, ATTACH_ROLE, pn_boolean(endpoint->type == RECEIVER));
@@ -1192,12 +1266,12 @@ void pn_process_link_setup(pn_transport_
void pn_process_flow_receiver(pn_transport_t *transport, pn_endpoint_t *endpoint)
{
- if (endpoint->type == RECEIVER && endpoint->local_state == ACTIVE)
+ if (endpoint->type == RECEIVER && endpoint->state & PN_LOCAL_ACTIVE)
{
- pn_receiver_t *rcv = (pn_receiver_t *) endpoint;
+ pn_link_t *rcv = (pn_link_t *) endpoint;
if (rcv->credits) {
- pn_session_state_t *ssn_state = pn_session_state(transport, rcv->link.session);
- pn_link_state_t *state = pn_link_state(ssn_state, &rcv->link);
+ pn_session_state_t *ssn_state = pn_session_get_state(transport, rcv->session);
+ pn_link_state_t *state = pn_link_get_state(ssn_state, rcv);
state->link_credit += rcv->credits;
rcv->credits = 0;
@@ -1217,7 +1291,7 @@ void pn_process_flow_receiver(pn_transpo
void pn_post_disp(pn_transport_t *transport, pn_delivery_t *delivery)
{
pn_link_t *link = delivery->link;
- pn_session_state_t *ssn_state = pn_session_state(transport, link->session);
+ pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
// XXX: check for null state
pn_delivery_state_t *state = delivery->context;
pn_init_frame(transport->disp);
@@ -1255,7 +1329,7 @@ void pn_process_disp_receiver(pn_transpo
pn_link_t *link = delivery->link;
if (link->endpoint.type == RECEIVER) {
// XXX: need to prevent duplicate disposition sending
- pn_session_state_t *ssn_state = pn_session_state(transport, link->session);
+ pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
if ((int16_t) ssn_state->local_channel >= 0) {
pn_post_disp(transport, delivery);
}
@@ -1279,8 +1353,8 @@ void pn_process_msg_data(pn_transport_t
{
pn_link_t *link = delivery->link;
if (link->endpoint.type == SENDER) {
- pn_session_state_t *ssn_state = pn_session_state(transport, link->session);
- pn_link_state_t *link_state = pn_link_state(ssn_state, link);
+ pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
+ pn_link_state_t *link_state = pn_link_get_state(ssn_state, link);
pn_delivery_state_t *state = delivery->context;
if (!state) {
state = pn_delivery_buffer_push(&ssn_state->outgoing, delivery);
@@ -1316,7 +1390,7 @@ void pn_process_disp_sender(pn_transport
pn_link_t *link = delivery->link;
if (link->endpoint.type == SENDER) {
// XXX: need to prevent duplicate disposition sending
- pn_session_state_t *ssn_state = pn_session_state(transport, link->session);
+ pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
/*if ((int16_t) ssn_state->local_channel >= 0) {
pn_post_disp(transport, delivery);
}*/
@@ -1341,9 +1415,9 @@ void pn_process_link_teardown(pn_transpo
{
pn_link_t *link = (pn_link_t *) endpoint;
pn_session_t *session = link->session;
- pn_session_state_t *ssn_state = pn_session_state(transport, session);
- pn_link_state_t *state = pn_link_state(ssn_state, link);
- if (endpoint->local_state == CLOSED && (int32_t) state->local_handle >= 0) {
+ pn_session_state_t *ssn_state = pn_session_get_state(transport, session);
+ pn_link_state_t *state = pn_link_get_state(ssn_state, link);
+ if (endpoint->state & PN_LOCAL_CLOSED && (int32_t) state->local_handle >= 0) {
pn_init_frame(transport->disp);
pn_field(transport->disp, DETACH_HANDLE, pn_value("I", state->local_handle));
pn_field(transport->disp, DETACH_CLOSED, pn_boolean(true));
@@ -1362,8 +1436,8 @@ void pn_process_ssn_teardown(pn_transpor
if (endpoint->type == SESSION)
{
pn_session_t *session = (pn_session_t *) endpoint;
- pn_session_state_t *state = pn_session_state(transport, session);
- if (endpoint->local_state == CLOSED && (int16_t) state->local_channel >= 0)
+ pn_session_state_t *state = pn_session_get_state(transport, session);
+ if (endpoint->state & PN_LOCAL_CLOSED && (int16_t) state->local_channel >= 0)
{
pn_init_frame(transport->disp);
/*if (condition)
@@ -1379,7 +1453,7 @@ void pn_process_conn_teardown(pn_transpo
{
if (endpoint->type == CONNECTION)
{
- if (endpoint->local_state == CLOSED && !transport->close_sent) {
+ if (endpoint->state & PN_LOCAL_CLOSED && !transport->close_sent) {
pn_init_frame(transport->disp);
/*if (condition)
// XXX: symbol
@@ -1430,11 +1504,11 @@ void pn_process(pn_transport_t *transpor
ssize_t pn_output(pn_transport_t *transport, char *bytes, size_t size)
{
- if (transport->endpoint.local_state != UNINIT) {
+ if (!(transport->endpoint.state & PN_LOCAL_UNINIT)) {
pn_process(transport);
}
- if (!transport->disp->available && transport->endpoint.local_state == CLOSED) {
+ if (!transport->disp->available && transport->endpoint.state & PN_LOCAL_CLOSED) {
return PN_EOS;
}
@@ -1448,9 +1522,9 @@ void pn_trace(pn_transport_t *transport,
transport->disp->trace = trace;
}
-ssize_t pn_send(pn_sender_t *sender, const char *bytes, size_t n)
+ssize_t pn_send(pn_link_t *sender, const char *bytes, size_t n)
{
- pn_delivery_t *current = pn_current(&sender->link);
+ pn_delivery_t *current = pn_current(sender);
if (!current) return -1;
if (current->bytes) return 0;
PN_ENSURE(current->bytes, current->capacity, current->size + n);
@@ -1460,10 +1534,9 @@ ssize_t pn_send(pn_sender_t *sender, con
return n;
}
-ssize_t pn_recv(pn_receiver_t *receiver, char *bytes, size_t n)
+ssize_t pn_recv(pn_link_t *receiver, char *bytes, size_t n)
{
- pn_link_t *link = &receiver->link;
- pn_delivery_t *delivery = link->current;
+ pn_delivery_t *delivery = receiver->current;
if (delivery) {
if (delivery->size) {
size_t size = n > delivery->size ? delivery->size : n;
@@ -1480,10 +1553,10 @@ ssize_t pn_recv(pn_receiver_t *receiver,
}
}
-void pn_flow(pn_receiver_t *receiver, int credits)
+void pn_flow(pn_link_t *receiver, int credits)
{
receiver->credits += credits;
- pn_modified(receiver->link.session->connection, &receiver->link.endpoint);
+ pn_modified(receiver->session->connection, &receiver->endpoint);
}
time_t pn_tick(pn_transport_t *engine, time_t now)
Modified: qpid/proton/trunk/proton-c/src/proton.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/proton.c?rev=1303812&r1=1303811&r2=1303812&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/proton.c (original)
+++ qpid/proton/trunk/proton-c/src/proton.c Thu Mar 22 14:45:38 2012
@@ -116,54 +116,42 @@ void server_callback(pn_selectable_t *se
char msg[1024];
char data[1024];
- pn_endpoint_t *endpoint = pn_endpoint_head(conn, UNINIT, ACTIVE);
- while (endpoint)
- {
- switch (pn_endpoint_type(endpoint))
- {
- case CONNECTION:
- case SESSION:
- if (pn_remote_state(endpoint) != UNINIT)
- pn_open(endpoint);
- break;
- case SENDER:
- case RECEIVER:
- {
- pn_link_t *link = (pn_link_t *) endpoint;
- if (pn_remote_state(endpoint) != UNINIT) {
- printf("%ls, %ls\n", pn_remote_source(link), pn_remote_target(link));
- pn_set_source(link, pn_remote_source(link));
- pn_set_target(link, pn_remote_target(link));
- pn_open(endpoint);
- if (pn_endpoint_type(endpoint) == RECEIVER) {
- pn_flow((pn_receiver_t *) endpoint, 100);
- } else {
- pn_binary_t *tag = pn_binary("blah", 4);
- pn_delivery(link, tag);
- pn_free_binary(tag);
- }
- }
- }
- break;
- case TRANSPORT:
- break;
+ if (pn_connection_state(conn) == (PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE)) {
+ pn_connection_open(conn);
+ }
+
+ pn_session_t *ssn = pn_session_head(conn, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE);
+ while (ssn) {
+ pn_session_open(ssn);
+ ssn = pn_session_next(ssn, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE);
+ }
+
+ pn_link_t *link = pn_link_head(conn, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE);
+ while (link) {
+ printf("%ls, %ls\n", pn_remote_source(link), pn_remote_target(link));
+ pn_set_source(link, pn_remote_source(link));
+ pn_set_target(link, pn_remote_target(link));
+ pn_link_open(link);
+ if (pn_is_receiver(link)) {
+ pn_flow(link, 100);
+ } else {
+ pn_delivery(link, pn_dtag("blah", 4));
}
- endpoint = pn_endpoint_next(endpoint, UNINIT, ACTIVE);
+ link = pn_link_next(link, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE);
}
pn_delivery_t *delivery = pn_work_head(conn);
while (delivery)
{
- pn_binary_t *tag = pn_delivery_tag(delivery);
- pn_format(tagstr, 1024, pn_from_binary(tag));
+ pn_delivery_tag_t tag = pn_delivery_tag(delivery);
+ pn_quote_data(tagstr, 1024, tag.bytes, tag.size);
pn_link_t *link = pn_link(delivery);
if (pn_readable(delivery)) {
printf("received delivery: %s\n", tagstr);
- pn_receiver_t *receiver = (pn_receiver_t *) link;
printf(" payload = \"");
while (true) {
- ssize_t n = pn_recv(receiver, msg, 1024);
+ ssize_t n = pn_recv(link, msg, 1024);
if (n == PN_EOM) {
pn_advance(link);
pn_disposition(delivery, PN_ACCEPTED);
@@ -174,17 +162,14 @@ void server_callback(pn_selectable_t *se
}
printf("\"\n");
} else if (pn_writable(delivery)) {
- pn_sender_t *sender = (pn_sender_t *) link;
sprintf(msg, "message body for %s", tagstr);
size_t n = pn_message_data(data, 1024, msg, strlen(msg));
- pn_send(sender, data, n);
+ pn_send(link, data, n);
if (pn_advance(link)) {
printf("sent delivery: %s\n", tagstr);
char tagbuf[16];
sprintf(tagbuf, "%i", ctx->count++);
- pn_binary_t *tag = pn_binary(tagbuf, strlen(tagbuf));
- pn_delivery(link, tag);
- pn_free_binary(tag);
+ pn_delivery(link, pn_dtag(tagbuf, strlen(tagbuf)));
}
}
@@ -196,24 +181,20 @@ void server_callback(pn_selectable_t *se
delivery = pn_work_next(delivery);
}
- endpoint = pn_endpoint_head(conn, ACTIVE, CLOSED);
- while (endpoint)
- {
- switch (pn_endpoint_type(endpoint))
- {
- case CONNECTION:
- case SESSION:
- case SENDER:
- case RECEIVER:
- if (pn_remote_state(endpoint) == CLOSED) {
- pn_close(endpoint);
- }
- break;
- case TRANSPORT:
- break;
- }
+ if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
+ pn_connection_close(conn);
+ }
+
+ ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+ while (ssn) {
+ pn_session_close(ssn);
+ ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+ }
- endpoint = pn_endpoint_next(endpoint, ACTIVE, CLOSED);
+ link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+ while (link) {
+ pn_link_close(link);
+ link = pn_link_next(link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
}
}
@@ -266,27 +247,25 @@ void client_callback(pn_selectable_t *se
pn_connection_set_hostname(connection, ctx->hostname);
pn_session_t *ssn = pn_session(connection);
- pn_open((pn_endpoint_t *) connection);
- pn_open((pn_endpoint_t *) ssn);
+ pn_connection_open(connection);
+ pn_session_open(ssn);
if (ctx->send_count) {
- pn_sender_t *snd = pn_sender(ssn, L"sender");
- pn_set_target((pn_link_t *) snd, ctx->address);
- pn_open((pn_endpoint_t *) snd);
+ pn_link_t *snd = pn_sender(ssn, L"sender");
+ pn_set_target(snd, ctx->address);
+ pn_link_open(snd);
char buf[16];
for (int i = 0; i < ctx->send_count; i++) {
sprintf(buf, "%c", 'a' + i);
- pn_binary_t *tag = pn_binary(buf, strlen(buf));
- pn_delivery((pn_link_t *) snd, tag);
- pn_free_binary(tag);
+ pn_delivery(snd, pn_dtag(buf, strlen(buf)));
}
}
if (ctx->recv_count) {
- pn_receiver_t *rcv = pn_receiver(ssn, L"receiver");
- pn_set_source((pn_link_t *) rcv, ctx->address);
- pn_open((pn_endpoint_t *) rcv);
+ pn_link_t *rcv = pn_receiver(ssn, L"receiver");
+ pn_set_source(rcv, ctx->address);
+ pn_link_open(rcv);
pn_flow(rcv, ctx->recv_count);
}
}
@@ -294,27 +273,25 @@ void client_callback(pn_selectable_t *se
pn_delivery_t *delivery = pn_work_head(connection);
while (delivery)
{
- pn_binary_t *tag = pn_delivery_tag(delivery);
- pn_format(tagstr, 1024, pn_from_binary(tag));
+ pn_delivery_tag_t tag = pn_delivery_tag(delivery);
+ pn_quote_data(tagstr, 1024, tag.bytes, tag.size);
pn_link_t *link = pn_link(delivery);
if (pn_writable(delivery)) {
- pn_sender_t *snd = (pn_sender_t *) link;
sprintf(msg, "message body for %s", tagstr);
ssize_t n = pn_message_data(data, 1024, msg, strlen(msg));
- pn_send(snd, data, n);
+ pn_send(link, data, n);
if (pn_advance(link)) printf("sent delivery: %s\n", tagstr);
} else if (pn_readable(delivery)) {
printf("received delivery: %s\n", tagstr);
- pn_receiver_t *rcv = (pn_receiver_t *) link;
printf(" payload = \"");
while (true) {
- size_t n = pn_recv(rcv, msg, 1024);
+ size_t n = pn_recv(link, msg, 1024);
if (n == PN_EOM) {
pn_advance(link);
pn_disposition(delivery, PN_ACCEPTED);
pn_settle(delivery);
if (!--ctx->recv_count) {
- pn_close((pn_endpoint_t *)link);
+ pn_link_close(link);
}
break;
} else {
@@ -329,7 +306,7 @@ void client_callback(pn_selectable_t *se
pn_clean(delivery);
pn_settle(delivery);
if (!--ctx->send_count) {
- pn_close((pn_endpoint_t *)link);
+ pn_link_close(link);
}
}
@@ -340,23 +317,11 @@ void client_callback(pn_selectable_t *se
printf("closing\n");
// XXX: how do we close the session?
//pn_close((pn_endpoint_t *) ssn);
- pn_close((pn_endpoint_t *)connection);
+ pn_connection_close(connection);
}
- pn_endpoint_t *endpoint = pn_endpoint_head(connection, CLOSED, CLOSED);
- while (endpoint)
- {
- switch (pn_endpoint_type(endpoint)) {
- case CONNECTION:
- pn_driver_stop(ctx->driver);
- break;
- case SESSION:
- case SENDER:
- case RECEIVER:
- case TRANSPORT:
- break;
- }
- endpoint = pn_endpoint_next(endpoint, CLOSED, CLOSED);
+ if (pn_connection_state(connection) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
+ pn_driver_stop(ctx->driver);
}
}
Modified: qpid/proton/trunk/proton-c/src/types/binary.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/types/binary.c?rev=1303812&r1=1303811&r2=1303812&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/types/binary.c (original)
+++ qpid/proton/trunk/proton-c/src/types/binary.c Thu Mar 22 14:45:38 2012
@@ -26,7 +26,7 @@
#include "value-internal.h"
#include "../util.h"
-pn_binary_t *pn_binary(char *bytes, size_t size)
+pn_binary_t *pn_binary(const char *bytes, size_t size)
{
pn_binary_t *bin = malloc(sizeof(pn_binary_t) + size);
bin->size = size;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org