You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/02/28 00:47:17 UTC
[1/4] qpid-proton git commit: NO-JIRA: Removed out-of-date FIXME
comment
Repository: qpid-proton
Updated Branches:
refs/heads/master 13e40e9e6 -> b173c3a81
NO-JIRA: Removed out-of-date FIXME comment
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0c6563fe
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0c6563fe
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0c6563fe
Branch: refs/heads/master
Commit: 0c6563fe0d63365ee70590654490636910a9d7b4
Parents: 4417e35
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Feb 27 17:09:21 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Feb 27 19:45:13 2017 -0500
----------------------------------------------------------------------
proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c6563fe/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go
index 133faad..2576046 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go
@@ -30,7 +30,6 @@ import (
func TestLinkSettings(t *testing.T) {
cConn, sConn := net.Pipe()
done := make(chan error)
- // FIXME aconway 2017-02-23: bug in timeout conversion (pn_second_t)
settings := TerminusSettings{Durability: 1, Expiry: 2, Timeout: 42 * time.Second, Dynamic: true}
go func() { // Server
close(done)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/4] qpid-proton git commit: PROTON-1418: pn_vlogf missing newline
after log.
Posted by ac...@apache.org.
PROTON-1418: pn_vlogf missing newline after log.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/4417e356
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/4417e356
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/4417e356
Branch: refs/heads/master
Commit: 4417e3566b80cdde992b0a32491892a4c736f2bd
Parents: 0e81224
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Feb 27 16:22:20 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Feb 27 19:45:13 2017 -0500
----------------------------------------------------------------------
proton-c/src/core/log.c | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4417e356/proton-c/src/core/log.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/log.c b/proton-c/src/core/log.c
index 60c72e7..db83e7e 100644
--- a/proton-c/src/core/log.c
+++ b/proton-c/src/core/log.c
@@ -50,6 +50,7 @@ void pn_log_logger(pn_logger_t new_logger) {
void pn_vlogf_impl(const char *fmt, va_list ap) {
vfprintf(stderr, fmt, ap);
+ fprintf(stderr, "\n");
}
/**@internal
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/4] qpid-proton git commit: NO-JIRA: Fixed bogus cmake test for
existence of proactor in examples
Posted by ac...@apache.org.
NO-JIRA: Fixed bogus cmake test for existence of proactor in examples
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0e81224d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0e81224d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0e81224d
Branch: refs/heads/master
Commit: 0e81224d3112c018e11ad3e4bc66eca016ef5656
Parents: 13e40e9
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Feb 27 19:15:53 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Feb 27 19:45:13 2017 -0500
----------------------------------------------------------------------
examples/c/proactor/CMakeLists.txt | 5 -----
proton-c/CMakeLists.txt | 1 +
2 files changed, 1 insertion(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e81224d/examples/c/proactor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/proactor/CMakeLists.txt b/examples/c/proactor/CMakeLists.txt
index bb6cb0f..2cb7ad9 100644
--- a/examples/c/proactor/CMakeLists.txt
+++ b/examples/c/proactor/CMakeLists.txt
@@ -21,11 +21,6 @@ find_package(Proton REQUIRED)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include)
include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${Proton_INCLUDE_DIRS})
-# Check if the proton library has a proactor implementation.
-include(CheckFunctionExists)
-set(CMAKE_REQUIRED_LIBRARIES ${Proton_LIBRARIES})
-check_function_exists(pn_proactor HAS_PROACTOR)
-
if(HAS_PROACTOR)
add_definitions(${COMPILE_WARNING_FLAGS} ${WERROR} ${COMPILE_PLATFORM_FLAGS} ${LINK_TIME_OPTIMIZATION})
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e81224d/proton-c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index 60e739d..c56c03f 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -578,6 +578,7 @@ endif(MSVC)
if (qpid-proton-proactor)
set(HAS_PROACTOR 1)
+ set(HAS_PROACTOR 1 PARENT_SCOPE) # Visible to examples
add_library (
qpid-proton-proactor SHARED
${qpid-proton-proactor}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[4/4] qpid-proton git commit: PROTON-1421: c proactor to take a
connection URL string
Posted by ac...@apache.org.
PROTON-1421: c proactor to take a connection URL string
pn_proactor_connect and pn_proactor_listen now take a single address string,
in "host:port" or URL format, instead of separate host and port arguments.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b173c3a8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b173c3a8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b173c3a8
Branch: refs/heads/master
Commit: b173c3a811a36c1dd48d492b8cb6f8b9579ac417
Parents: 0c6563f
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Feb 27 19:32:01 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Feb 27 19:45:19 2017 -0500
----------------------------------------------------------------------
examples/c/proactor/broker.c | 121 +++++-------------------
examples/c/proactor/direct.c | 156 ++++++++++++-------------------
examples/c/proactor/receive.c | 95 +++++++------------
examples/c/proactor/send.c | 160 ++++++++++++--------------------
examples/c/proactor/test.py | 44 ++++-----
proton-c/include/proton/proactor.h | 17 ++--
proton-c/src/proactor/libuv.c | 30 +++---
proton-c/src/tests/proactor.c | 28 +++---
proton-c/src/tests/test_tools.h | 8 +-
9 files changed, 237 insertions(+), 422 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b173c3a8/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index 370d2e8..302bdec 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -19,45 +19,16 @@
#include "thread.h"
-#include <proton/connection_driver.h>
-#include <proton/proactor.h>
#include <proton/engine.h>
#include <proton/listener.h>
+#include <proton/proactor.h>
#include <proton/sasl.h>
#include <proton/transport.h>
-#include <proton/url.h>
-#include "pncompat/misc_funcs.inc"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-bool enable_debug = false;
-
-void debug(const char* fmt, ...) {
- if (enable_debug) {
- va_list(ap);
- va_start(ap, fmt);
- vfprintf(stderr, fmt, ap);
- fputc('\n', stderr);
- fflush(stderr);
- }
-}
-
-void check(int err, const char* s) {
- if (err != 0) {
- perror(s);
- exit(1);
- }
-}
-
-void pcheck(int err, const char* s) {
- if (err != 0) {
- fprintf(stderr, "%s: %s", s, pn_code(err));
- exit(1);
- }
-}
-
/* Simple re-sizable vector that acts as a queue */
#define VEC(T) struct { T* data; size_t len, cap; }
@@ -98,7 +69,6 @@ typedef struct queue_t {
} queue_t;
static void queue_init(queue_t *q, const char* name, queue_t *next) {
- debug("created queue %s", name);
pthread_mutex_init(&q->lock, NULL);
q->name = strdup(name);
VEC_INIT(q->messages);
@@ -126,7 +96,6 @@ static void queue_send(queue_t *q, pn_link_t *s) {
size_t tag = 0;
pthread_mutex_lock(&q->lock);
if (q->messages.len == 0) { /* Empty, record connection as waiting */
- debug("queue is empty %s", q->name);
/* Record connection for wake-up if not already on the list. */
pn_connection_t *c = pn_session_connection(pn_link_session(s));
size_t i = 0;
@@ -136,7 +105,6 @@ static void queue_send(queue_t *q, pn_link_t *s) {
VEC_PUSH(q->waiting, c);
}
} else {
- debug("sending from queue %s", q->name);
m = q->messages.data[0];
VEC_POP(q->messages);
tag = ++q->sent;
@@ -169,7 +137,6 @@ bool pn_connection_get_check_queues(pn_connection_t *c) {
If the queue was previously empty, notify waiting senders.
*/
static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) {
- debug("received to queue %s", q->name);
pthread_mutex_lock(&q->lock);
VEC_PUSH(q->messages, m);
if (q->messages.len == 1) { /* Was empty, notify waiting connections */
@@ -221,22 +188,12 @@ queue_t* queues_get(queues_t *qs, const char* name) {
/* The broker implementation */
typedef struct broker_t {
pn_proactor_t *proactor;
- queues_t queues;
- const char *container_id; /* AMQP container-id */
size_t threads;
- pn_millis_t heartbeat;
+ const char *container_id; /* AMQP container-id */
+ queues_t queues;
bool finished;
} broker_t;
-void broker_init(broker_t *b, const char *container_id, size_t threads, pn_millis_t heartbeat) {
- memset(b, 0, sizeof(*b));
- b->proactor = pn_proactor();
- queues_init(&b->queues);
- b->container_id = container_id;
- b->threads = threads;
- b->heartbeat = 0;
-}
-
void broker_stop(broker_t *b) {
/* In this broker an interrupt stops a thread, stopping all threads stops the broker */
for (size_t i = 0; i < b->threads; ++i)
@@ -293,10 +250,10 @@ static int exit_code = 0;
static void check_condition(pn_event_t *e, pn_condition_t *cond) {
if (pn_condition_is_set(cond)) {
- exit_code = 1;
- const char *ename = e ? pn_event_type_name(pn_event_type(e)) : "UNKNOWN";
- fprintf(stderr, "%s: %s: %s\n", ename,
+ fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
pn_condition_get_name(cond), pn_condition_get_description(cond));
+ pn_connection_close(pn_event_connection(e));
+ exit_code = 1;
}
}
@@ -307,6 +264,11 @@ static void handle(broker_t* b, pn_event_t* e) {
switch (pn_event_type(e)) {
+ case PN_LISTENER_OPEN:
+ printf("listening\n");
+ fflush(stdout);
+ break;
+
case PN_LISTENER_ACCEPT:
pn_listener_accept(pn_event_listener(e), pn_connection());
break;
@@ -320,7 +282,6 @@ static void handle(broker_t* b, pn_event_t* e) {
pn_transport_t *t = pn_connection_transport(c);
pn_transport_require_auth(t, false);
pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
- pn_transport_set_idle_timeout(t, 2 * b->heartbeat);
}
case PN_CONNECTION_REMOTE_OPEN: {
pn_connection_open(pn_event_connection(e)); /* Complete the open */
@@ -431,62 +392,26 @@ static void* broker_thread(void *void_broker) {
return NULL;
}
-static void usage(const char *arg0) {
- fprintf(stderr, "Usage: %s [-d] [-a url] [-t thread-count]\n", arg0);
- exit(1);
-}
-
int main(int argc, char **argv) {
- /* Command line options */
- char *urlstr = NULL;
- char container_id[256];
- /* Note container-id should be unique */
- snprintf(container_id, sizeof(container_id), "%s", argv[0]);
- size_t nthreads = 4;
- pn_millis_t heartbeat = 0;
- int opt;
- while ((opt = getopt(argc, argv, "a:t:dh:c:")) != -1) {
- switch (opt) {
- case 'a': urlstr = optarg; break;
- case 't': nthreads = atoi(optarg); break;
- case 'd': enable_debug = true; break;
- case 'h': heartbeat = atoi(optarg); break;
- case 'c': strncpy(container_id, optarg, sizeof(container_id)); break;
- default: usage(argv[0]); break;
- }
- }
- if (optind < argc)
- usage(argv[0]);
-
- broker_t b;
- broker_init(&b, container_id, nthreads, heartbeat);
-
- /* Parse the URL or use default values */
- const char *host = "0.0.0.0";
- const char *port = "amqp";
- pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
- if (url) {
- if (pn_url_get_host(url)) host = pn_url_get_host(url);
- if (pn_url_get_port(url)) port = (pn_url_get_port(url));
- }
+ broker_t b = {0};
+ b.proactor = pn_proactor();
+ queues_init(&b.queues);
+ b.container_id = argv[0];
+ b.threads = 4;
+ const char *addr = (argc > 1) ? argv[1] : "127.0.0.1:amqp";
- pn_proactor_listen(b.proactor, pn_listener(), host, port, 16);
- printf("listening on '%s:%s' %zd threads\n", host, port, b.threads);
- fflush(stdout);
+ /* Listen on addr */
+ pn_proactor_listen(b.proactor, pn_listener(), addr, 16);
- if (url) pn_url_free(url);
- if (b.threads <= 0) {
- fprintf(stderr, "invalid value -t %zu, threads must be > 0\n", b.threads);
- exit(1);
- }
- /* Start n-1 threads and use main thread */
+ /* Start n-1 threads */
pthread_t* threads = (pthread_t*)calloc(sizeof(pthread_t), b.threads);
for (size_t i = 0; i < b.threads-1; ++i) {
- check(pthread_create(&threads[i], NULL, broker_thread, &b), "pthread_create");
+ pthread_create(&threads[i], NULL, broker_thread, &b);
}
broker_thread(&b); /* Use the main thread too. */
+ /* Join the other threads */
for (size_t i = 0; i < b.threads-1; ++i) {
- check(pthread_join(threads[i], NULL), "pthread_join");
+ pthread_join(threads[i], NULL);
}
pn_proactor_free(b.proactor);
free(threads);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b173c3a8/examples/c/proactor/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/direct.c b/examples/c/proactor/direct.c
index 126ae4f..f76895c 100644
--- a/examples/c/proactor/direct.c
+++ b/examples/c/proactor/direct.c
@@ -19,8 +19,8 @@
*
*/
+#include <proton/condition.h>
#include <proton/connection.h>
-#include <proton/connection_driver.h>
#include <proton/delivery.h>
#include <proton/link.h>
#include <proton/listener.h>
@@ -29,30 +29,24 @@
#include <proton/sasl.h>
#include <proton/session.h>
#include <proton/transport.h>
-#include <proton/url.h>
-#include "pncompat/misc_funcs.inc"
#include <stdio.h>
#include <stdlib.h>
-#include <string.h>
-
-typedef char str[1024];
typedef struct app_data_t {
- /* Common values */
+ const char *connection_address;
+ const char *amqp_address;
+ const char *container_id;
+ int message_count;
+
pn_proactor_t *proactor;
- bool finished;
- str address;
- str container_id;
+ pn_listener_t *listener;
pn_rwbytes_t message_buffer;
- int message_count;
/* Sender values */
int sent;
int acknowledged;
pn_link_t *sender;
- pn_millis_t delay;
- bool delaying;
/* Receiver values */
int received;
@@ -64,9 +58,10 @@ static int exit_code = 0;
static void check_condition(pn_event_t *e, pn_condition_t *cond) {
if (pn_condition_is_set(cond)) {
- exit_code = 1;
fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
pn_condition_get_name(cond), pn_condition_get_description(cond));
+ pn_connection_close(pn_event_connection(e));
+ exit_code = 1;
}
}
@@ -104,23 +99,6 @@ static pn_bytes_t encode_message(app_data_t* app) {
return pn_bytes(mbuf.size, mbuf.start);
}
-static void send(app_data_t* app) {
- while (pn_link_credit(app->sender) > 0 && app->sent < app->message_count) {
- ++app->sent;
- // Use sent counter bytes as unique delivery tag.
- pn_delivery(app->sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
- pn_bytes_t msgbuf = encode_message(app);
- pn_link_send(app->sender, msgbuf.start, msgbuf.size);
- pn_link_advance(app->sender);
- if (app->delay && app->sent < app->message_count) {
- /* If delay is set, wait for TIMEOUT event to send more */
- app->delaying = true;
- pn_proactor_set_timeout(app->proactor, app->delay);
- break;
- }
- }
-}
-
#define MAX_SIZE 1024
static void decode_message(pn_delivery_t *dlv) {
@@ -195,16 +173,23 @@ static void handle_send(app_data_t* app, pn_event_t* event) {
case PN_LINK_REMOTE_OPEN: {
pn_link_t* l = pn_event_link(event);
- pn_terminus_set_address(pn_link_target(l), app->address);
+ pn_terminus_set_address(pn_link_target(l), app->amqp_address);
pn_link_open(l);
} break;
- case PN_LINK_FLOW:
- /* The peer has given us some credit, now we can send messages */
- if (!app->delaying) {
- app->sender = pn_event_link(event);
- send(app);
- } break;
+ case PN_LINK_FLOW: {
+ /* The peer has given us some credit, now we can send messages */
+ pn_link_t *sender = pn_event_link(event);
+ while (pn_link_credit(sender) > 0 && app->sent < app->message_count) {
+ ++app->sent;
+ // Use sent counter as unique delivery tag.
+ pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
+ pn_bytes_t msgbuf = encode_message(app);
+ pn_link_send(sender, msgbuf.start, msgbuf.size);
+ pn_link_advance(sender);
+ }
+ break;
+ }
case PN_DELIVERY: {
/* We received acknowledgedment from the peer that a message was delivered. */
@@ -213,6 +198,7 @@ static void handle_send(app_data_t* app, pn_event_t* event) {
if (++app->acknowledged == app->message_count) {
printf("%d messages sent and acknowledged\n", app->acknowledged);
pn_connection_close(pn_event_connection(event));
+ /* Continue handling events till we receive TRANSPORT_CLOSED */
}
}
} break;
@@ -222,10 +208,17 @@ static void handle_send(app_data_t* app, pn_event_t* event) {
}
}
-/* Handle all events, delegate to handle_send or handle_receive depending on link mode */
-static void handle(app_data_t* app, pn_event_t* event) {
+/* Handle all events, delegate to handle_send or handle_receive depending on link mode.
+ Return true to continue, false to exit
+*/
+static bool handle(app_data_t* app, pn_event_t* event) {
switch (pn_event_type(event)) {
+ case PN_LISTENER_OPEN:
+ printf("listening\n");
+ fflush(stdout);
+ break;
+
case PN_LISTENER_ACCEPT:
pn_listener_accept(pn_event_listener(event), pn_connection());
break;
@@ -252,7 +245,7 @@ static void handle(app_data_t* app, pn_event_t* event) {
case PN_TRANSPORT_CLOSED:
check_condition(event, pn_transport_condition(pn_event_transport(event)));
- app->finished = true;
+ pn_listener_close(app->listener); /* Finished */
break;
case PN_CONNECTION_REMOTE_CLOSE:
@@ -276,19 +269,12 @@ static void handle(app_data_t* app, pn_event_t* event) {
pn_connection_wake(pn_session_connection(pn_link_session(app->sender)));
break;
- case PN_CONNECTION_WAKE:
- /* Timeout, we can send more. */
- app->delaying = false;
- send(app);
+ case PN_LISTENER_CLOSE:
+ check_condition(event, pn_listener_condition(pn_event_listener(event)));
break;
case PN_PROACTOR_INACTIVE:
- app->finished = true;
- break;
-
- case PN_LISTENER_CLOSE:
- check_condition(event, pn_listener_condition(pn_event_listener(event)));
- app->finished = true;
+ return false;
break;
default: {
@@ -302,60 +288,34 @@ static void handle(app_data_t* app, pn_event_t* event) {
}
}
}
+ return true;
}
-static void usage(const char *arg0) {
- fprintf(stderr, "Usage: %s [-a URL] [-m message-count] [-d delay-ms]\n", arg0);
- fprintf(stderr, "Demonstrates direct peer-to-peer AMQP communication without a broker. Accepts a connection from either the send.c or receive.c client and provides the complementary behavior (receive or send.");
- exit(1);
+void run(app_data_t *app) {
+ /* Loop and handle events */
+ do {
+ pn_event_batch_t *events = pn_proactor_wait(app->proactor);
+ for (pn_event_t *e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
+ if (!handle(app, e)) {
+ return;
+ }
+ }
+ pn_proactor_done(app->proactor, events);
+ } while(true);
}
int main(int argc, char **argv) {
- /* Default values for application and connection. */
- app_data_t app = {0};
- app.message_count = 100;
- const char* urlstr = NULL;
-
- int opt;
- while((opt = getopt(argc, argv, "a:m:d:")) != -1) {
- switch(opt) {
- case 'a': urlstr = optarg; break;
- case 'm': app.message_count = atoi(optarg); break;
- case 'd': app.delay = atoi(optarg); break;
- default: usage(argv[0]); break;
- }
- }
- if (optind < argc)
- usage(argv[0]);
- /* Note container-id should be unique */
- snprintf(app.container_id, sizeof(app.container_id), "%s", argv[0]);
-
- /* Parse the URL or use default values */
- const char *host = "0.0.0.0";
- const char *port = "amqp";
- strncpy(app.address, "example", sizeof(app.address));
- pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
- if (url) {
- if (pn_url_get_host(url)) host = pn_url_get_host(url);
- if (pn_url_get_port(url)) port = (pn_url_get_port(url));
- if (pn_url_get_path(url)) strncpy(app.address, pn_url_get_path(url), sizeof(app.address));
- }
+ struct app_data_t app = {0};
+ app.container_id = argv[0]; /* Should be unique */
+ app.connection_address = (argc > 1) ? argv[1] : "127.0.0.1:amqp";
+ app.amqp_address = (argc > 2) ? argv[2] : "example";
+ app.message_count = (argc > 3) ? atoi(argv[3]) : 10;
+ /* Create the proactor and connect */
app.proactor = pn_proactor();
- pn_proactor_listen(app.proactor, pn_listener(), host, port, 16);
- printf("listening on '%s:%s'\n", host, port);
- fflush(stdout);
- if (url) pn_url_free(url);
-
- do {
- pn_event_batch_t *events = pn_proactor_wait(app.proactor);
- pn_event_t *e;
- while ((e = pn_event_batch_next(events))) {
- handle(&app, e);
- }
- pn_proactor_done(app.proactor, events);
- } while(!app.finished);
-
+ app.listener = pn_listener();
+ pn_proactor_listen(app.proactor, app.listener, app.connection_address, 16);
+ run(&app);
pn_proactor_free(app.proactor);
free(app.message_buffer.start);
return exit_code;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b173c3a8/examples/c/proactor/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c
index 65ec069..43a68cd 100644
--- a/examples/c/proactor/receive.c
+++ b/examples/c/proactor/receive.c
@@ -20,29 +20,25 @@
*/
#include <proton/connection.h>
-#include <proton/connection_driver.h>
+#include <proton/condition.h>
#include <proton/delivery.h>
-#include <proton/proactor.h>
#include <proton/link.h>
#include <proton/message.h>
+#include <proton/proactor.h>
#include <proton/session.h>
#include <proton/transport.h>
-#include <proton/url.h>
-#include "pncompat/misc_funcs.inc"
#include <stdio.h>
#include <stdlib.h>
-#include <string.h>
-
-typedef char str[1024];
typedef struct app_data_t {
- str address;
- str container_id;
- pn_rwbytes_t message_buffer;
+ const char *connection_address;
+ const char *amqp_address;
+ const char *container_id;
int message_count;
- int received;
+
pn_proactor_t *proactor;
+ int received;
bool finished;
} app_data_t;
@@ -52,9 +48,10 @@ static int exit_code = 0;
static void check_condition(pn_event_t *e, pn_condition_t *cond) {
if (pn_condition_is_set(cond)) {
- exit_code = 1;
fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
pn_condition_get_name(cond), pn_condition_get_description(cond));
+ pn_connection_close(pn_event_connection(e));
+ exit_code = 1;
}
}
@@ -81,7 +78,8 @@ static void decode_message(pn_delivery_t *dlv) {
}
}
-static void handle(app_data_t* app, pn_event_t* event) {
+/* Return true to continue, false to exit */
+static bool handle(app_data_t* app, pn_event_t* event) {
switch (pn_event_type(event)) {
case PN_CONNECTION_INIT: {
@@ -91,7 +89,7 @@ static void handle(app_data_t* app, pn_event_t* event) {
pn_session_t* s = pn_session(c);
pn_session_open(s);
pn_link_t* l = pn_receiver(s, "my_receiver");
- pn_terminus_set_address(pn_link_source(l), app->address);
+ pn_terminus_set_address(pn_link_source(l), app->amqp_address);
pn_link_open(l);
/* cannot receive without granting credit: */
pn_link_flow(l, app->message_count ? app->message_count : BATCH);
@@ -148,63 +146,38 @@ static void handle(app_data_t* app, pn_event_t* event) {
break;
case PN_PROACTOR_INACTIVE:
- app->finished = true;
+ return false;
break;
- default: break;
+ default:
+ break;
}
+ return true;
}
-static void usage(const char *arg0) {
- fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
- exit(1);
+void run(app_data_t *app) {
+ /* Loop and handle events */
+ do {
+ pn_event_batch_t *events = pn_proactor_wait(app->proactor);
+ for (pn_event_t *e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
+ if (!handle(app, e)) {
+ return;
+ }
+ }
+ pn_proactor_done(app->proactor, events);
+ } while(true);
}
int main(int argc, char **argv) {
- /* Default values for application and connection. */
- app_data_t app = {{0}};
- app.message_count = 100;
- const char* urlstr = NULL;
-
- int opt;
- while((opt = getopt(argc, argv, "a:m:")) != -1) {
- switch(opt) {
- case 'a': urlstr = optarg; break;
- case 'm': app.message_count = atoi(optarg); break;
- default: usage(argv[0]); break;
- }
- }
- if (optind < argc)
- usage(argv[0]);
- /* Note container-id should be unique */
- snprintf(app.container_id, sizeof(app.container_id), "%s", argv[0]);
-
- /* Parse the URL or use default values */
- const char *host = "127.0.0.1";
- const char *port = "amqp";
- strncpy(app.address, "example", sizeof(app.address));
- pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
- if (url) {
- if (pn_url_get_host(url)) host = pn_url_get_host(url);
- if (pn_url_get_port(url)) port = (pn_url_get_port(url));
- if (pn_url_get_path(url)) strncpy(app.address, pn_url_get_path(url), sizeof(app.address));
- }
+ struct app_data_t app = {0};
+ app.container_id = argv[0]; /* Should be unique */
+ app.connection_address = (argc > 1) ? argv[1] : "127.0.0.1:amqp";
+ app.amqp_address = (argc > 2) ? argv[2] : "example";
+ app.message_count = (argc > 3) ? atoi(argv[3]) : 10;
/* Create the proactor and connect */
app.proactor = pn_proactor();
- pn_proactor_connect(app.proactor, pn_connection(), host, port);
- if (url) pn_url_free(url);
-
- do {
- pn_event_batch_t *events = pn_proactor_wait(app.proactor);
- pn_event_t *e;
- while ((e = pn_event_batch_next(events))) {
- handle(&app, e);
- }
- pn_proactor_done(app.proactor, events);
- } while(!app.finished);
-
+ pn_proactor_connect(app.proactor, pn_connection(), app.connection_address);
+ run(&app);
pn_proactor_free(app.proactor);
- free(app.message_buffer.start);
- return exit_code;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b173c3a8/examples/c/proactor/send.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c
index 0786f19..c21ac68 100644
--- a/examples/c/proactor/send.c
+++ b/examples/c/proactor/send.c
@@ -20,43 +20,37 @@
*/
#include <proton/connection.h>
-#include <proton/connection_driver.h>
+#include <proton/condition.h>
#include <proton/delivery.h>
-#include <proton/proactor.h>
#include <proton/link.h>
#include <proton/message.h>
+#include <proton/proactor.h>
#include <proton/session.h>
#include <proton/transport.h>
-#include <proton/url.h>
-#include "pncompat/misc_funcs.inc"
#include <stdio.h>
#include <stdlib.h>
-#include <string.h>
-
-typedef char str[1024];
typedef struct app_data_t {
- str address;
- str container_id;
- pn_rwbytes_t message_buffer;
+ const char *connection_address;
+ const char *amqp_address;
+ const char *container_id;
int message_count;
+
+ pn_proactor_t *proactor;
+ pn_rwbytes_t message_buffer;
int sent;
int acknowledged;
- pn_proactor_t *proactor;
- pn_millis_t delay;
- bool delaying;
- pn_link_t *sender;
- bool finished;
} app_data_t;
static int exit_code = 0;
static void check_condition(pn_event_t *e, pn_condition_t *cond) {
if (pn_condition_is_set(cond)) {
- exit_code = 1;
fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
pn_condition_get_name(cond), pn_condition_get_description(cond));
+ pn_connection_close(pn_event_connection(e));
+ exit_code = 1;
}
}
@@ -94,55 +88,35 @@ static pn_bytes_t encode_message(app_data_t* app) {
return pn_bytes(mbuf.size, mbuf.start);
}
-static void send(app_data_t* app) {
- while (pn_link_credit(app->sender) > 0 && app->sent < app->message_count) {
- ++app->sent;
- // Use sent counter bytes as unique delivery tag.
- pn_delivery(app->sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
- pn_bytes_t msgbuf = encode_message(app);
- pn_link_send(app->sender, msgbuf.start, msgbuf.size);
- pn_link_advance(app->sender);
- if (app->delay && app->sent < app->message_count) {
- /* If delay is set, wait for TIMEOUT event to send more */
- app->delaying = true;
- pn_proactor_set_timeout(app->proactor, app->delay);
- break;
- }
- }
-}
-
-static void handle(app_data_t* app, pn_event_t* event) {
+/* Returns true to continue, false if finished */
+static bool handle(app_data_t* app, pn_event_t* event) {
switch (pn_event_type(event)) {
case PN_CONNECTION_INIT: {
pn_connection_t* c = pn_event_connection(event);
pn_connection_set_container(c, app->container_id);
pn_connection_open(c);
- pn_session_t* s = pn_session(c);
+ pn_session_t* s = pn_session(pn_event_connection(event));
pn_session_open(s);
pn_link_t* l = pn_sender(s, "my_sender");
- pn_terminus_set_address(pn_link_target(l), app->address);
+ pn_terminus_set_address(pn_link_target(l), app->amqp_address);
pn_link_open(l);
- } break;
-
- case PN_LINK_FLOW:
- /* The peer has given us some credit, now we can send messages */
- if (!app->delaying) {
- app->sender = pn_event_link(event);
- send(app);
- }
- break;
-
- case PN_PROACTOR_TIMEOUT:
- /* Wake the sender's connection */
- pn_connection_wake(pn_session_connection(pn_link_session(app->sender)));
- break;
-
- case PN_CONNECTION_WAKE:
- /* Timeout, we can send more. */
- app->delaying = false;
- send(app);
- break;
+ break;
+ }
+
+ case PN_LINK_FLOW: {
+ /* The peer has given us some credit, now we can send messages */
+ pn_link_t *sender = pn_event_link(event);
+ while (pn_link_credit(sender) > 0 && app->sent < app->message_count) {
+ ++app->sent;
+ // Use sent counter as unique delivery tag.
+ pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
+ pn_bytes_t msgbuf = encode_message(app);
+ pn_link_send(sender, msgbuf.start, msgbuf.size);
+ pn_link_advance(sender);
+ }
+ break;
+ }
case PN_DELIVERY: {
/* We received acknowledgedment from the peer that a message was delivered. */
@@ -151,9 +125,15 @@ static void handle(app_data_t* app, pn_event_t* event) {
if (++app->acknowledged == app->message_count) {
printf("%d messages sent and acknowledged\n", app->acknowledged);
pn_connection_close(pn_event_connection(event));
+ /* Continue handling events till we receive TRANSPORT_CLOSED */
}
+ } else {
+ fprintf(stderr, "unexpected delivery state %d\n", (int)pn_delivery_remote_state(d));
+ pn_connection_close(pn_event_connection(event));
+ exit_code=1;
}
- } break;
+ break;
+ }
case PN_TRANSPORT_CLOSED:
check_condition(event, pn_transport_condition(pn_event_transport(event)));
@@ -176,63 +156,37 @@ static void handle(app_data_t* app, pn_event_t* event) {
break;
case PN_PROACTOR_INACTIVE:
- app->finished = true;
- break;
+ return false;
default: break;
}
+ return true;
}
-static void usage(const char *arg0) {
- fprintf(stderr, "Usage: %s [-a url] [-m message-count] [-d delay-ms]\n", arg0);
- exit(1);
+void run(app_data_t *app) {
+ /* Loop and handle events */
+ do {
+ pn_event_batch_t *events = pn_proactor_wait(app->proactor);
+ for (pn_event_t *e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
+ if (!handle(app, e)) {
+ return;
+ }
+ }
+ pn_proactor_done(app->proactor, events);
+ } while(true);
}
int main(int argc, char **argv) {
- /* Default values for application and connection. */
- app_data_t app = {{0}};
- app.message_count = 100;
- const char* urlstr = NULL;
-
- int opt;
- while((opt = getopt(argc, argv, "a:m:d:")) != -1) {
- switch(opt) {
- case 'a': urlstr = optarg; break;
- case 'm': app.message_count = atoi(optarg); break;
- case 'd': app.delay = atoi(optarg); break;
- default: usage(argv[0]); break;
- }
- }
- if (optind < argc)
- usage(argv[0]);
- /* Note container-id should be unique */
- snprintf(app.container_id, sizeof(app.container_id), "%s", argv[0]);
-
- /* Parse the URL or use default values */
- const char *host = "127.0.0.1";
- const char *port = "amqp";
- strncpy(app.address, "example", sizeof(app.address));
- pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
- if (url) {
- if (pn_url_get_host(url)) host = pn_url_get_host(url);
- if (pn_url_get_port(url)) port = (pn_url_get_port(url));
- if (pn_url_get_path(url)) strncpy(app.address, pn_url_get_path(url), sizeof(app.address));
- }
+ struct app_data_t app = {0};
+ app.container_id = argv[0]; /* Should be unique */
+ app.connection_address = (argc > 1) ? argv[1] : "127.0.0.1:amqp";
+ app.amqp_address = (argc > 2) ? argv[2] : "example";
+ app.message_count = (argc > 3) ? atoi(argv[3]) : 10;
/* Create the proactor and connect */
app.proactor = pn_proactor();
- pn_proactor_connect(app.proactor, pn_connection(), host, port);
- if (url) pn_url_free(url);
-
- do {
- pn_event_batch_t *events = pn_proactor_wait(app.proactor);
- pn_event_t *e;
- while ((e = pn_event_batch_next(events))) {
- handle(&app, e);
- }
- pn_proactor_done(app.proactor, events);
- } while(!app.finished);
-
+ pn_proactor_connect(app.proactor, pn_connection(), app.connection_address);
+ run(&app);
pn_proactor_free(app.proactor);
free(app.message_buffer.start);
return exit_code;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b173c3a8/examples/c/proactor/test.py
----------------------------------------------------------------------
diff --git a/examples/c/proactor/test.py b/examples/c/proactor/test.py
index 692e4be..84c519e 100644
--- a/examples/c/proactor/test.py
+++ b/examples/c/proactor/test.py
@@ -36,7 +36,7 @@ class Broker(object):
def __enter__(self):
with bind0() as sock:
self.addr = "127.0.0.1:%s/examples" % (sock.port())
- self.proc = self.test.proc(["broker", "-a", self.addr])
+ self.proc = self.test.proc(["broker", self.addr])
self.proc.wait_re("listening")
return self
@@ -52,44 +52,36 @@ class CExampleTest(ExampleTestCase):
def test_send_receive(self):
"""Send first then receive"""
with Broker(self) as b:
- s = self.proc(["send", "-a", b.addr])
- self.assertEqual("100 messages sent and acknowledged\n", s.wait_out())
- r = self.proc(["receive", "-a", b.addr])
- self.assertEqual(receive_expect(100), r.wait_out())
+ s = self.proc(["send", b.addr])
+ self.assertEqual("10 messages sent and acknowledged\n", s.wait_out())
+ r = self.proc(["receive", b.addr])
+ self.assertEqual(receive_expect(10), r.wait_out())
def test_receive_send(self):
"""Start receiving first, then send."""
with Broker(self) as b:
- r = self.proc(["receive", "-a", b.addr]);
- s = self.proc(["send", "-a", b.addr]);
- self.assertEqual("100 messages sent and acknowledged\n", s.wait_out())
- self.assertEqual(receive_expect(100), r.wait_out())
-
- def test_timed_send(self):
- """Send with timed delay"""
- with Broker(self) as b:
- s = self.proc(["send", "-a", b.addr, "-d100", "-m3"])
- self.assertEqual("3 messages sent and acknowledged\n", s.wait_out())
- r = self.proc(["receive", "-a", b.addr, "-m3"])
- self.assertEqual(receive_expect(3), r.wait_out())
+ r = self.proc(["receive", b.addr]);
+ s = self.proc(["send", b.addr]);
+ self.assertEqual("10 messages sent and acknowledged\n", s.wait_out())
+ self.assertEqual(receive_expect(10), r.wait_out())
def test_send_direct(self):
"""Send to direct server"""
with bind0() as sock:
- addr = "127.0.0.1:%s/examples" % sock.port()
- d = self.proc(["direct", "-a", addr])
+ addr = "127.0.0.1:%s" % sock.port()
+ d = self.proc(["direct", addr])
d.wait_re("listening")
- self.assertEqual("100 messages sent and acknowledged\n", self.proc(["send", "-a", addr]).wait_out())
- self.assertIn(receive_expect(100), d.wait_out())
+ self.assertEqual("10 messages sent and acknowledged\n", self.proc(["send", addr]).wait_out())
+ self.assertIn(receive_expect(10), d.wait_out())
def test_receive_direct(self):
"""Receive from direct server"""
with bind0() as sock:
- addr = "127.0.0.1:%s/examples" % sock.port()
- d = self.proc(["direct", "-a", addr])
- d.wait_re("listenin")
- self.assertEqual(receive_expect(100), self.proc(["receive", "-a", addr]).wait_out())
- self.assertIn("100 messages sent and acknowledged\n", d.wait_out())
+ addr = "127.0.0.1:%s" % sock.port()
+ d = self.proc(["direct", addr])
+ d.wait_re("listening")
+ self.assertEqual(receive_expect(10), self.proc(["receive", addr]).wait_out())
+ self.assertIn("10 messages sent and acknowledged\n", d.wait_out())
if __name__ == "__main__":
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b173c3a8/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 94c4891..9a8a0ad 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -66,15 +66,17 @@ PNP_EXTERN void pn_proactor_free(pn_proactor_t *proactor);
*
* @param[in] proactor the proactor object
* @param[in] connection the proactor takes ownership, do not free
- * @param[in] host address to connect on
+ * @param[in] addr the network address (not AMQP address) to connect to. May
+ * be in the form "host:port" or an "amqp://" or "amqps://" URL. The `/path` part of
+ * the URL is ignored.
* @param[in] port port to connect to
*
* @return error on immediate error, e.g. an allocation failure.
* Other errors are indicated by connection or transport events via
PNP_EXTERN * pn_proactor_wait()
*/
-PNP_EXTERN int pn_proactor_connect(pn_proactor_t *proactor, pn_connection_t *connection,
- const char *host, const char *port);
+PNP_EXTERN int pn_proactor_connect(
+ pn_proactor_t *proactor, pn_connection_t *connection, const char *addr);
/**
* Start listening with listener. pn_proactor_wait() will return a
@@ -82,16 +84,17 @@ PNP_EXTERN int pn_proactor_connect(pn_proactor_t *proactor, pn_connection_t *con
*
* @param[in] proactor the proactor object
* @param[in] listener proactor takes ownership of listener, do not free
- * @param[in] host address to listen on
- * @param[in] port port to listen on
+ * @param[in] addr the network address (not AMQP address) to listen on. May
+ * be in the form "host:port" or an "amqp://" or "amqps://" URL. The `/path` part of
+ * the URL is ignored.
* @param[in] backlog number of connection requests to queue
*
* @return error on immediate error, e.g. an allocation failure.
* Other errors are indicated by pn_listener_condition() on the
* PN_LISTENER_CLOSE event.
*/
-PNP_EXTERN int pn_proactor_listen(pn_proactor_t *proactor, pn_listener_t *listener,
- const char *host, const char *port, int backlog);
+PNP_EXTERN int pn_proactor_listen(
+ pn_proactor_t *proactor, pn_listener_t *listener, const char *addr, int backlog);
/**
* Wait until there is at least one event to handle.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b173c3a8/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 2fafbb3..393df27 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -366,8 +366,6 @@ static void psocket_error(psocket_t *ps, int err, const char* what) {
}
}
-/* FIXME aconway 2017-02-25: split socket/queue */
-
/* psocket uv-initialization */
static int leader_init(psocket_t *ps) {
ps->working = false;
@@ -418,7 +416,7 @@ static void on_connect(uv_connect_t *connect, int err) {
pconnection_t *pc = (pconnection_t*)connect->data;
assert(!pc->psocket.working);
if (err) pconnection_error(pc, err, "on connect to");
- pconnection_detach(pc); /* FIXME aconway 2017-02-25: detach AFTER error or vv */
+ pconnection_detach(pc);
}
/* Incoming connection ready to be accepted */
@@ -438,8 +436,6 @@ static void on_connection(uv_stream_t* server, int err) {
}
}
-// #error FIXME REVIW UPWARDS FROM HERE ^^^^
-
/* Common address resolution for leader_listen and leader_connect */
static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
int err = leader_init(ps);
@@ -532,7 +528,6 @@ static void on_tick(uv_timer_t *timer) {
assert(!pc->psocket.working);
leader_tick(pc);
pconnection_detach(pc);
- /* FIXME aconway 2017-02-25: optimize - don't detach if no work. Need to check for finished? */
}
static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
@@ -768,7 +763,7 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
uv_mutex_lock(&p->lock);
- psocket_t *ps = batch_psocket(batch); /* FIXME aconway 2017-02-26: replace with switch? */
+ psocket_t *ps = batch_psocket(batch);
if (ps) {
assert(ps->working);
assert(ps->next == &UNLISTED);
@@ -817,8 +812,13 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
notify(p);
}
-int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
- pconnection_t *pc = pconnection(p, c, false, host, port);
+int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
+ pn_url_t *url = pn_url_parse(addr);
+ if (!url) {
+ return PN_OUT_OF_MEMORY;
+ }
+ pconnection_t *pc = pconnection(p, c, false, pn_url_get_host(url), pn_url_get_port(url));
+ pn_url_free(url);
if (!pc) {
return PN_OUT_OF_MEMORY;
}
@@ -826,10 +826,14 @@ int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host,
return 0;
}
-int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, const char *port, int backlog)
-{
+int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, int backlog) {
assert(!l->closed);
- psocket_init(&l->psocket, p, false, host, port);
+ pn_url_t *url = pn_url_parse(addr);
+ if (!url) {
+ return PN_OUT_OF_MEMORY;
+ }
+ psocket_init(&l->psocket, p, false, pn_url_get_host(url), pn_url_get_port(url));
+ pn_url_free(url);
l->backlog = backlog;
psocket_start(&l->psocket);
return 0;
@@ -861,7 +865,7 @@ void pn_proactor_free(pn_proactor_t *p) {
uv_mutex_destroy(&p->lock);
uv_cond_destroy(&p->cond);
pn_collector_free(p->collector);
- /* FIXME aconway 2017-02-25: restore */
+ /* FIXME aconway 2017-02-25: memory leaks, need to clean up the queues */
/* assert(!p->worker_q.front); */
/* assert(!p->leader_q.front); */
free(p);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b173c3a8/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index a0ddcda..84524b5 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -163,10 +163,10 @@ static void test_client_server(test_t *t) {
proactor_test_t pts[] ={ { open_close_handler }, { common_handler } };
PROACTOR_TEST_INIT(pts, t);
pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
- test_port_t port = test_port();
- pn_proactor_listen(server, pn_listener(), localhost, port.str, 4);
+ test_port_t port = test_port(localhost);
+ pn_proactor_listen(server, pn_listener(), port.host_port, 4);
TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
- pn_proactor_connect(client, pn_connection(), localhost, port.str);
+ pn_proactor_connect(client, pn_connection(), port.host_port);
TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
sock_close(port.sock);
PROACTOR_TEST_FREE(pts);
@@ -190,11 +190,11 @@ static void test_connection_wake(test_t *t) {
proactor_test_t pts[] = { { open_wake_handler }, { common_handler } };
PROACTOR_TEST_INIT(pts, t);
pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
- test_port_t port = test_port(); /* Hold a port */
- pn_proactor_listen(server, pn_listener(), localhost, port.str, 4);
+ test_port_t port = test_port(localhost); /* Hold a port */
+ pn_proactor_listen(server, pn_listener(), port.host_port, 4);
TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
pn_connection_t *c = pn_connection();
- pn_proactor_connect(client, c, localhost, port.str);
+ pn_proactor_connect(client, c, port.host_port);
TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
pn_connection_wake(c);
@@ -209,13 +209,13 @@ static void test_inactive(test_t *t) {
proactor_test_t pts[] = { { open_wake_handler }, { common_handler } };
PROACTOR_TEST_INIT(pts, t);
pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
- test_port_t port = test_port(); /* Hold a port */
+ test_port_t port = test_port(localhost); /* Hold a port */
pn_listener_t *l = pn_listener();
- pn_proactor_listen(server, l, localhost, port.str, 4);
+ pn_proactor_listen(server, l, port.host_port, 4);
TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
pn_connection_t *c = pn_connection();
- pn_proactor_connect(client, c, localhost, port.str);
+ pn_proactor_connect(client, c, port.host_port);
TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
pn_connection_wake(c);
TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
@@ -246,17 +246,17 @@ static void test_errors(test_t *t) {
proactor_test_t pts[] = { { open_wake_handler }, { common_handler } };
PROACTOR_TEST_INIT(pts, t);
pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
- test_port_t port = test_port(); /* Hold a port */
+ test_port_t port = test_port(localhost); /* Hold a port */
/* Invalid connect/listen parameters */
pn_connection_t *c = pn_connection();
- pn_proactor_connect(client, c, localhost, "xxx");
+ pn_proactor_connect(client, c, "127.0.0.1:xxx");
TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
TEST_CHECK_ERROR(t, "xxx", pn_transport_condition(pn_connection_transport(c)));
TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
pn_listener_t *l = pn_listener();
- pn_proactor_listen(server, l, localhost, "xxx", 1);
+ pn_proactor_listen(server, l, "127.0.0.1:xxx", 1);
TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
TEST_CHECK_ERROR(t, "xxx", pn_listener_condition(l));
@@ -264,7 +264,7 @@ static void test_errors(test_t *t) {
/* Connect with no listener */
c = pn_connection();
- pn_proactor_connect(client, c, localhost, port.str);
+ pn_proactor_connect(client, c, port.host_port);
TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
TEST_CHECK(t, pn_condition_is_set(pn_transport_condition(pn_connection_transport(c))));
TEST_CHECK_ERROR(t, "connection refused", pn_transport_condition(pn_connection_transport(c)));
@@ -274,6 +274,8 @@ static void test_errors(test_t *t) {
PROACTOR_TEST_FREE(pts);
}
+/* Tests for use of URLs */
+
int main(int argc, char **argv) {
int failed = 0;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b173c3a8/proton-c/src/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h
index 97dac3f..9fe679c 100644
--- a/proton-c/src/tests/test_tools.h
+++ b/proton-c/src/tests/test_tools.h
@@ -202,16 +202,18 @@ static int sock_port(sock_t sock) {
/* Combines includes a sock_t with the int and char* versions of the port for convenience */
typedef struct test_port_t {
sock_t sock;
- int port;
- char str[256];
+ int port; /* port as integer */
+ char str[256]; /* port as string */
+ char host_port[256]; /* host:port string */
} test_port_t;
/* Create a test_port_t */
-static inline test_port_t test_port(void) {
+static inline test_port_t test_port(const char* host) {
test_port_t tp = {0};
tp.sock = sock_bind0();
tp.port = sock_port(tp.sock);
snprintf(tp.str, sizeof(tp.str), "%d", tp.port);
+ snprintf(tp.host_port, sizeof(tp.host_port), "%s:%d", host, tp.port);
return tp;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org