You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2017/01/10 15:59:09 UTC
[48/55] [partial] qpid-proton-j git commit: PROTON-1385: retain
proton-j content only, the rest remains in the other repo at:
https://git-wip-us.apache.org/repos/asf/qpid-proton.git
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/proactor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/proactor/CMakeLists.txt b/examples/c/proactor/CMakeLists.txt
deleted file mode 100644
index f701651..0000000
--- a/examples/c/proactor/CMakeLists.txt
+++ /dev/null
@@ -1,43 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Build the C proactor examples against an installed/located Proton.
-find_package(Proton REQUIRED)
-
-include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${Proton_INCLUDE_DIRS})
-
-add_definitions(${COMPILE_WARNING_FLAGS} ${WERROR} ${COMPILE_PLATFORM_FLAGS} ${LINK_TIME_OPTIMIZATION})
-
-# The libuv-based examples are optional: they are built only if libuv is found.
-find_package(Libuv)
-if (Libuv_FOUND)
-  # Each example is linked together with the libuv proactor implementation.
-  foreach(name broker send receive)
-    add_executable(libuv_${name} ${name}.c libuv_proactor.c)
-    target_link_libraries(libuv_${name} ${Proton_LIBRARIES} ${Libuv_LIBRARIES})
-    set_target_properties(libuv_${name} PROPERTIES
-      COMPILE_DEFINITIONS "PN_PROACTOR_INCLUDE=\"libuv_proactor.h\"")
-  endforeach()
-
-  # Add a test with the correct environment to find test executables and valgrind.
-  if(WIN32)
-    set(test_path "$<TARGET_FILE_DIR:libuv_broker>;$<TARGET_FILE_DIR:qpid-proton>")
-  else()
-    set(test_path "${CMAKE_CURRENT_BINARY_DIR}:$ENV{PATH}")
-  endif()
-  set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV})
-  # Use the NAME/COMMAND signature: the legacy add_test(<name> <command>) form
-  # does not evaluate generator expressions such as $<TARGET_FILE_DIR:...>.
-  add_test(NAME c-proactor-libuv
-    COMMAND ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test.py)
-endif()
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/proactor/README.dox
----------------------------------------------------------------------
diff --git a/examples/c/proactor/README.dox b/examples/c/proactor/README.dox
deleted file mode 100644
index 4b09cb7..0000000
--- a/examples/c/proactor/README.dox
+++ /dev/null
@@ -1,17 +0,0 @@
-/**
- * @example send.c
- *
- * Send a fixed number of messages to the "example" node.
- *
- * @example receive.c
- *
- * Subscribes to the 'example' node and prints the message bodies
- * received.
- *
- * @example broker.c
- *
- * A simple multithreaded broker that works with the send and receive
- * examples.
- *
- * __Requires C++11__
- */
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
deleted file mode 100644
index ca52336..0000000
--- a/examples/c/proactor/broker.c
+++ /dev/null
@@ -1,488 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <proton/connection_driver.h>
-#include <proton/proactor.h>
-#include <proton/engine.h>
-#include <proton/sasl.h>
-#include <proton/transport.h>
-#include <proton/url.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-
-/* TODO aconway 2016-10-14: this example does not require libuv IO,
- it uses uv.h only for portable mutex and thread functions.
-*/
-#include <uv.h>
-
-bool enable_debug = false;
-
-/* Print a printf-style message followed by a newline to stderr and flush it.
-   Does nothing unless the global enable_debug flag is set. */
-void debug(const char* fmt, ...) {
-  if (enable_debug) {
-    va_list ap;
-    va_start(ap, fmt);
-    vfprintf(stderr, fmt, ap);
-    va_end(ap);                 /* Every va_start must be paired with a va_end */
-    fputc('\n', stderr);
-    fflush(stderr);
-  }
-}
-
-void check(int err, const char* s) {
- if (err != 0) {
- perror(s);
- exit(1);
- }
-}
-
-void pcheck(int err, const char* s) {
- if (err != 0) {
- fprintf(stderr, "%s: %s", s, pn_code(err));
- exit(1);
- }
-}
-
-/* Simple re-sizable vector that acts as a queue */
-#define VEC(T) struct { T* data; size_t len, cap; }
-
-#define VEC_INIT(V) \
- do { \
- V.len = 0; \
- V.cap = 16; \
- void **vp = (void**)&V.data; \
- *vp = malloc(V.cap * sizeof(*V.data)); \
- } while(0)
-
-#define VEC_FINAL(V) free(V.data)
-
-#define VEC_PUSH(V, X) \
- do { \
- if (V.len == V.cap) { \
- V.cap *= 2; \
- void **vp = (void**)&V.data; \
- *vp = realloc(V.data, V.cap * sizeof(*V.data)); \
- } \
- V.data[V.len++] = X; \
- } while(0) \
-
-#define VEC_POP(V) \
- do { \
- if (V.len > 0) \
- memmove(V.data, V.data+1, (--V.len)*sizeof(*V.data)); \
- } while(0)
-
-/* Simple thread-safe queue implementation */
-typedef struct queue_t {
- uv_mutex_t lock;
- char* name;
- VEC(pn_rwbytes_t) messages; /* Messages on the queue_t */
- VEC(pn_connection_t*) waiting; /* Connections waiting to send messages from this queue */
- struct queue_t *next; /* Next queue in chain */
- size_t sent; /* Count of messages sent, used as delivery tag */
-} queue_t;
-
-static void queue_init(queue_t *q, const char* name, queue_t *next) {
- debug("created queue %s", name);
- uv_mutex_init(&q->lock);
- q->name = strdup(name);
- VEC_INIT(q->messages);
- VEC_INIT(q->waiting);
- q->next = next;
- q->sent = 0;
-}
-
-static void queue_destroy(queue_t *q) {
- uv_mutex_destroy(&q->lock);
- free(q->name);
- for (size_t i = 0; i < q->messages.len; ++i)
- free(q->messages.data[i].start);
- VEC_FINAL(q->messages);
- for (size_t i = 0; i < q->waiting.len; ++i)
- pn_decref(q->waiting.data[i]);
- VEC_FINAL(q->waiting);
-}
-
-/* Send a message on s, or record the connection of s as waiting if the
-   queue has no messages.
-   Called in s dispatch loop, assumes s has credit.
-*/
-static void queue_send(queue_t *q, pn_link_t *s) {
- pn_rwbytes_t m = { 0 };
- size_t tag = 0;
- uv_mutex_lock(&q->lock);
- if (q->messages.len == 0) { /* Empty, record connection as waiting */
- debug("queue is empty %s", q->name);
- /* Record connection for wake-up if not already on the list. */
- pn_connection_t *c = pn_session_connection(pn_link_session(s));
- size_t i = 0;
- /* Linear search: stops at the index of c, or at len if c is absent. */
- for (; i < q->waiting.len && q->waiting.data[i] != c; ++i)
- ;
- if (i == q->waiting.len) {
- VEC_PUSH(q->waiting, c);
- }
- } else {
- debug("sending from queue %s", q->name);
- /* Take the oldest message; its buffer is now owned by this function. */
- m = q->messages.data[0];
- VEC_POP(q->messages);
- tag = ++q->sent;
- }
- uv_mutex_unlock(&q->lock);
- /* The link calls are made after the queue lock has been released. */
- if (m.start) {
- pn_delivery_t *d = pn_delivery(s, pn_dtag((char*)&tag, sizeof(tag)));
- pn_link_send(s, m.start, m.size);
- pn_link_advance(s);
- pn_delivery_settle(d); /* Pre-settled: unreliable, there will be no ack */
- free(m.start);
- }
-}
-
-/* Data associated with each broker connection */
-typedef struct broker_data_t {
- bool check_queues; /* Check senders on the connection for available data in queues. */
-} broker_data_t;
-
-/* Use the context pointer as a boolean flag to indicate we need to check queues */
-void pn_connection_set_check_queues(pn_connection_t *c, bool check) {
- pn_connection_set_context(c, (void*)check);
-}
-
-bool pn_connection_get_check_queues(pn_connection_t *c) {
- return (bool)pn_connection_get_context(c);
-}
-
-/* Put a message on the queue, called in receiver dispatch loop.
- If the queue was previously empty, notify waiting senders.
-*/
-static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) {
- debug("received to queue %s", q->name);
- uv_mutex_lock(&q->lock);
- VEC_PUSH(q->messages, m);
- if (q->messages.len == 1) { /* Was empty, notify waiting connections */
- for (size_t i = 0; i < q->waiting.len; ++i) {
- pn_connection_t *c = q->waiting.data[i];
- pn_connection_set_check_queues(c, true);
- pn_connection_wake(c); /* Wake the connection */
- }
- q->waiting.len = 0;
- }
- uv_mutex_unlock(&q->lock);
-}
-
-/* Thread safe set of queues */
-typedef struct queues_t {
- uv_mutex_t lock;
- queue_t *queues;
- size_t sent;
-} queues_t;
-
-void queues_init(queues_t *qs) {
- uv_mutex_init(&qs->lock);
- qs->queues = NULL;
-}
-
-/* Destroy and free every queue in the set, then destroy the set's lock.
-   The queues_t structure itself is not freed. */
-void queues_destroy(queues_t *qs) {
-  queue_t *q = qs->queues;
-  while (q) {
-    queue_t *next = q->next;    /* Read the link before q is freed */
-    queue_destroy(q);
-    free(q);
-    q = next;
-  }
-  qs->queues = NULL;
-  uv_mutex_destroy(&qs->lock);
-}
-
-/** Get or create the named queue. */
-queue_t* queues_get(queues_t *qs, const char* name) {
- uv_mutex_lock(&qs->lock);
- queue_t *q;
- for (q = qs->queues; q && strcmp(q->name, name) != 0; q = q->next)
- ;
- if (!q) {
- q = (queue_t*)malloc(sizeof(queue_t));
- queue_init(q, name, qs->queues);
- qs->queues = q;
- }
- uv_mutex_unlock(&qs->lock);
- return q;
-}
-
-/* The broker implementation */
-typedef struct broker_t {
- pn_proactor_t *proactor;
- queues_t queues;
- const char *container_id; /* AMQP container-id */
- size_t threads;
- pn_millis_t heartbeat;
- bool finished;
-} broker_t;
-
-/* Initialize broker b: zero it, create its proactor and queue set, and
-   record the container id, thread count and heartbeat.
-   container_id is stored by pointer, not copied. */
-void broker_init(broker_t *b, const char *container_id, size_t threads, pn_millis_t heartbeat) {
-  memset(b, 0, sizeof(*b));
-  b->proactor = pn_proactor();
-  queues_init(&b->queues);
-  b->container_id = container_id;
-  b->threads = threads;
-  b->heartbeat = heartbeat;     /* Used to derive the transport idle timeout */
-}
-
-void broker_stop(broker_t *b) {
- /* In this broker an interrupt stops a thread, stopping all threads stops the broker */
- for (size_t i = 0; i < b->threads; ++i)
- pn_proactor_interrupt(b->proactor);
-}
-
-/* Try to send one message on link s.
-   Does nothing unless s is a sender with credit whose source has an address. */
-static void link_send(broker_t *b, pn_link_t *s) {
-  if (pn_link_is_sender(s) && pn_link_credit(s) > 0) {
-    const char *qname = pn_terminus_get_address(pn_link_source(s));
-    /* queues_get() hands the name to strcmp(), so it must not be NULL. */
-    if (qname) {
-      queue_t *q = queues_get(&b->queues, qname);
-      queue_send(q, s);
-    }
-  }
-}
-
-/* Remove connection c from the waiting list of q, if it is present.
-   Takes the queue lock for the duration of the search and removal. */
-static void queue_unsub(queue_t *q, pn_connection_t *c) {
- uv_mutex_lock(&q->lock);
- for (size_t i = 0; i < q->waiting.len; ++i) {
- if (q->waiting.data[i] == c){
- /* Overwrite the slot holding c with the front element, then drop the
-    front: c is removed, the relative order of the others may change. */
- q->waiting.data[i] = q->waiting.data[0]; /* save old [0] */
- VEC_POP(q->waiting);
- break;
- }
- }
- uv_mutex_unlock(&q->lock);
-}
-
-/* Unsubscribe from the queue of interest to this link. */
-static void link_unsub(broker_t *b, pn_link_t *s) {
- if (pn_link_is_sender(s)) {
- const char *qname = pn_terminus_get_address(pn_link_source(s));
- if (qname) {
- queue_t *q = queues_get(&b->queues, qname);
- queue_unsub(q, pn_session_connection(pn_link_session(s)));
- }
- }
-}
-
-/* Called in connection's event loop when a connection is woken for messages.*/
-static void connection_unsub(broker_t *b, pn_connection_t *c) {
- for (pn_link_t *l = pn_link_head(c, 0); l != NULL; l = pn_link_next(l, 0))
- link_unsub(b, l);
-}
-
-static void session_unsub(broker_t *b, pn_session_t *ssn) {
- pn_connection_t *c = pn_session_connection(ssn);
- for (pn_link_t *l = pn_link_head(c, 0); l != NULL; l = pn_link_next(l, 0)) {
- if (pn_link_session(l) == ssn)
- link_unsub(b, l);
- }
-}
-
-static void check_condition(pn_event_t *e, pn_condition_t *cond) {
- if (pn_condition_is_set(cond)) {
- const char *ename = e ? pn_event_type_name(pn_event_type(e)) : "UNKNOWN";
- fprintf(stderr, "%s: %s: %s\n", ename,
- pn_condition_get_name(cond), pn_condition_get_description(cond));
- }
-}
-
-const int WINDOW=10; /* Incoming credit window */
-
-/* Handle a single event e on behalf of broker b.
-   Called from broker_thread() for every event in a batch. */
-static void handle(broker_t* b, pn_event_t* e) {
-  pn_connection_t *c = pn_event_connection(e);
-
-  switch (pn_event_type(e)) {
-
-   case PN_LISTENER_ACCEPT:
-    pn_listener_accept(pn_event_listener(e), pn_connection());
-    break;
-
-   case PN_CONNECTION_INIT:
-    pn_connection_set_container(c, b->container_id);
-    break;
-
-   case PN_CONNECTION_BOUND: {
-     /* Turn off security */
-     pn_transport_t *t = pn_connection_transport(c);
-     pn_transport_require_auth(t, false);
-     pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
-     pn_transport_set_idle_timeout(t, 2 * b->heartbeat);
-     break;                     /* Do not fall through and open the connection here */
-   }
-   case PN_CONNECTION_REMOTE_OPEN: {
-     pn_connection_open(pn_event_connection(e)); /* Complete the open */
-     break;
-   }
-   case PN_CONNECTION_WAKE: {
-     if (pn_connection_get_check_queues(c)) {
-       pn_connection_set_check_queues(c, false);
-       /* Combine the state bits with |: the two flags are distinct bits,
-          so & would give an empty mask. */
-       int flags = PN_LOCAL_ACTIVE|PN_REMOTE_ACTIVE;
-       for (pn_link_t *l = pn_link_head(c, flags); l != NULL; l = pn_link_next(l, flags))
-         link_send(b, l);
-     }
-     break;
-   }
-   case PN_SESSION_REMOTE_OPEN: {
-     pn_session_open(pn_event_session(e));
-     break;
-   }
-   case PN_LINK_REMOTE_OPEN: {
-     /* Mirror the address requested by the peer onto the local terminus. */
-     pn_link_t *l = pn_event_link(e);
-     if (pn_link_is_sender(l)) {
-       const char *source = pn_terminus_get_address(pn_link_remote_source(l));
-       pn_terminus_set_address(pn_link_source(l), source);
-     } else {
-       const char* target = pn_terminus_get_address(pn_link_remote_target(l));
-       pn_terminus_set_address(pn_link_target(l), target);
-       pn_link_flow(l, WINDOW);
-     }
-     pn_link_open(l);
-     break;
-   }
-   case PN_LINK_FLOW: {
-     link_send(b, pn_event_link(e));
-     break;
-   }
-   case PN_DELIVERY: {
-     pn_delivery_t *d = pn_event_delivery(e);
-     pn_link_t *r = pn_delivery_link(d);
-     if (pn_link_is_receiver(r) &&
-         pn_delivery_readable(d) && !pn_delivery_partial(d))
-     {
-       size_t size = pn_delivery_pending(d);
-       /* The broker does not decode the message, just forwards it. */
-       pn_rwbytes_t m = { size, (char*)malloc(size) };
-       pn_link_recv(r, m.start, m.size);
-       const char *qname = pn_terminus_get_address(pn_link_target(r));
-       queue_receive(b->proactor, queues_get(&b->queues, qname), m);
-       pn_delivery_update(d, PN_ACCEPTED);
-       pn_delivery_settle(d);
-       /* Top the link credit back up to WINDOW. */
-       pn_link_flow(r, WINDOW - pn_link_credit(r));
-     }
-     break;
-   }
-
-   case PN_TRANSPORT_CLOSED:
-    connection_unsub(b, pn_event_connection(e));
-    check_condition(e, pn_transport_condition(pn_event_transport(e)));
-    break;
-
-   case PN_CONNECTION_REMOTE_CLOSE:
-    check_condition(e, pn_connection_remote_condition(pn_event_connection(e)));
-    connection_unsub(b, pn_event_connection(e));
-    pn_connection_close(pn_event_connection(e));
-    break;
-
-   case PN_SESSION_REMOTE_CLOSE:
-    check_condition(e, pn_session_remote_condition(pn_event_session(e)));
-    session_unsub(b, pn_event_session(e));
-    pn_session_close(pn_event_session(e));
-    pn_session_free(pn_event_session(e));
-    break;
-
-   case PN_LINK_REMOTE_CLOSE:
-    check_condition(e, pn_link_remote_condition(pn_event_link(e)));
-    link_unsub(b, pn_event_link(e));
-    pn_link_close(pn_event_link(e));
-    pn_link_free(pn_event_link(e));
-    break;
-
-   case PN_LISTENER_CLOSE:
-    check_condition(e, pn_listener_condition(pn_event_listener(e)));
-    break;
-
-   case PN_PROACTOR_INACTIVE: /* listener and all connections closed */
-    broker_stop(b);
-    break;
-
-   case PN_PROACTOR_INTERRUPT:
-    b->finished = true;
-    break;
-
-   default:
-    break;
-  }
-}
-
-static void broker_thread(void *void_broker) {
- broker_t *b = (broker_t*)void_broker;
- do {
- pn_event_batch_t *events = pn_proactor_wait(b->proactor);
- pn_event_t *e;
- while ((e = pn_event_batch_next(events))) {
- handle(b, e);
- }
- pn_proactor_done(b->proactor, events);
- } while(!b->finished);
-}
-
-/* Print the command line synopsis to stderr and exit with status 1.
-   Lists every option accepted by the getopt() string in main(). */
-static void usage(const char *arg0) {
-  fprintf(stderr, "Usage: %s [-d] [-a url] [-t thread-count] [-h heartbeat] [-c container-id]\n", arg0);
-  exit(1);
-}
-
-/* Broker entry point: parse options, start listening, then run the event
-   loop on nthreads threads (nthreads-1 created here plus the main thread). */
-int main(int argc, char **argv) {
-  /* Command line options */
-  char *urlstr = NULL;
-  char container_id[256];
-  /* Default container-id is program:pid */
-  snprintf(container_id, sizeof(container_id), "%s:%d", argv[0], (int)getpid());
-  size_t nthreads = 4;
-  pn_millis_t heartbeat = 0;
-  int opt;
-  while ((opt = getopt(argc, argv, "a:t:dh:c:")) != -1) {
-    switch (opt) {
-     case 'a': urlstr = optarg; break;
-     case 't': {
-       /* Validate while still signed: a negative value converted to size_t
-          would become a huge thread count. */
-       int n = atoi(optarg);
-       if (n <= 0) {
-         fprintf(stderr, "invalid value -t %s, threads must be > 0\n", optarg);
-         exit(1);
-       }
-       nthreads = (size_t)n;
-       break;
-     }
-     case 'd': enable_debug = true; break;
-     case 'h': heartbeat = atoi(optarg); break;
-     /* snprintf always NUL-terminates, unlike strncpy on truncation. */
-     case 'c': snprintf(container_id, sizeof(container_id), "%s", optarg); break;
-     default: usage(argv[0]); break;
-    }
-  }
-  if (optind < argc)
-    usage(argv[0]);
-
-  broker_t b;
-  broker_init(&b, container_id, nthreads, heartbeat);
-
-  /* Parse the URL or use default values */
-  pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
-  /* Listen on IPv6 wildcard. On systems that do not set IPV6ONLY by default,
-     this will also listen for mapped IPv4 on the same port.
-  */
-  const char *host = url ? pn_url_get_host(url) : "::";
-  const char *port = url ? pn_url_get_port(url) : "amqp";
-  pn_proactor_listen(b.proactor, pn_listener(), host, port, 16);
-  printf("listening on '%s:%s' %zu threads\n", host, port, b.threads);
-
-  /* host and port may point into url, so free it only after their last use. */
-  if (url) pn_url_free(url);
-
-  /* Start n-1 threads and use main thread */
-  uv_thread_t* threads = (uv_thread_t*)calloc(b.threads, sizeof(uv_thread_t));
-  if (!threads) {
-    perror("calloc");
-    exit(1);
-  }
-  for (size_t i = 0; i < b.threads-1; ++i) {
-    check(uv_thread_create(&threads[i], broker_thread, &b), "pthread_create");
-  }
-  broker_thread(&b);            /* Use the main thread too. */
-  for (size_t i = 0; i < b.threads-1; ++i) {
-    check(uv_thread_join(&threads[i]), "pthread_join");
-  }
-  pn_proactor_free(b.proactor);
-  free(threads);
-  return 0;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/proactor/libuv_proactor.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/libuv_proactor.c b/examples/c/proactor/libuv_proactor.c
deleted file mode 100644
index 42bbfab..0000000
--- a/examples/c/proactor/libuv_proactor.c
+++ /dev/null
@@ -1,873 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <uv.h>
-
-#include <proton/condition.h>
-#include <proton/connection_driver.h>
-#include <proton/engine.h>
-#include <proton/message.h>
-#include <proton/object.h>
-#include <proton/proactor.h>
-#include <proton/transport.h>
-#include <proton/url.h>
-
-#include <assert.h>
-#include <stddef.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-/*
- libuv loop functions are thread unsafe. The only exception is uv_async_send()
- which is a thread safe "wakeup" that can wake the uv_loop from another thread.
-
- To provide concurrency the proactor uses a "leader-worker-follower" model,
- threads take turns at the roles:
-
- - a single "leader" calls libuv functions and runs the uv_loop in short bursts
- to generate work. When there is work available it gives up leadership and
- becomes a "worker"
-
- - "workers" handle events concurrently for distinct connections/listeners
- They do as much work as they can get, when none is left they become "followers"
-
- - "followers" wait for the leader to generate work and become workers.
- When the leader itself becomes a worker, one of the followers takes over.
-
- This model is symmetric: any thread can take on any role based on run-time
- requirements. It also allows the IO and non-IO work associated with an IO
- wake-up to be processed in a single thread with no context switches.
-
- Function naming:
- - on_ - called in leader thread via uv_run().
- - leader_ - called in leader thread, while processing the leader_q.
- - owner_ - called in owning thread, leader or worker but not concurrently.
-
- Note on_ and leader_ functions can call each other, the prefix indicates the
- path they are most often called on.
-*/
-
-const char *COND_NAME = "proactor";
-const char *AMQP_PORT = "5672";
-const char *AMQP_PORT_NAME = "amqp";
-const char *AMQPS_PORT = "5671";
-const char *AMQPS_PORT_NAME = "amqps";
-
-PN_HANDLE(PN_PROACTOR)
-
-/* pn_proactor_t and pn_listener_t are plain C structs with normal memory management.
- Class definitions are for identification as pn_event_t context only.
-*/
-PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
-PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
-
-/* common to connection and listener */
-typedef struct psocket_t {
- /* Immutable */
- pn_proactor_t *proactor;
-
- /* Protected by proactor.lock */
- struct psocket_t* next;
- void (*wakeup)(struct psocket_t*); /* interrupting action for leader */
-
- /* Only used by leader */
- uv_tcp_t tcp;
- void (*action)(struct psocket_t*); /* deferred action for leader */
- bool is_conn:1;
- char host[NI_MAXHOST];
- char port[NI_MAXSERV];
-} psocket_t;
-
-/* Special value for psocket.next pointer when socket is not on any list. */
-psocket_t UNLISTED;
-
-/* Initialize the fields of ps that are common to connections and listeners.
-   host and port are copied into ps; either may be NULL, which is stored as
-   the marker string "\001" and turned back into NULL by fixstr(). */
-static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const char *host, const char *port) {
-  ps->proactor = p;
-  ps->next = &UNLISTED;
-  ps->is_conn = is_conn;
-  ps->tcp.data = ps;
-
-  /* For platforms that don't know about "amqp" and "amqps" service names. */
-  if (port) {                   /* strcmp() must not be given a NULL port */
-    if (strcmp(port, AMQP_PORT_NAME) == 0)
-      port = AMQP_PORT;
-    else if (strcmp(port, AMQPS_PORT_NAME) == 0)
-      port = AMQPS_PORT;
-  }
-  /* Set to "\001" to indicate a NULL as opposed to an empty string "" */
-  strncpy(ps->host, host ? host : "\001", sizeof(ps->host) - 1);
-  ps->host[sizeof(ps->host) - 1] = '\0'; /* strncpy does not terminate on truncation */
-  strncpy(ps->port, port ? port : "\001", sizeof(ps->port) - 1);
-  ps->port[sizeof(ps->port) - 1] = '\0';
-}
-
-/* Turn "\001" back to NULL */
-static inline const char* fixstr(const char* str) {
- return str[0] == '\001' ? NULL : str;
-}
-
-typedef struct pconnection_t {
- psocket_t psocket;
-
- /* Only used by owner thread */
- pn_connection_driver_t driver;
-
- /* Only used by leader */
- uv_connect_t connect;
- uv_timer_t timer;
- uv_write_t write;
- uv_shutdown_t shutdown;
- size_t writing;
- bool reading:1;
- bool server:1; /* accept, not connect */
-} pconnection_t;
-
-struct pn_listener_t {
- psocket_t psocket;
-
- /* Only used by owner thread */
- pconnection_t *accepting; /* accept in progress */
- pn_condition_t *condition;
- pn_collector_t *collector;
- pn_event_batch_t batch;
- pn_record_t *attachments;
- void *context;
- size_t backlog;
-};
-
-
-typedef struct queue { psocket_t *front, *back; } queue;
-
-struct pn_proactor_t {
- /* Leader thread */
- uv_cond_t cond;
- uv_loop_t loop;
- uv_async_t async;
- uv_timer_t timer;
-
- /* Owner thread: proactor collector and batch can belong to leader or a worker */
- pn_collector_t *collector;
- pn_event_batch_t batch;
-
- /* Protected by lock */
- uv_mutex_t lock;
- queue start_q;
- queue worker_q;
- queue leader_q;
- size_t interrupt; /* pending interrupts */
- pn_millis_t timeout;
- size_t count; /* psocket count */
- bool inactive:1;
- bool timeout_request:1;
- bool timeout_elapsed:1;
- bool has_leader:1;
- bool batch_working:1; /* batch belongs to a worker. */
-};
-
-/* Append ps to the back of q unless it is already on some list.
-   Returns true if ps was added, false if it was left where it is.
-   NOTE(review): the _lh suffix presumably means "lock held" (the caller
-   holds proactor.lock) - confirm. */
-static bool push_lh(queue *q, psocket_t *ps) {
- if (ps->next != &UNLISTED) /* Don't move if already listed. */
- return false;
- ps->next = NULL;
- if (!q->front) {
- q->front = q->back = ps;
- } else {
- q->back->next = ps;
- q->back = ps;
- }
- return true;
-}
-
-/* Remove and return the front element of q, or return NULL if q is empty.
-   The removed element is marked UNLISTED so it can be pushed again.
-   q->back is not reset when the queue becomes empty; push_lh() only
-   inspects q->front to detect an empty queue. */
-static psocket_t* pop_lh(queue *q) {
- psocket_t *ps = q->front;
- if (ps) {
- q->front = ps->next;
- ps->next = &UNLISTED;
- }
- return ps;
-}
-
-static inline pconnection_t *as_pconnection_t(psocket_t* ps) {
- return ps->is_conn ? (pconnection_t*)ps : NULL;
-}
-
-static inline pn_listener_t *as_listener(psocket_t* ps) {
- return ps->is_conn ? NULL: (pn_listener_t*)ps;
-}
-
-/* Put ps on the leader queue for processing. Thread safe. */
-static void to_leader_lh(psocket_t *ps) {
- push_lh(&ps->proactor->leader_q, ps);
- uv_async_send(&ps->proactor->async); /* Wake leader */
-}
-
-static void to_leader(psocket_t *ps) {
- uv_mutex_lock(&ps->proactor->lock);
- to_leader_lh(ps);
- uv_mutex_unlock(&ps->proactor->lock);
-}
-
-/* Detach from IO and put ps on the worker queue */
-static void leader_to_worker(psocket_t *ps) {
- if (ps->is_conn) {
- pconnection_t *pc = as_pconnection_t(ps);
- /* Don't detach if there are no events yet. */
- if (pn_connection_driver_has_event(&pc->driver)) {
- if (pc->writing) {
- pc->writing = 0;
- uv_cancel((uv_req_t*)&pc->write);
- }
- if (pc->reading) {
- pc->reading = false;
- uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
- }
- if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
- uv_timer_stop(&pc->timer);
- }
- }
- } else {
- pn_listener_t *l = as_listener(ps);
- uv_read_stop((uv_stream_t*)&l->psocket.tcp);
- }
- uv_mutex_lock(&ps->proactor->lock);
- push_lh(&ps->proactor->worker_q, ps);
- uv_mutex_unlock(&ps->proactor->lock);
-}
-
-/* Set a deferred action for leader, if not already set. */
-static void owner_to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
- uv_mutex_lock(&ps->proactor->lock);
- if (!ps->action) {
- ps->action = action;
- }
- to_leader_lh(ps);
- uv_mutex_unlock(&ps->proactor->lock);
-}
-
-/* Owner thread send to worker thread. Set deferred action if not already set. */
-static void owner_to_worker(psocket_t *ps, void (*action)(psocket_t*)) {
- uv_mutex_lock(&ps->proactor->lock);
- if (!ps->action) {
- ps->action = action;
- }
- push_lh(&ps->proactor->worker_q, ps);
- uv_async_send(&ps->proactor->async); /* Wake leader */
- uv_mutex_unlock(&ps->proactor->lock);
-}
-
-
-/* Re-queue for further work */
-static void worker_requeue(psocket_t* ps) {
- uv_mutex_lock(&ps->proactor->lock);
- push_lh(&ps->proactor->worker_q, ps);
- uv_async_send(&ps->proactor->async); /* Wake leader */
- uv_mutex_unlock(&ps->proactor->lock);
-}
-
-/* Allocate a pconnection_t for connection c and attach it to c's record
-   under the PN_PROACTOR key. If server is true the transport is put in
-   server mode. Returns NULL if allocation or driver initialization fails. */
-static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) {
-  pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc));
-  if (!pc) return NULL;
-  if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
-    free(pc);                   /* Don't leak the allocation on failure */
-    return NULL;
-  }
-  psocket_init(&pc->psocket, p, true, host, port);
-  if (server) {
-    pn_transport_set_server(pc->driver.transport);
-  }
-  pn_record_t *r = pn_connection_attachments(pc->driver.connection);
-  pn_record_def(r, PN_PROACTOR, PN_VOID);
-  pn_record_set(r, PN_PROACTOR, pc);
-  return pc;
-}
-
-static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
-static pn_event_t *proactor_batch_next(pn_event_batch_t *batch);
-
-static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) {
- return (batch->next_event == proactor_batch_next) ?
- (pn_proactor_t*)((char*)batch - offsetof(pn_proactor_t, batch)) : NULL;
-}
-
-static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) {
- return (batch->next_event == listener_batch_next) ?
- (pn_listener_t*)((char*)batch - offsetof(pn_listener_t, batch)) : NULL;
-}
-
-static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
- pn_connection_driver_t *d = pn_event_batch_connection_driver(batch);
- return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL;
-}
-
-static void leader_count(pn_proactor_t *p, int change) {
- uv_mutex_lock(&p->lock);
- p->count += change;
- p->inactive = (p->count == 0);
- uv_mutex_unlock(&p->lock);
-}
-
-/* Free if there are no uv callbacks pending and no events */
-static void leader_pconnection_t_maybe_free(pconnection_t *pc) {
- if (pn_connection_driver_has_event(&pc->driver)) {
- leader_to_worker(&pc->psocket); /* Return to worker */
- } else if (!(pc->psocket.tcp.data || pc->write.data || pc->shutdown.data || pc->timer.data)) {
- /* All UV requests are finished */
- pn_connection_driver_destroy(&pc->driver);
- leader_count(pc->psocket.proactor, -1);
- free(pc);
- }
-}
-
-/* Free if there are no uv callbacks pending and no events */
-static void leader_listener_maybe_free(pn_listener_t *l) {
- if (pn_collector_peek(l->collector)) {
- leader_to_worker(&l->psocket); /* Return to worker */
- } else if (!l->psocket.tcp.data) {
- pn_condition_free(l->condition);
- leader_count(l->psocket.proactor, -1);
- free(l);
- }
-}
-
-/* Free if there are no uv callbacks pending and no events */
-static void leader_maybe_free(psocket_t *ps) {
- if (ps->is_conn) {
- leader_pconnection_t_maybe_free(as_pconnection_t(ps));
- } else {
- leader_listener_maybe_free(as_listener(ps));
- }
-}
-
-static void on_close(uv_handle_t *h) {
- psocket_t *ps = (psocket_t*)h->data;
- h->data = NULL; /* Mark closed */
- leader_maybe_free(ps);
-}
-
-static void on_shutdown(uv_shutdown_t *shutdown, int err) {
- psocket_t *ps = (psocket_t*)shutdown->data;
- shutdown->data = NULL; /* Mark closed */
- leader_maybe_free(ps);
-}
-
-static inline void leader_close(psocket_t *ps) {
- if (ps->tcp.data && !uv_is_closing((uv_handle_t*)&ps->tcp)) {
- uv_close((uv_handle_t*)&ps->tcp, on_close);
- }
- pconnection_t *pc = as_pconnection_t(ps);
- if (pc) {
- pn_connection_driver_close(&pc->driver);
- if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
- uv_timer_stop(&pc->timer);
- uv_close((uv_handle_t*)&pc->timer, on_close);
- }
- }
- leader_maybe_free(ps);
-}
-
-static pconnection_t *get_pconnection_t(pn_connection_t* c) {
- if (!c) return NULL;
- pn_record_t *r = pn_connection_attachments(c);
- return (pconnection_t*) pn_record_get(r, PN_PROACTOR);
-}
-
-static void leader_error(psocket_t *ps, int err, const char* what) {
- if (ps->is_conn) {
- pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver;
- pn_connection_driver_bind(driver); /* Bind so errors will be reported */
- pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
- what, fixstr(ps->host), fixstr(ps->port),
- uv_strerror(err));
- pn_connection_driver_close(driver);
- } else {
- pn_listener_t *l = as_listener(ps);
- pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
- what, fixstr(ps->host), fixstr(ps->port),
- uv_strerror(err));
- pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
- }
- leader_to_worker(ps); /* Worker to handle the error */
-}
-
-/* uv-initialization: count the socket, initialize its tcp handle and, for a
-   connection, its timer. On failure the error is reported via leader_error().
-   Returns 0 on success or the libuv error code. */
-static int leader_init(psocket_t *ps) {
-  leader_count(ps->proactor, +1);
-  int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp);
-  if (!err) {
-    pconnection_t *pc = as_pconnection_t(ps);
-    if (pc) {
-      pc->connect.data = ps;
-      /* Assign to the outer err (no new declaration) so that a timer
-         failure is reported and returned. */
-      err = uv_timer_init(&ps->proactor->loop, &pc->timer);
-      if (!err) {
-        pc->timer.data = pc;
-      }
-    }
-  }
-  if (err) {
-    leader_error(ps, err, "initialization");
-  }
-  return err;
-}
-
-/* Common logic for on_connect and on_accept */
-static void leader_connect_accept(pconnection_t *pc, int err, const char *what) {
- if (!err) {
- leader_to_worker(&pc->psocket);
- } else {
- leader_error(&pc->psocket, err, what);
- }
-}
-
-static void on_connect(uv_connect_t *connect, int err) {
- leader_connect_accept((pconnection_t*)connect->data, err, "on connect");
-}
-
-/* libuv connection callback for a listening socket.
-   On error, report it through leader_error(), which queues PN_LISTENER_CLOSE
-   and hands the listener to a worker. Otherwise queue PN_LISTENER_ACCEPT. */
-static void on_accept(uv_stream_t* server, int err) {
-  pn_listener_t *l = (pn_listener_t*) server->data;
-  if (err) {
-    leader_error(&l->psocket, err, "on accept");
-    return;                     /* Nothing to accept after a failure */
-  }
-  pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
-  leader_to_worker(&l->psocket); /* Let user call pn_listener_accept */
-}
-
-/* Leader action: take the connection parked in the listener's `accepting`
- * slot (clearing the slot), uv-initialize it and accept onto it. */
-static void leader_accept(psocket_t *ps) {
-  pn_listener_t *listener = as_listener(ps);
-  pconnection_t *incoming = listener->accepting;
-  listener->accepting = NULL;
-  if (!incoming) {
-    return;
-  }
-  int err = leader_init(&incoming->psocket);
-  if (err == 0) {
-    err = uv_accept((uv_stream_t*)&listener->psocket.tcp, (uv_stream_t*)&incoming->psocket.tcp);
-  }
-  leader_connect_accept(incoming, err, "on accept");
-}
-
-/* uv-initialize ps, then look up its host and port into `info`.
- * uv_getaddrinfo() is called with a NULL callback. `server` requests
- * AI_PASSIVE hints. Returns 0 or the first error encountered. */
-static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
-  int err = leader_init(ps);
-  if (err) {
-    return err;
-  }
-  struct addrinfo hints = { 0 };
-  if (server) {
-    hints.ai_flags = AI_PASSIVE;
-  }
-  return uv_getaddrinfo(&ps->proactor->loop, info, NULL, fixstr(ps->host), fixstr(ps->port), &hints);
-}
-
-/* Leader action: resolve the peer address and start a TCP connect, with
- * on_connect as the completion callback. Any failure goes to leader_error(). */
-static void leader_connect(psocket_t *ps) {
-  pconnection_t *pc = as_pconnection_t(ps);
-  uv_getaddrinfo_t req;
-  int err = leader_resolve(ps, &req, false);
-  if (err == 0) {
-    struct addrinfo *found = req.addrinfo;
-    err = uv_tcp_connect(&pc->connect, &pc->psocket.tcp, found->ai_addr, on_connect);
-    uv_freeaddrinfo(found);
-  }
-  if (err != 0) {
-    leader_error(ps, err, "connect to");
-  }
-}
-
-/* Leader action: resolve the local address, bind the listener's TCP handle to
- * it and start listening with on_accept as the connection callback. Any
- * failure goes to leader_error(). */
-static void leader_listen(psocket_t *ps) {
-  pn_listener_t *l = as_listener(ps);
-  uv_getaddrinfo_t req;
-  int err = leader_resolve(ps, &req, true);
-  if (err == 0) {
-    err = uv_tcp_bind(&l->psocket.tcp, req.addrinfo->ai_addr, 0);
-    uv_freeaddrinfo(req.addrinfo);
-    if (err == 0) {
-      err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept);
-    }
-  }
-  if (err != 0) {
-    leader_error(ps, err, "listen on ");
-  }
-}
-
-/* Timer callback, also called directly to re-check ticks. When either the
- * local or the remote idle timeout is set on the transport: stop the timer,
- * tick the transport, and re-arm the timer if the tick returns a non-zero
- * next time. */
-static void on_tick(uv_timer_t *timer) {
-  pconnection_t *pc = (pconnection_t*)timer->data;
-  pn_transport_t *transport = pc->driver.transport;
-  bool idle_checks = pn_transport_get_idle_timeout(transport) ||
-                     pn_transport_get_remote_idle_timeout(transport);
-  if (!idle_checks) {
-    return;
-  }
-  uv_timer_stop(&pc->timer);
-  uint64_t now = uv_now(pc->timer.loop);
-  uint64_t deadline = pn_transport_tick(transport, now);
-  if (deadline != 0) {
-    uv_timer_start(&pc->timer, on_tick, deadline - now, 0);
-  }
-}
-
-/* libuv read callback. Data (nread >= 0) is committed to the connection
- * driver and the connection goes to a worker; UV_EOF closes the read side;
- * any other negative value is reported as an error. */
-static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
-  pconnection_t *pc = (pconnection_t*)stream->data;
-  psocket_t *ps = &pc->psocket;
-  if (nread < 0) {
-    if (nread == UV_EOF) { /* hangup */
-      pn_connection_driver_read_close(&pc->driver);
-      leader_maybe_free(ps);
-    } else {
-      leader_error(ps, nread, "on read from");
-    }
-    return;
-  }
-  pn_connection_driver_read_done(&pc->driver, nread);
-  on_tick(&pc->timer); /* check for tick changes. */
-  leader_to_worker(ps);
-  /* Reading continues automatically until stopped. */
-}
-
-/* libuv write-completion callback. Success reports the written byte count to
- * the connection driver and passes the connection to a worker; UV_ECANCELED
- * lets the connection be freed if appropriate; other errors are reported. */
-static void on_write(uv_write_t* write, int err) {
-  pconnection_t *pc = (pconnection_t*)write->data;
-  write->data = NULL;
-  /* Reset the in-flight marker BEFORE the connection is handed to a worker or
-   * to leader_maybe_free(): after either call this thread should no longer
-   * touch pc (NOTE(review): leader_maybe_free() is assumed to possibly release
-   * pc, as its name suggests - confirm against its definition). */
-  size_t written = pc->writing;
-  pc->writing = 0; /* Need to send a new write request */
-  if (err == 0) {
-    pn_connection_driver_write_done(&pc->driver, written);
-    leader_to_worker(&pc->psocket);
-  } else if (err == UV_ECANCELED) {
-    leader_maybe_free(&pc->psocket);
-  } else {
-    leader_error(&pc->psocket, err, "on write to");
-  }
-}
-
-/* Callback of the proactor's timer: flag the timeout as elapsed while holding
- * the proactor lock. */
-static void on_timeout(uv_timer_t *timer) {
-  pn_proactor_t *proactor = (pn_proactor_t*)timer->data;
-  uv_mutex_t *lock = &proactor->lock;
-  uv_mutex_lock(lock);
-  proactor->timeout_elapsed = true;
-  uv_mutex_unlock(lock);
-}
-
-/* Buffer-allocation callback passed to uv_read_start(): hands libuv the
- * connection driver's own read buffer. The `size` argument is not used. */
-static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
-  pconnection_t *pc = (pconnection_t*)stream->data;
-  pn_rwbytes_t space = pn_connection_driver_read_buffer(&pc->driver);
-  uv_buf_t target = uv_buf_init(space.start, space.size);
-  *buf = target;
-}
-
-/* Leader action: resume IO on a socket after a worker is done with it.
- * For a connection: re-check ticks, then either pass it straight back to a
- * worker if the driver has events, or start a write / shutdown / read as the
- * driver's buffers require. For a listener: call uv_listen() again.
- * An error from uv_read_start() or uv_listen() is reported via leader_error(). */
-static void leader_rewatch(psocket_t *ps) {
- int err = 0;
- if (ps->is_conn) {
- pconnection_t *pc = as_pconnection_t(ps);
- if (pc->timer.data) { /* uv-initialized */
- on_tick(&pc->timer); /* Re-enable ticks if required */
- }
- pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
- pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
-
- /* Ticks and checking buffers can generate events, process before proceeding */
- if (pn_connection_driver_has_event(&pc->driver)) {
- leader_to_worker(ps);
- } else { /* Re-watch for IO */
- if (wbuf.size > 0 && !pc->writing) {
- /* pc->writing remembers the size of the single in-flight write; on_write() clears it */
- pc->writing = wbuf.size;
- uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
- pc->write.data = ps;
- /* NOTE(review): the return value of uv_write() is not checked */
- uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
- } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
- /* Nothing left to write and the write side is closed: shut the stream down */
- pc->shutdown.data = ps;
- /* NOTE(review): the return value of uv_shutdown() is not checked */
- uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown);
- }
- if (rbuf.size > 0 && !pc->reading) {
- /* Start reading only once; pc->reading is not reset in this function */
- pc->reading = true;
- err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
- }
- }
- } else {
- pn_listener_t *l = as_listener(ps);
- err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept);
- }
- if (err) {
- leader_error(ps, err, "rewatch");
- }
-}
-
-/* Queue a proactor event of type t in the proactor's collector, mark the
- * proactor batch as being worked on and return that batch. */
-static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) {
-  pn_event_batch_t *result = &p->batch;
-  pn_collector_put(p->collector, pn_proactor__class(), p, t);
-  p->batch_working = true;
-  return result;
-}
-
-/* Return the next event batch or 0 if no events are ready.
- * Proactor-level events (inactive, interrupt, timeout - checked in that order)
- * take precedence, but only while the proactor batch is not already being
- * worked on. Otherwise the batch of the first socket on the worker queue is
- * returned. */
-static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
-  if (!p->batch_working) { /* Can generate proactor events */
-    if (p->inactive) {
-      p->inactive = false;
-      return proactor_batch_lh(p, PN_PROACTOR_INACTIVE);
-    }
-    if (p->interrupt > 0) {
-      --p->interrupt;
-      return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT);
-    }
-    if (p->timeout_elapsed) {
-      p->timeout_elapsed = false;
-      return proactor_batch_lh(p, PN_PROACTOR_TIMEOUT);
-    }
-  }
-  /* Every popped socket yields a batch, so a single pop is enough; the former
-   * loop never iterated and its trailing to_leader() call was unreachable. */
-  psocket_t *ps = pop_lh(&p->worker_q);
-  if (!ps) {
-    return 0;
-  }
-  if (ps->is_conn) {
-    return &as_pconnection_t(ps)->driver.batch;
-  }
-  return &as_listener(ps)->batch; /* Listener */
-}
-
-/* Set the wakeup action of a socket and queue the socket for the leader, all
- * under the proactor lock. May be called from any thread; a previously set
- * wakeup action is replaced. */
-static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
-  uv_mutex_t *lock = &ps->proactor->lock;
-  uv_mutex_lock(lock);
-  ps->wakeup = action;
-  to_leader_lh(ps);
-  uv_mutex_unlock(lock);
-}
-
-/* Return the listener an event belongs to, or NULL if the event's class is
- * not the listener class. */
-pn_listener_t *pn_event_listener(pn_event_t *e) {
-  if (pn_event_class(e) != pn_listener__class()) {
-    return NULL;
-  }
-  return (pn_listener_t*)pn_event_context(e);
-}
-
-/* Return the proactor associated with an event: the event context for a
- * proactor event, otherwise the proactor of the event's listener, otherwise
- * the proactor of the event's connection, otherwise NULL. */
-pn_proactor_t *pn_event_proactor(pn_event_t *e) {
-  if (pn_event_class(e) == pn_proactor__class()) {
-    return (pn_proactor_t*)pn_event_context(e);
-  }
-  pn_listener_t *listener = pn_event_listener(e);
-  if (listener) {
-    return listener->psocket.proactor;
-  }
-  pn_connection_t *conn = pn_event_connection(e);
-  return conn ? pn_connection_proactor(conn) : NULL;
-}
-
-/* Called by a worker when it has finished with a batch.
- * Connection batch: requeue for a worker while events remain, otherwise give
- * the socket back to the leader to close (driver finished) or to rewatch.
- * Listener batch: give the socket back to the leader to rewatch.
- * Proactor batch (of p itself): clear batch_working and wake the leader. */
-void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
-  pconnection_t *pc = batch_pconnection(batch);
-  if (pc) {
-    psocket_t *ps = &pc->psocket;
-    if (pn_connection_driver_has_event(&pc->driver)) {
-      /* Process all events before going back to IO. */
-      worker_requeue(ps);
-    } else {
-      bool finished = pn_connection_driver_finished(&pc->driver);
-      owner_to_leader(ps, finished ? leader_close : leader_rewatch);
-    }
-    return;
-  }
-  pn_listener_t *listener = batch_listener(batch);
-  if (listener) {
-    owner_to_leader(&listener->psocket, leader_rewatch);
-  } else if (batch_proactor(batch) == p) {
-    uv_mutex_lock(&p->lock);
-    p->batch_working = false;
-    uv_async_send(&p->async); /* Wake leader */
-    uv_mutex_unlock(&p->lock);
-  }
-}
-
-/* Run follower/leader loop till we can return an event and be a worker.
- * Returns a batch of events for the calling thread to process. The proactor
- * lock is held except while leader actions run and while the uv loop runs. */
-pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
- uv_mutex_lock(&p->lock);
- /* Try to grab work immediately. */
- pn_event_batch_t *batch = get_batch_lh(p);
- if (batch == NULL) {
- /* No work available, follow the leader */
- while (p->has_leader) {
- uv_cond_wait(&p->cond, &p->lock);
- }
- /* Lead till there is work to do. */
- p->has_leader = true;
- while (batch == NULL) {
- /* Apply a timeout change requested via pn_proactor_set_timeout(); a zero timeout stops the timer */
- if (p->timeout_request) {
- p->timeout_request = false;
- if (p->timeout) {
- uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
- } else {
- uv_timer_stop(&p->timer);
- }
- }
- /* Drain the leader queue, running each socket's pending action and wakeup */
- for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
- void (*action)(psocket_t*) = ps->action;
- void (*wakeup)(psocket_t*) = ps->wakeup;
- ps->action = NULL;
- ps->wakeup = NULL;
- if (action || wakeup) {
- /* The callbacks run without the proactor lock held */
- uv_mutex_unlock(&p->lock);
- if (action) action(ps);
- if (wakeup) wakeup(ps);
- uv_mutex_lock(&p->lock);
- }
- }
- batch = get_batch_lh(p);
- if (batch == NULL) {
- /* Still nothing to do: run one iteration of the uv loop with the lock released */
- uv_mutex_unlock(&p->lock);
- uv_run(&p->loop, UV_RUN_ONCE);
- uv_mutex_lock(&p->lock);
- }
- }
- /* Signal the next leader and return to work */
- p->has_leader = false;
- uv_cond_signal(&p->cond);
- }
- uv_mutex_unlock(&p->lock);
- return batch;
-}
-
-/* Request one more PN_PROACTOR_INTERRUPT event: bump the pending interrupt
- * count under the proactor lock and wake the UV loop. */
-void pn_proactor_interrupt(pn_proactor_t *p) {
-  uv_mutex_t *lock = &p->lock;
-  uv_mutex_lock(lock);
-  p->interrupt += 1;
-  uv_async_send(&p->async); /* Interrupt the UV loop */
-  uv_mutex_unlock(lock);
-}
-
-/* Store a new timeout value and flag the request under the proactor lock,
- * then wake the UV loop; the timer itself is (re)started or stopped in
- * pn_proactor_wait(). */
-void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
-  uv_mutex_t *lock = &p->lock;
-  uv_mutex_lock(lock);
-  p->timeout = t;
-  p->timeout_request = true;
-  uv_async_send(&p->async); /* Interrupt the UV loop */
-  uv_mutex_unlock(lock);
-}
-
-/* Start an outgoing connection of c to host:port. Returns 0 once the
- * connection has been queued, or PN_OUT_OF_MEMORY if the proactor's
- * per-connection state cannot be allocated. */
-int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
-  pconnection_t *pc = new_pconnection_t(p, c, false, host, port);
-  if (pc) {
-    /* Process PN_CONNECTION_INIT before binding */
-    owner_to_worker(&pc->psocket, leader_connect);
-    return 0;
-  }
-  return PN_OUT_OF_MEMORY;
-}
-
-/* Initialize the listener's socket for host:port, record the backlog and ask
- * the leader to start listening. Always returns 0. */
-int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, const char *port, int backlog)
-{
-  psocket_t *ps = &l->psocket;
-  psocket_init(ps, p, false, host, port);
-  l->backlog = backlog;
-  owner_to_leader(ps, leader_listen);
-  return 0;
-}
-
-/* Return the proactor that connection c is attached to, or NULL when c has no
- * proactor state attached. */
-pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
-  pconnection_t *pc = get_pconnection_t(c);
-  if (!pc) {
-    return NULL;
-  }
-  return pc->psocket.proactor;
-}
-
-/* Wakeup action for a connection: queue a PN_CONNECTION_WAKE event on the
- * connection's collector and pass the connection to a worker. */
-void leader_wake_connection(psocket_t *ps) {
-  pn_connection_t *conn = as_pconnection_t(ps)->driver.connection;
-  pn_collector_t *events = pn_connection_collector(conn);
-  pn_collector_put(events, PN_OBJECT, conn, PN_CONNECTION_WAKE);
-  leader_to_worker(ps);
-}
-
-/* Ask the leader to deliver a PN_CONNECTION_WAKE event to connection c.
- * Does nothing when c is NULL or has no proactor state attached, because
- * get_pconnection_t() returns NULL in those cases and there is no socket to
- * wake. */
-void pn_connection_wake(pn_connection_t* c) {
-  pconnection_t *pc = get_pconnection_t(c);
-  if (pc) {
-    wakeup(&pc->psocket, leader_wake_connection);
-  }
-}
-
-/* Allocate and initialize a proactor: event collector, uv loop, lock,
- * condition variable, async wake-up handle and timeout timer.
- * Returns NULL if the proactor or its collector cannot be allocated. */
-pn_proactor_t *pn_proactor() {
-  pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
-  if (!p) return NULL;
-  p->collector = pn_collector();
-  if (!p->collector) {
-    free(p); /* Do not leak the proactor when the collector is unavailable */
-    return NULL;
-  }
-  p->batch.next_event = &proactor_batch_next;
-  uv_loop_init(&p->loop);
-  uv_mutex_init(&p->lock);
-  uv_cond_init(&p->cond);
-  uv_async_init(&p->loop, &p->async, NULL); /* Just wake the loop */
-  uv_timer_init(&p->loop, &p->timer);
-  p->timer.data = p; /* on_timeout() reads the proactor back from here */
-  return p;
-}
-
-/* uv_walk() callback used by pn_proactor_free(): request that handle h be
- * closed, and stop the loop once it is no longer alive. The `v` argument is
- * not used. */
-static void on_stopping(uv_handle_t* h, void* v) {
-  if (!uv_is_closing(h)) { /* uv_close() must not be called twice on a handle */
-    uv_close(h, NULL); /* Close this handle */
-  }
-  if (!uv_loop_alive(h->loop)) /* Everything closed */
-    uv_stop(h->loop); /* Stop the loop, pn_proactor_free() can return */
-}
-
-/* Tear down a proactor: close every handle on its uv loop, run the loop so
- * the closes complete, then release the loop, the lock, the condition
- * variable, the collector and finally the proactor itself. */
-void pn_proactor_free(pn_proactor_t *p) {
- uv_walk(&p->loop, on_stopping, NULL); /* Close all handles */
- uv_run(&p->loop, UV_RUN_DEFAULT); /* Run till stop, all handles closed */
- /* NOTE(review): the result of uv_loop_close() is not checked */
- uv_loop_close(&p->loop);
- uv_mutex_destroy(&p->lock);
- uv_cond_destroy(&p->cond);
- pn_collector_free(p->collector);
- free(p);
-}
-
-/* next_event function of a listener batch. If the event the caller has just
- * finished with was PN_LISTENER_CLOSE, the leader is asked to close the
- * listener; then the collector's next event is returned. */
-static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
-  pn_listener_t *l = batch_listener(batch);
-  pn_event_t *previous = pn_collector_prev(l->collector);
-  bool close_handled = previous && pn_event_type(previous) == PN_LISTENER_CLOSE;
-  if (close_handled) {
-    owner_to_leader(&l->psocket, leader_close); /* Close event handled, do close */
-  }
-  return pn_collector_next(l->collector);
-}
-
-/* next_event function of the proactor batch: return the next event from the
- * owning proactor's collector. */
-static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
-  pn_proactor_t *owner = batch_proactor(batch);
-  return pn_collector_next(owner->collector);
-}
-
-/* Release a listener together with the collector, condition and attachments
- * it owns. Accepts NULL and partially constructed listeners (members that are
- * still NULL are skipped). */
-static void pn_listener_free(pn_listener_t *l) {
-  if (l) {
-    /* Free each member that EXISTS; the previous negated tests skipped every
-     * allocated member and so leaked all three. */
-    if (l->collector) pn_collector_free(l->collector);
-    if (l->condition) pn_condition_free(l->condition);
-    if (l->attachments) pn_free(l->attachments);
-    free(l);
-  }
-}
-
-/* Allocate a listener with its batch hook, collector, condition and
- * attachments record. Returns NULL if any allocation fails. */
-pn_listener_t *pn_listener() {
-  pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
-  if (!l) {
-    return NULL;
-  }
-  l->batch.next_event = listener_batch_next;
-  l->collector = pn_collector();
-  l->condition = pn_condition();
-  l->attachments = pn_record();
-  if (l->condition && l->collector && l->attachments) {
-    return l;
-  }
-  pn_listener_free(l);
-  return NULL;
-}
-
-/* Ask the leader to close the listener by setting leader_close as the wakeup
- * action of its socket. */
-void pn_listener_close(pn_listener_t* l) {
-  psocket_t *ps = &l->psocket;
-  wakeup(ps, leader_close);
-}
-
-/* Return the proactor the listener's socket belongs to, or NULL for a NULL
- * listener. */
-pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
-  if (!l) {
-    return NULL;
-  }
-  return l->psocket.proactor;
-}
-
-/* Return the listener's condition object (the one leader_error() fills in on
- * failure). l must not be NULL. */
-pn_condition_t* pn_listener_condition(pn_listener_t* l) {
-  pn_condition_t *cond = l->condition;
-  return cond;
-}
-
-/* Return the context pointer last stored with pn_listener_set_context().
- * l must not be NULL. */
-void *pn_listener_get_context(pn_listener_t *l) {
-  void *context = l->context;
-  return context;
-}
-
-/* Store an application-defined context pointer on the listener; it is
- * returned unchanged by pn_listener_get_context(). l must not be NULL. */
-void pn_listener_set_context(pn_listener_t *l, void *context) {
-  l->context = context;
-}
-
-/* Return the listener's attachments record. l must not be NULL. */
-pn_record_t *pn_listener_attachments(pn_listener_t *l) {
-  pn_record_t *record = l->attachments;
-  return record;
-}
-
-/* Accept one incoming connection on listener l, binding it to c.
- * Returns 0 when the accept has been queued for the leader, PN_STATE_ERR if a
- * previous accept on this listener is still pending, or PN_OUT_OF_MEMORY if
- * the per-connection state cannot be allocated. */
-int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
-  if (l->accepting) {
-    return PN_STATE_ERR; /* Only one at a time */
-  }
-  l->accepting = new_pconnection_t(
-    l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
-  if (!l->accepting) {
-    /* Report a Proton error code, matching pn_proactor_connect(), rather than
-     * leaking a libuv code through the Proton API. */
-    return PN_OUT_OF_MEMORY;
-  }
-  owner_to_leader(&l->psocket, leader_accept);
-  return 0;
-}
-
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/proactor/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c
deleted file mode 100644
index b8edcd6..0000000
--- a/examples/c/proactor/receive.c
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <proton/connection.h>
-#include <proton/connection_driver.h>
-#include <proton/delivery.h>
-#include <proton/proactor.h>
-#include <proton/link.h>
-#include <proton/message.h>
-#include <proton/session.h>
-#include <proton/transport.h>
-#include <proton/url.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-
-/* Fixed-size string buffer used for the address and the container id. */
-typedef char str[1024];
-
-/* State of the receive example, shared between main() and handle(). */
-typedef struct app_data_t {
- str address; /* Address set on the receiving link's source */
- str container_id; /* Container id set on the connection ("<argv[0]>:<pid>") */
- pn_rwbytes_t message_buffer; /* Not allocated by this program; only freed at exit */
- int message_count; /* Messages to receive; 0 means receive forever */
- int received; /* Messages received so far */
- pn_proactor_t *proactor;
- bool finished; /* Set on PN_PROACTOR_INACTIVE to end the main loop */
-} app_data_t;
-
-static const int BATCH = 100; /* Batch size for unlimited receive */
-
-static int exit_code = 0;
-
-/* If `cond` is set, make the program exit with status 1 and print the event
- * type name together with the condition's name and description on stderr. */
-static void check_condition(pn_event_t *e, pn_condition_t *cond) {
-  if (!pn_condition_is_set(cond)) {
-    return;
-  }
-  exit_code = 1;
-  const char *event_name = pn_event_type_name(pn_event_type(e));
-  fprintf(stderr, "%s: %s: %s\n", event_name,
-          pn_condition_get_name(cond), pn_condition_get_description(cond));
-}
-
-#define MAX_SIZE 1024
-
-/* Read the pending bytes of delivery `dlv`, decode them as a Proton message
- * and print the message body on stdout. Deliveries with MAX_SIZE or more
- * pending bytes, empty reads and undecodable messages print nothing. */
-static void decode_message(pn_delivery_t *dlv) {
-  static char buffer[MAX_SIZE];
-  if (pn_delivery_pending(dlv) >= MAX_SIZE) {
-    return; /* does not fit in the buffer */
-  }
-  /* read in the raw data */
-  ssize_t len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
-  if (len <= 0) {
-    return;
-  }
-  /* decode it into a proton message */
-  pn_message_t *m = pn_message();
-  if (pn_message_decode(m, buffer, len) == PN_OK) {
-    pn_string_t *text = pn_string(NULL);
-    pn_inspect(pn_message_body(m), text);
-    printf("%s\n", pn_string_get(text));
-    pn_free(text);
-  }
-  pn_message_free(m);
-}
-
-/* Event handler of the receive example: sets up the connection, session and
- * receiving link, accepts incoming messages, closes the connection on remote
- * close/detach events and marks the application finished when the proactor
- * becomes inactive. */
-static void handle(app_data_t* app, pn_event_t* event) {
- switch (pn_event_type(event)) {
-
- case PN_CONNECTION_INIT: {
- /* Open the connection, one session and one receiver link named "my_receiver" */
- pn_connection_t* c = pn_event_connection(event);
- pn_connection_set_container(c, app->container_id);
- pn_connection_open(c);
- pn_session_t* s = pn_session(c);
- pn_session_open(s);
- pn_link_t* l = pn_receiver(s, "my_receiver");
- pn_terminus_set_address(pn_link_source(l), app->address);
- pn_link_open(l);
- /* cannot receive without granting credit: */
- pn_link_flow(l, app->message_count ? app->message_count : BATCH);
- } break;
-
- case PN_DELIVERY: {
- /* A message has been received */
- pn_link_t *link = NULL;
- pn_delivery_t *dlv = pn_event_delivery(event);
- /* Only complete (non-partial) readable deliveries are processed */
- if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
- link = pn_delivery_link(dlv);
- decode_message(dlv);
- /* Accept the delivery */
- pn_delivery_update(dlv, PN_ACCEPTED);
- /* done with the delivery, move to the next and free it */
- pn_link_advance(link);
- pn_delivery_settle(dlv); /* dlv is now freed */
-
- if (app->message_count == 0) {
- /* receive forever - see if more credit is needed */
- if (pn_link_credit(link) < BATCH/2) {
- /* Grant enough credit to bring it up to BATCH: */
- pn_link_flow(link, BATCH - pn_link_credit(link));
- }
- } else if (++app->received >= app->message_count) {
- /* done receiving, close the endpoints */
- printf("%d messages received\n", app->received);
- pn_session_t *ssn = pn_link_session(link);
- pn_link_close(link);
- pn_session_close(ssn);
- pn_connection_close(pn_session_connection(ssn));
- }
- }
- } break;
-
- case PN_TRANSPORT_ERROR:
- check_condition(event, pn_transport_condition(pn_event_transport(event)));
- break;
-
- /* For each remote close/detach below: report any error condition, then close the connection */
- case PN_CONNECTION_REMOTE_CLOSE:
- check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
- pn_connection_close(pn_event_connection(event));
- break;
-
- case PN_SESSION_REMOTE_CLOSE:
- check_condition(event, pn_session_remote_condition(pn_event_session(event)));
- pn_connection_close(pn_event_connection(event));
- break;
-
- case PN_LINK_REMOTE_CLOSE:
- case PN_LINK_REMOTE_DETACH:
- check_condition(event, pn_link_remote_condition(pn_event_link(event)));
- pn_connection_close(pn_event_connection(event));
- break;
-
- case PN_PROACTOR_INACTIVE:
- /* Ends the event loop in main() */
- app->finished = true;
- break;
-
- default: break;
- }
-}
-
-/* Print the command-line synopsis of the receive example on stderr and exit
- * with status 1. */
-static void usage(const char *arg0) {
-  static const char synopsis[] = "Usage: %s [-a url] [-m message-count]\n";
-  fprintf(stderr, synopsis, arg0);
-  exit(1);
-}
-
-/* Entry point of the receive example.
- * Options: -a url (broker URL; its path is the source address, default
- * "example"), -m message-count (default 100; 0 means receive forever).
- * Returns exit_code, which check_condition() sets to 1 on any error
- * condition. */
-int main(int argc, char **argv) {
-  /* Default values for application and connection. */
-  app_data_t app = {{0}};
-  app.message_count = 100;
-  const char* urlstr = NULL;
-
-  int opt;
-  while((opt = getopt(argc, argv, "a:m:")) != -1) {
-    switch(opt) {
-    case 'a': urlstr = optarg; break;
-    case 'm': app.message_count = atoi(optarg); break;
-    default: usage(argv[0]); break;
-    }
-  }
-  if (optind < argc)
-    usage(argv[0]);
-
-  snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
-
-  /* Parse the URL or use default values */
-  pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
-  const char *host = url ? pn_url_get_host(url) : NULL;
-  const char *port = url ? pn_url_get_port(url) : "amqp";
-  /* Copy at most size-1 bytes: app is zero-initialized, so the last byte stays
-   * a terminator even when the path is longer than the buffer. */
-  strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address) - 1);
-
-  /* Create the proactor and connect */
-  app.proactor = pn_proactor();
-  if (!app.proactor) {
-    fprintf(stderr, "%s: cannot create proactor\n", argv[0]);
-    if (url) pn_url_free(url);
-    return 1;
-  }
-  pn_proactor_connect(app.proactor, pn_connection(), host, port);
-  if (url) pn_url_free(url);
-
-  /* Process event batches until PN_PROACTOR_INACTIVE sets app.finished */
-  do {
-    pn_event_batch_t *events = pn_proactor_wait(app.proactor);
-    pn_event_t *e;
-    while ((e = pn_event_batch_next(events))) {
-      handle(&app, e);
-    }
-    pn_proactor_done(app.proactor, events);
-  } while(!app.finished);
-
-  pn_proactor_free(app.proactor);
-  free(app.message_buffer.start);
-  return exit_code;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/proactor/send.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c
deleted file mode 100644
index d611b3d..0000000
--- a/examples/c/proactor/send.c
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <proton/connection.h>
-#include <proton/connection_driver.h>
-#include <proton/delivery.h>
-#include <proton/proactor.h>
-#include <proton/link.h>
-#include <proton/message.h>
-#include <proton/session.h>
-#include <proton/transport.h>
-#include <proton/url.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-
-/* Fixed-size string buffer used for the address and the container id. */
-typedef char str[1024];
-
-/* State of the send example, shared between main(), handle() and send(). */
-typedef struct app_data_t {
- str address; /* Address set on the sending link's target */
- str container_id; /* Container id set on the connection ("<argv[0]>:<pid>") */
- pn_rwbytes_t message_buffer; /* Reusable encode buffer, grown by encode_message() */
- int message_count; /* Total number of messages to send */
- int sent; /* Messages sent so far; also used as sequence number and delivery tag */
- int acknowledged; /* Messages the peer has accepted so far */
- pn_proactor_t *proactor;
- pn_millis_t delay; /* Delay between messages (-d option); 0 means no delay */
- bool delaying; /* True while waiting for the timeout before sending more */
- pn_link_t *sender; /* Sending link, recorded on PN_LINK_FLOW */
- bool finished; /* Set on PN_PROACTOR_INACTIVE to end the main loop */
-} app_data_t;
-
-int exit_code = 0;
-
-/* If `cond` is set, make the program exit with status 1 and print the event
- * type name together with the condition's name and description on stderr. */
-static void check_condition(pn_event_t *e, pn_condition_t *cond) {
-  if (!pn_condition_is_set(cond)) {
-    return;
-  }
-  exit_code = 1;
-  const char *event_name = pn_event_type_name(pn_event_type(e));
-  fprintf(stderr, "%s: %s: %s\n", event_name,
-          pn_condition_get_name(cond), pn_condition_get_description(cond));
-}
-
-/* Create a message with a map { "sequence" : number }, encode it and return
- * the encoded bytes. The returned bytes live in app->message_buffer, which is
- * reused (and grown as needed) by every call, so they are only valid until
- * the next call. Exits the program if encoding fails or memory runs out. */
-static pn_bytes_t encode_message(app_data_t* app) {
-  /* Construct a message with the map { "sequence": app.sent } */
-  pn_message_t* message = pn_message();
-  pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
-  pn_data_t* body = pn_message_body(message);
-  pn_data_put_map(body);
-  pn_data_enter(body);
-  pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
-  pn_data_put_int(body, app->sent); /* The sequence number */
-  pn_data_exit(body);
-
-  /* encode the message, expanding the encode buffer as needed */
-  if (app->message_buffer.start == NULL) {
-    static const size_t initial_size = 128;
-    char *initial = (char*)malloc(initial_size);
-    if (!initial) {
-      fprintf(stderr, "error encoding message: out of memory\n");
-      exit(1);
-    }
-    app->message_buffer = pn_rwbytes(initial_size, initial);
-  }
-  /* app->message_buffer is the total buffer space available. */
-  /* mbuf will point at just the portion used by the encoded message */
-  pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
-  int status = 0;
-  while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
-    size_t bigger_size = app->message_buffer.size * 2;
-    char *bigger = (char*)realloc(app->message_buffer.start, bigger_size);
-    if (!bigger) {
-      fprintf(stderr, "error encoding message: out of memory\n");
-      exit(1);
-    }
-    app->message_buffer.size = bigger_size;
-    app->message_buffer.start = bigger;
-    /* realloc may move the block: refresh the pointer as well as the size, or
-     * the retry would encode into the old, freed memory. */
-    mbuf.start = app->message_buffer.start;
-    mbuf.size = app->message_buffer.size;
-  }
-  if (status != 0) {
-    fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
-    exit(1);
-  }
-  pn_message_free(message);
-  return pn_bytes(mbuf.size, mbuf.start);
-}
-
-/* Send messages on app->sender while the link has credit and fewer than
- * message_count messages have been sent. With a delay configured, only one
- * message is sent per call: a proactor timeout is requested and sending
- * resumes once it has elapsed. */
-static void send(app_data_t* app) {
-  pn_link_t *link = app->sender;
-  while (pn_link_credit(link) > 0 && app->sent < app->message_count) {
-    app->sent += 1;
-    /* Use sent counter bytes as unique delivery tag. */
-    pn_delivery(link, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
-    pn_bytes_t encoded = encode_message(app);
-    pn_link_send(link, encoded.start, encoded.size);
-    pn_link_advance(link);
-    bool more_to_send = app->sent < app->message_count;
-    if (app->delay && more_to_send) {
-      /* If delay is set, wait for TIMEOUT event to send more */
-      app->delaying = true;
-      pn_proactor_set_timeout(app->proactor, app->delay);
-      break;
-    }
-  }
-}
-
-/* Event handler of the send example: sets up the connection, session and
- * sending link, sends messages as credit arrives (optionally paced by the
- * proactor timeout), counts acknowledgements, closes the connection on remote
- * close/detach events and marks the application finished when the proactor
- * becomes inactive. */
-static void handle(app_data_t* app, pn_event_t* event) {
- switch (pn_event_type(event)) {
-
- case PN_CONNECTION_INIT: {
- /* Open the connection, one session and one sender link named "my_sender" */
- pn_connection_t* c = pn_event_connection(event);
- pn_connection_set_container(c, app->container_id);
- pn_connection_open(c);
- pn_session_t* s = pn_session(c);
- pn_session_open(s);
- pn_link_t* l = pn_sender(s, "my_sender");
- pn_terminus_set_address(pn_link_target(l), app->address);
- pn_link_open(l);
- } break;
-
- case PN_LINK_FLOW:
- /* The peer has given us some credit, now we can send messages */
- if (!app->delaying) {
- app->sender = pn_event_link(event);
- send(app);
- }
- break;
-
- case PN_PROACTOR_TIMEOUT:
- /* Wake the sender's connection */
- pn_connection_wake(pn_session_connection(pn_link_session(app->sender)));
- break;
-
- case PN_CONNECTION_WAKE:
- /* Timeout, we can send more. */
- app->delaying = false;
- send(app);
- break;
-
- case PN_DELIVERY: {
- /* We received acknowledgement from the peer that a message was delivered. */
- pn_delivery_t* d = pn_event_delivery(event);
- if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
- /* Close the connection once every message has been accepted */
- if (++app->acknowledged == app->message_count) {
- printf("%d messages sent and acknowledged\n", app->acknowledged);
- pn_connection_close(pn_event_connection(event));
- }
- }
- } break;
-
- case PN_TRANSPORT_CLOSED:
- check_condition(event, pn_transport_condition(pn_event_transport(event)));
- break;
-
- /* For each remote close/detach below: report any error condition, then close the connection */
- case PN_CONNECTION_REMOTE_CLOSE:
- check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
- pn_connection_close(pn_event_connection(event));
- break;
-
- case PN_SESSION_REMOTE_CLOSE:
- check_condition(event, pn_session_remote_condition(pn_event_session(event)));
- pn_connection_close(pn_event_connection(event));
- break;
-
- case PN_LINK_REMOTE_CLOSE:
- case PN_LINK_REMOTE_DETACH:
- check_condition(event, pn_link_remote_condition(pn_event_link(event)));
- pn_connection_close(pn_event_connection(event));
- break;
-
- case PN_PROACTOR_INACTIVE:
- /* Ends the event loop in main() */
- app->finished = true;
- break;
-
- default: break;
- }
-}
-
-/* Print the command-line synopsis of the send example on stderr and exit
- * with status 1. */
-static void usage(const char *arg0) {
-  static const char synopsis[] = "Usage: %s [-a url] [-m message-count] [-d delay-ms]\n";
-  fprintf(stderr, synopsis, arg0);
-  exit(1);
-}
-
-/* Entry point of the send example.
- * Options: -a url (broker URL; its path is the target address, default
- * "example"), -m message-count (default 100), -d delay-ms (pause between
- * messages, default none).
- * Returns exit_code, which check_condition() sets to 1 on any error
- * condition. */
-int main(int argc, char **argv) {
-  /* Default values for application and connection. */
-  app_data_t app = {{0}};
-  app.message_count = 100;
-  const char* urlstr = NULL;
-
-  int opt;
-  while((opt = getopt(argc, argv, "a:m:d:")) != -1) {
-    switch(opt) {
-    case 'a': urlstr = optarg; break;
-    case 'm': app.message_count = atoi(optarg); break;
-    case 'd': app.delay = atoi(optarg); break;
-    default: usage(argv[0]); break;
-    }
-  }
-  if (optind < argc)
-    usage(argv[0]);
-
-  snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
-
-  /* Parse the URL or use default values */
-  pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
-  const char *host = url ? pn_url_get_host(url) : NULL;
-  const char *port = url ? pn_url_get_port(url) : "amqp";
-  /* Copy at most size-1 bytes: app is zero-initialized, so the last byte stays
-   * a terminator even when the path is longer than the buffer. */
-  strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address) - 1);
-
-  /* Create the proactor and connect */
-  app.proactor = pn_proactor();
-  if (!app.proactor) {
-    fprintf(stderr, "%s: cannot create proactor\n", argv[0]);
-    if (url) pn_url_free(url);
-    return 1;
-  }
-  pn_proactor_connect(app.proactor, pn_connection(), host, port);
-  if (url) pn_url_free(url);
-
-  /* Process event batches until PN_PROACTOR_INACTIVE sets app.finished */
-  do {
-    pn_event_batch_t *events = pn_proactor_wait(app.proactor);
-    pn_event_t *e;
-    while ((e = pn_event_batch_next(events))) {
-      handle(&app, e);
-    }
-    pn_proactor_done(app.proactor, events);
-  } while(!app.finished);
-
-  pn_proactor_free(app.proactor);
-  free(app.message_buffer.start);
-  return exit_code;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/proactor/test.py
----------------------------------------------------------------------
diff --git a/examples/c/proactor/test.py b/examples/c/proactor/test.py
deleted file mode 100644
index a86425d..0000000
--- a/examples/c/proactor/test.py
+++ /dev/null
@@ -1,60 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License
-#
-
-# This is a test script to run the examples and verify that they behave as expected.
-
-from exampletest import *
-
-import unittest
-import sys
-
-def python_cmd(name):
- dir = os.path.dirname(__file__)
- return [sys.executable, os.path.join(dir, "..", "..", "python", name)]
-
-def receive_expect(n):
- return ''.join('{"sequence"=%s}\n'%i for i in xrange(1, n+1)) + "%s messages received\n"%n
-
-class CExampleTest(BrokerTestCase):
- broker_exe = ["libuv_broker"]
-
- def test_send_receive(self):
- """Send first then receive"""
- s = self.proc(["libuv_send", "-a", self.addr])
- self.assertEqual("100 messages sent and acknowledged\n", s.wait_out())
- r = self.proc(["libuv_receive", "-a", self.addr])
- self.assertEqual(receive_expect(100), r.wait_out())
-
- def test_receive_send(self):
- """Start receiving first, then send."""
- r = self.proc(["libuv_receive", "-a", self.addr]);
- s = self.proc(["libuv_send", "-a", self.addr]);
- self.assertEqual("100 messages sent and acknowledged\n", s.wait_out())
- self.assertEqual(receive_expect(100), r.wait_out())
-
- def test_timed_send(self):
- """Send with timed delay"""
- s = self.proc(["libuv_send", "-a", self.addr, "-d100", "-m3"])
- self.assertEqual("3 messages sent and acknowledged\n", s.wait_out())
- r = self.proc(["libuv_receive", "-a", self.addr, "-m3"])
- self.assertEqual(receive_expect(3), r.wait_out())
-
-
-if __name__ == "__main__":
- unittest.main()
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/reactor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/reactor/CMakeLists.txt b/examples/c/reactor/CMakeLists.txt
deleted file mode 100644
index bd6163f..0000000
--- a/examples/c/reactor/CMakeLists.txt
+++ /dev/null
@@ -1,45 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Build the reactor examples: one executable per source file, compiled with
-# the project's warning/language/LTO flags and linked against Proton.
-find_package(Proton REQUIRED)
-
-set (reactor-examples
-  sender.c
-  receiver.c
-  )
-
-set_source_files_properties (
-  ${reactor-examples}
-  PROPERTIES
-  COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${COMPILE_LANGUAGE_FLAGS} ${LINK_TIME_OPTIMIZATION}"
-  )
-
-# Optionally compile the C sources as C++.
-if (BUILD_WITH_CXX)
-  set_source_files_properties (
-    ${reactor-examples}
-    PROPERTIES LANGUAGE CXX
-    )
-endif ()
-
-include_directories(${Proton_INCLUDE_DIRS})
-foreach (example sender receiver)
-  add_executable(${example} ${example}.c)
-endforeach ()
-foreach (example sender receiver)
-  target_link_libraries(${example} ${Proton_LIBRARIES})
-endforeach ()
-
-
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/reactor/README
----------------------------------------------------------------------
diff --git a/examples/c/reactor/README b/examples/c/reactor/README
deleted file mode 100644
index 8d61893..0000000
--- a/examples/c/reactor/README
+++ /dev/null
@@ -1,30 +0,0 @@
-These example clients require a broker or similar intermediary that
-supports the AMQP 1.0 protocol, allows anonymous connections and
-accepts links to and from a node named 'examples'.
-
-------------------------------------------------------------------
-
-sender.c
-
-A simple message sending client. This example sends all messages but
-the last as pre-settled (no ack required). It then waits for
-an ack for the last message sent before exiting.
-
-Use the '-h' command line option for a list of supported parameters.
-
-------------------------------------------------------------------
-
-receiver.c
-
-A simple message consuming client. This example receives messages
-from a source (default 'examples'). Received messages are
-acknowledged if they are sent un-settled. The client will try to
-decode the message payload assuming it has been generated by the
-sender example.
-
-Use the '-h' command line option for a list of supported parameters.
-
-
-
-
-
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/reactor/receiver.c
----------------------------------------------------------------------
diff --git a/examples/c/reactor/receiver.c b/examples/c/reactor/receiver.c
deleted file mode 100644
index 35c5a70..0000000
--- a/examples/c/reactor/receiver.c
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-
-#include "pncompat/misc_funcs.inc"
-
-#include "proton/reactor.h"
-#include "proton/message.h"
-#include "proton/connection.h"
-#include "proton/session.h"
-#include "proton/link.h"
-#include "proton/delivery.h"
-#include "proton/event.h"
-#include "proton/handlers.h"
-#include "proton/transport.h"
-#include "proton/url.h"
-
-static int quiet = 0;
-
-// Credit batch if unlimited receive (-c 0)
-static const int CAPACITY = 100;
-#define MAX_SIZE 512
-
-// Example application data. This data will be instantiated in the event
-// handler, and is available during event processing. In this example it
-// holds configuration and state information.
-//
-typedef struct {
- int count; // # of messages to receive before exiting
- const char *source; // name of the source node to receive from
- pn_message_t *message; // holds the received message
-} app_data_t;
-
-// helper to pull pointer to app_data_t instance out of the pn_handler_t
-//
-#define GET_APP_DATA(handler) ((app_data_t *)pn_handler_mem(handler))
-
-// Called when reactor exits to clean up app_data
-//
-static void delete_handler(pn_handler_t *handler)
-{
- app_data_t *d = GET_APP_DATA(handler);
- if (d->message) {
- pn_decref(d->message);
- d->message = NULL;
- }
-}
-
-
-/* Process each event posted by the reactor.
- */
-static void event_handler(pn_handler_t *handler,
- pn_event_t *event,
- pn_event_type_t type)
-{
- app_data_t *data = GET_APP_DATA(handler);
-
- switch (type) {
-
- case PN_CONNECTION_INIT: {
- // Create and open all the endpoints needed to receive a message
- //
- pn_connection_t *conn;
- pn_session_t *ssn;
- pn_link_t *receiver;
-
- conn = pn_event_connection(event);
- pn_connection_open(conn);
- ssn = pn_session(conn);
- pn_session_open(ssn);
- receiver = pn_receiver(ssn, "MyReceiver");
- pn_terminus_set_address(pn_link_source(receiver), data->source);
- pn_link_open(receiver);
- // cannot receive without granting credit:
- pn_link_flow(receiver, data->count ? data->count : CAPACITY);
- } break;
-
- case PN_DELIVERY: {
- // A message has been received
- //
- pn_link_t *link = NULL;
- pn_delivery_t *dlv = pn_event_delivery(event);
- if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
- // A full message has arrived
- if (!quiet) {
- static char buffer[MAX_SIZE];
- ssize_t len;
- pn_bytes_t bytes;
- bool found = false;
-
- // try to decode the message body
- if (pn_delivery_pending(dlv) < MAX_SIZE) {
- // read in the raw data
- len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
- if (len > 0) {
- // decode it into a proton message
- pn_message_clear(data->message);
- if (PN_OK == pn_message_decode(data->message, buffer,
- len)) {
- // Assuming the message came from the sender
- // example, try to parse out a single string from
- // the payload
- //
- pn_data_scan(pn_message_body(data->message), "?S",
- &found, &bytes);
- }
- }
- }
- if (found) {
- fprintf(stdout, "Message: [%.*s]\n", (int)bytes.size,
- bytes.start);
- } else {
- fprintf(stdout, "Message received!\n");
- }
- }
-
- link = pn_delivery_link(dlv);
-
- if (!pn_delivery_settled(dlv)) {
- // remote has not settled, so it is tracking the delivery. Ack
- // it.
- pn_delivery_update(dlv, PN_ACCEPTED);
- }
-
- // done with the delivery, move to the next and free it
- pn_link_advance(link);
- pn_delivery_settle(dlv); // dlv is now freed
-
- if (data->count == 0) {
- // receive forever - see if more credit is needed
- if (pn_link_credit(link) < CAPACITY/2) {
- // Grant enough credit to bring it up to CAPACITY:
- pn_link_flow(link, CAPACITY - pn_link_credit(link));
- }
- } else if (--data->count == 0) {
- // done receiving, close the endpoints
- pn_session_t *ssn = pn_link_session(link);
- pn_link_close(link);
- pn_session_close(ssn);
- pn_connection_close(pn_session_connection(ssn));
- }
- }
- } break;
-
- case PN_TRANSPORT_ERROR: {
- // The connection to the peer failed.
- //
- pn_transport_t *tport = pn_event_transport(event);
- pn_condition_t *cond = pn_transport_condition(tport);
- fprintf(stderr, "Network transport failed!\n");
- if (pn_condition_is_set(cond)) {
- const char *name = pn_condition_get_name(cond);
- const char *desc = pn_condition_get_description(cond);
- fprintf(stderr, " Error: %s Description: %s\n",
- (name) ? name : "<error name not provided>",
- (desc) ? desc : "<no description provided>");
- }
- // pn_reactor_process() will exit with a false return value, stopping
- // the main loop.
- } break;
-
- default:
- break;
- }
-}
-
-static void usage(void)
-{
- printf("Usage: receiver <options>\n");
- printf("-a \tThe host address [localhost:5672]\n");
- printf("-c \t# of messages to receive, 0=receive forever [1]\n");
- printf("-s \tSource address [examples]\n");
- printf("-i \tContainer name [ReceiveExample]\n");
- printf("-q \tQuiet - turn off stdout\n");
- exit(1);
-}
-
-int main(int argc, char** argv)
-{
- const char *address = "localhost";
- const char *container = "ReceiveExample";
- int c;
- pn_reactor_t *reactor = NULL;
- pn_url_t *url = NULL;
- pn_connection_t *conn = NULL;
-
- /* create a handler for the connection's events.
- * event_handler will be called for each event. The handler will allocate
- * an app_data_t instance which can be accessed when the event_handler is
- * called.
- */
- pn_handler_t *handler = pn_handler_new(event_handler,
- sizeof(app_data_t),
- delete_handler);
-
- /* set up the application data with defaults */
- app_data_t *app_data = GET_APP_DATA(handler);
- memset(app_data, 0, sizeof(app_data_t));
- app_data->count = 1;
- app_data->source = "examples";
- app_data->message = pn_message();
-
- /* Attach the pn_handshaker() handler. This handler deals with endpoint
- * events from the peer so we don't have to.
- */
- {
- pn_handler_t *handshaker = pn_handshaker();
- pn_handler_add(handler, handshaker);
- pn_decref(handshaker);
- }
-
- /* command line options */
- opterr = 0;
- while((c = getopt(argc, argv, "i:a:c:s:qh")) != -1) {
- switch(c) {
- case 'h': usage(); break;
- case 'a': address = optarg; break;
- case 'c':
- app_data->count = atoi(optarg);
- if (app_data->count < 0) usage();
- break;
- case 's': app_data->source = optarg; break;
- case 'i': container = optarg; break;
- case 'q': quiet = 1; break;
- default:
- usage();
- break;
- }
- }
-
- reactor = pn_reactor();
-
- url = pn_url_parse(address);
- if (url == NULL) {
- fprintf(stderr, "Invalid host address %s\n", address);
- exit(1);
- }
- conn = pn_reactor_connection_to_host(reactor,
- pn_url_get_host(url),
- pn_url_get_port(url),
- handler);
- pn_decref(url);
- pn_decref(handler);
-
- // the container name should be unique for each client
- pn_connection_set_container(conn, container);
-
- // wait up to 5 seconds for activity before returning from
- // pn_reactor_process()
- pn_reactor_set_timeout(reactor, 5000);
-
- pn_reactor_start(reactor);
-
- while (pn_reactor_process(reactor)) {
- /* Returns 'true' until the connection is shut down.
- * pn_reactor_process() will return true at least once every 5 seconds
- * (due to the timeout). If no timeout was configured,
- * pn_reactor_process() returns as soon as it finishes processing all
- * pending I/O and events. Once the connection has closed,
- * pn_reactor_process() will return false.
- */
- }
- pn_decref(reactor);
-
- return 0;
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org