You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/02/23 22:51:22 UTC
[15/38] qpid-proton git commit: PROTON-1403: c proactor direct example
PROTON-1403: c proactor direct example
direct server that can accept a connection from the send or receive examples and act
as a directly-connected receive or send as appropriate.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6291a75c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6291a75c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6291a75c
Branch: refs/heads/go1
Commit: 6291a75cbed5840578c0d2577424ed574b8ee56f
Parents: 8568737
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Feb 14 12:06:43 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Feb 14 12:51:31 2017 -0500
----------------------------------------------------------------------
examples/c/proactor/CMakeLists.txt | 2 +-
examples/c/proactor/README.dox | 14 +-
examples/c/proactor/broker.c | 3 +-
examples/c/proactor/direct.c | 357 ++++++++++++++++++++++++++++++++
examples/c/proactor/receive.c | 2 +-
examples/c/proactor/test.py | 29 ++-
examples/exampletest.py | 1 +
7 files changed, 398 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6291a75c/examples/c/proactor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/proactor/CMakeLists.txt b/examples/c/proactor/CMakeLists.txt
index 7fec1c6..4189cf5 100644
--- a/examples/c/proactor/CMakeLists.txt
+++ b/examples/c/proactor/CMakeLists.txt
@@ -41,7 +41,7 @@ else(WIN32)
set(PLATFORM_LIBS pthread)
endif(WIN32)
-foreach(name broker send receive)
+foreach(name broker send receive direct)
add_executable(proactor-${name} ${name}.c)
target_link_libraries(proactor-${name} ${Proton_LIBRARIES} ${PLATFORM_LIBS})
set_target_properties(proactor-${name} PROPERTIES OUTPUT_NAME ${name})
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6291a75c/examples/c/proactor/README.dox
----------------------------------------------------------------------
diff --git a/examples/c/proactor/README.dox b/examples/c/proactor/README.dox
index 4b09cb7..19083e5 100644
--- a/examples/c/proactor/README.dox
+++ b/examples/c/proactor/README.dox
@@ -2,16 +2,22 @@
* @example send.c
*
* Send a fixed number of messages to the "example" node.
+ * Can be used with @ref broker.c, @ref direct.c or an external AMQP broker.
*
* @example receive.c
*
- * Subscribes to the 'example' node and prints the message bodies
- * received.
+ * Subscribes to the 'example' node and prints the message bodies received.
+ * Can be used with @ref broker.c, @ref direct.c or an external AMQP broker.
+ *
+ * @example direct.c
+ *
+ * A server that can be used to demonstrate direct (no broker) peer-to-peer communication.
+ * It can accept an incoming connection from either the @ref send.c or @ref receive.c examples
+ * and will act as the directly-connected counterpart (receive or send).
*
* @example broker.c
*
- * A simple multithreaded broker that works with the send and receive
- * examples.
+ * A simple multithreaded broker that works with the @ref send.c and @ref receive.c examples.
*
* __Requires C++11__
*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6291a75c/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index 5679290..ebf4068 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -307,7 +307,7 @@ static void handle(broker_t* b, pn_event_t* e) {
pn_listener_accept(pn_event_listener(e), pn_connection());
break;
- case PN_CONNECTION_INIT:
+ case PN_CONNECTION_INIT:
pn_connection_set_container(c, b->container_id);
break;
@@ -398,6 +398,7 @@ static void handle(broker_t* b, pn_event_t* e) {
case PN_LISTENER_CLOSE:
check_condition(e, pn_listener_condition(pn_event_listener(e)));
+ broker_stop(b);
break;
case PN_PROACTOR_INACTIVE: /* listener and all connections closed */
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6291a75c/examples/c/proactor/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/direct.c b/examples/c/proactor/direct.c
new file mode 100644
index 0000000..26f1b33
--- /dev/null
+++ b/examples/c/proactor/direct.c
@@ -0,0 +1,357 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.h>
+#include <proton/connection_driver.h>
+#include <proton/delivery.h>
+#include <proton/proactor.h>
+#include <proton/link.h>
+#include <proton/message.h>
+#include <proton/sasl.h>
+#include <proton/session.h>
+#include <proton/transport.h>
+#include <proton/url.h>
+#include "pncompat/misc_funcs.inc"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+typedef char str[1024];
+
+typedef struct app_data_t {
+ /* Common values */
+ pn_proactor_t *proactor;
+ bool finished;
+ str address;
+ str container_id;
+ pn_rwbytes_t message_buffer;
+ int message_count;
+
+ /* Sender values */
+ int sent;
+ int acknowledged;
+ pn_link_t *sender;
+ pn_millis_t delay;
+ bool delaying;
+
+ /* Receiver values */
+ int received;
+} app_data_t;
+
+static const int BATCH = 1000; /* Batch size for unlimited receive */
+
+int exit_code = 0;
+
+static void check_condition(pn_event_t *e, pn_condition_t *cond) {
+  if (pn_condition_is_set(cond)) {
+    exit_code = 1;              /* main() returns exit_code, so the process reports the failure */
+    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
+            pn_condition_get_name(cond), pn_condition_get_description(cond));
+  }
+}
+
+/* Create a message with a map { "sequence" : number }, encode it and return the encoded bytes (owned by app->message_buffer). */
+static pn_bytes_t encode_message(app_data_t* app) {
+  /* Construct a message with the map { "sequence": app.sent } */
+  pn_message_t* message = pn_message();
+  pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
+  pn_data_t* body = pn_message_body(message);
+  pn_data_put_map(body);
+  pn_data_enter(body);
+  pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
+  pn_data_put_int(body, app->sent); /* The sequence number */
+  pn_data_exit(body);
+
+  /* encode the message, expanding the encode buffer as needed */
+  if (app->message_buffer.start == NULL) {
+    static const size_t initial_size = 128;
+    app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
+  }
+  /* app->message_buffer is the total buffer space available. */
+  /* mbuf will point at just the portion used by the encoded message */
+  pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
+  int status = 0;
+  while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
+    app->message_buffer.size *= 2;
+    app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size);
+    mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start); /* realloc may move the buffer: refresh pointer and size */
+  }
+  if (status != 0) {
+    fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
+    exit(1);
+  }
+  pn_message_free(message);
+  return pn_bytes(mbuf.size, mbuf.start);
+}
+
+static void send(app_data_t* app) {
+  while (pn_link_credit(app->sender) > 0 && app->sent < app->message_count) { /* credit left and messages remaining */
+    ++app->sent;
+    /* Use the bytes of the sent counter as a unique delivery tag. */
+    pn_delivery(app->sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
+    pn_bytes_t msgbuf = encode_message(app);
+    pn_link_send(app->sender, msgbuf.start, msgbuf.size);
+    pn_link_advance(app->sender);
+    if (app->delay && app->sent < app->message_count) {
+      /* If delay is set, stop here; handle() resumes sending after the proactor timeout wakes the connection */
+      app->delaying = true;
+      pn_proactor_set_timeout(app->proactor, app->delay);
+      break;
+    }
+  }
+}
+
+#define MAX_SIZE 1024
+
+static void decode_message(pn_delivery_t *dlv) {
+  static char buffer[MAX_SIZE];
+  ssize_t len;
+  /* Only messages smaller than MAX_SIZE are decoded; larger deliveries are not read or printed here */
+  if (pn_delivery_pending(dlv) < MAX_SIZE) {
+    /* read in the raw data */
+    len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
+    if (len > 0) {
+      /* decode it into a proton message */
+      pn_message_t *m = pn_message();
+      if (PN_OK == pn_message_decode(m, buffer, len)) {
+        pn_string_t *s = pn_string(NULL);
+        pn_inspect(pn_message_body(m), s);   /* s receives the text of the body that is printed below */
+        printf("%s\n", pn_string_get(s));
+        pn_free(s);
+      }
+      pn_message_free(m);
+    }
+  }
+}
+
+/* Handle link events when we are acting as the receiver: grant credit, then print, accept and settle each delivery */
+static void handle_receive(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+   case PN_LINK_REMOTE_OPEN: {
+     pn_link_t *l = pn_event_link(event);
+     pn_link_open(l);
+     pn_link_flow(l, app->message_count ? app->message_count : BATCH); /* message_count 0 means receive forever: start with BATCH credits */
+   } break;
+
+   case PN_DELIVERY: {
+     /* A message has been received */
+     pn_link_t *link = NULL;
+     pn_delivery_t *dlv = pn_event_delivery(event);
+     if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
+       link = pn_delivery_link(dlv);
+       decode_message(dlv);
+       /* Accept the delivery */
+       pn_delivery_update(dlv, PN_ACCEPTED);
+       /* done with the delivery, move to the next and free it */
+       pn_link_advance(link);
+       pn_delivery_settle(dlv);  /* dlv is now freed */
+
+       if (app->message_count == 0) {
+         /* receive forever - see if more credit is needed */
+         if (pn_link_credit(link) < BATCH/2) {
+           /* Grant enough credit to bring it up to BATCH: */
+           pn_link_flow(link, BATCH - pn_link_credit(link));
+         }
+       } else if (++app->received >= app->message_count) {
+         /* done receiving, close the endpoints */
+         printf("%d messages received\n", app->received);
+         pn_session_t *ssn = pn_link_session(link);
+         pn_link_close(link);
+         pn_session_close(ssn);
+         pn_connection_close(pn_session_connection(ssn));
+       }
+     }
+   } break;
+
+   default:
+     break;
+  }
+}
+
+/* Handle link events when we are acting as the sender: send on credit, count acknowledgements, close when all are accepted */
+static void handle_send(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+   case PN_LINK_REMOTE_OPEN: {
+     pn_link_t* l = pn_event_link(event);
+     pn_terminus_set_address(pn_link_target(l), app->address);
+     pn_link_open(l);
+   } break;
+
+   case PN_LINK_FLOW:
+     /* The peer has given us some credit, now we can send messages (unless we are waiting out a delay) */
+     if (!app->delaying) {
+       app->sender = pn_event_link(event);
+       send(app);
+     } break;
+
+   case PN_DELIVERY: {
+     /* We received acknowledgement from the peer that a message was delivered. */
+     pn_delivery_t* d = pn_event_delivery(event);
+     if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
+       if (++app->acknowledged == app->message_count) {
+         printf("%d messages sent and acknowledged\n", app->acknowledged);
+         pn_connection_close(pn_event_connection(event));
+       }
+     }
+   } break;
+
+   default:
+     break;
+  }
+}
+
+/* Handle all events: connection/listener events here, link events delegated to handle_send or handle_receive by link mode */
+static void handle(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+   case PN_LISTENER_ACCEPT:
+     pn_listener_accept(pn_event_listener(event), pn_connection());
+     break;
+
+   case PN_CONNECTION_INIT:
+     pn_connection_set_container(pn_event_connection(event), app->container_id);
+     break;
+
+   case PN_CONNECTION_BOUND: {
+     /* Turn off security */
+     pn_transport_t *t = pn_event_transport(event);
+     pn_transport_require_auth(t, false);
+     pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
+   } break; /* do not fall through: the open is completed only in response to the remote open */
+   case PN_CONNECTION_REMOTE_OPEN: {
+     pn_connection_open(pn_event_connection(event)); /* Complete the open */
+     break;
+   }
+
+   case PN_SESSION_REMOTE_OPEN: {
+     pn_session_open(pn_event_session(event));
+     break;
+   }
+
+   case PN_TRANSPORT_CLOSED:
+     check_condition(event, pn_transport_condition(pn_event_transport(event)));
+     app->finished = true;
+     break;
+
+   case PN_CONNECTION_REMOTE_CLOSE:
+     check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
+     pn_connection_close(pn_event_connection(event));
+     break;
+
+   case PN_SESSION_REMOTE_CLOSE:
+     check_condition(event, pn_session_remote_condition(pn_event_session(event)));
+     pn_connection_close(pn_event_connection(event));
+     break;
+
+   case PN_LINK_REMOTE_CLOSE:
+   case PN_LINK_REMOTE_DETACH:
+     check_condition(event, pn_link_remote_condition(pn_event_link(event)));
+     pn_connection_close(pn_event_connection(event));
+     break;
+
+   case PN_PROACTOR_TIMEOUT:
+     /* Wake the sender's connection */
+     pn_connection_wake(pn_session_connection(pn_link_session(app->sender)));
+     break;
+
+   case PN_CONNECTION_WAKE:
+     /* Timeout, we can send more. */
+     app->delaying = false;
+     send(app);
+     break;
+
+   case PN_PROACTOR_INACTIVE:
+     app->finished = true;
+     break;
+
+   case PN_LISTENER_CLOSE:
+     check_condition(event, pn_listener_condition(pn_event_listener(event)));
+     app->finished = true;
+     break;
+
+   default: {
+     pn_link_t *l = pn_event_link(event);
+     if (l) {                      /* Only delegate link-related events */
+       if (pn_link_is_sender(l)) {
+         handle_send(app, event);
+       } else {
+         handle_receive(app, event);
+       }
+     }
+   }
+  }
+}
+
+static void usage(const char *arg0) { /* Print the command-line help to stderr and exit with status 1 */
+  fprintf(stderr, "Usage: %s [-a URL] [-m message-count] [-d delay-ms]\n", arg0);
+  fprintf(stderr, "Demonstrates direct peer-to-peer AMQP communication without a broker. Accepts a connection from either the send.c or receive.c client and provides the complementary behavior (receive or send).\n");
+  exit(1);
+}
+
+int main(int argc, char **argv) {
+  /* Default values for application and connection. */
+  app_data_t app = {0};
+  app.message_count = 100;
+  const char* urlstr = NULL;
+
+  int opt;
+  while((opt = getopt(argc, argv, "a:m:d:")) != -1) {
+    switch(opt) {
+     case 'a': urlstr = optarg; break;
+     case 'm': app.message_count = atoi(optarg); break;
+     case 'd': app.delay = atoi(optarg); break;
+     default: usage(argv[0]); break;
+    }
+  }
+  if (optind < argc)
+    usage(argv[0]);
+  /* Note container-id should be unique */
+  snprintf(app.container_id, sizeof(app.container_id), "%s", argv[0]);
+
+  /* Parse the URL; fall back to the default for any part (host or port) the URL does not supply */
+  pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
+  /* Listen on IPv6 wildcard. On systems that do not set IPV6ONLY by default,
+     this will also listen for mapped IPv4 on the same port.
+  */
+  const char *host = (url && pn_url_get_host(url)) ? pn_url_get_host(url) : "::";
+  const char *port = (url && pn_url_get_port(url)) ? pn_url_get_port(url) : "amqp";
+
+  app.proactor = pn_proactor();
+  pn_proactor_listen(app.proactor, pn_listener(), host, port, 16);
+  printf("listening on '%s:%s'\n", host, port);
+  if (url) pn_url_free(url); /* host and port may point into url, so free it only after their last use */
+
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(app.proactor);
+    pn_event_t *e;
+    while ((e = pn_event_batch_next(events))) {
+      handle(&app, e);
+    }
+    pn_proactor_done(app.proactor, events);
+  } while(!app.finished);
+
+  pn_proactor_free(app.proactor);
+  free(app.message_buffer.start);
+  return exit_code;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6291a75c/examples/c/proactor/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c
index 1eb54b6..1bc5509 100644
--- a/examples/c/proactor/receive.c
+++ b/examples/c/proactor/receive.c
@@ -46,7 +46,7 @@ typedef struct app_data_t {
bool finished;
} app_data_t;
-static const int BATCH = 100; /* Batch size for unlimited receive */
+static const int BATCH = 1000; /* Batch size for unlimited receive */
static int exit_code = 0;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6291a75c/examples/c/proactor/test.py
----------------------------------------------------------------------
diff --git a/examples/c/proactor/test.py b/examples/c/proactor/test.py
index 29aa327..45bb817 100644
--- a/examples/c/proactor/test.py
+++ b/examples/c/proactor/test.py
@@ -19,11 +19,9 @@
# This is a test script to run the examples and verify that they behave as expected.
+import unittest, sys, time
from exampletest import *
-import unittest
-import sys
-
def python_cmd(name):
dir = os.path.dirname(__file__)
return [sys.executable, os.path.join(dir, "..", "..", "python", name)]
@@ -55,6 +53,31 @@ class CExampleTest(BrokerTestCase):
r = self.proc(["receive", "-a", self.addr, "-m3"])
self.assertEqual(receive_expect(3), r.wait_out())
+    def retry(self, args, max=10):
+        """Run args, re-running up to max times while the process fails with "connection refused"; return its output"""
+        while True:
+            try:
+                return self.proc(args).wait_out()
+            except ProcError as e:
+                if "connection refused" in e.args[0] and max > 0:
+                    max -= 1
+                    time.sleep(.01)
+                    continue
+                raise
+
+    def test_send_direct(self):
+        """Run the send example against the direct server, which acts as the receiver"""
+        addr = "127.0.0.1:%s/examples" % (pick_port())
+        d = self.proc(["direct", "-a", addr])
+        self.assertEqual("100 messages sent and acknowledged\n", self.retry(["send", "-a", addr]))
+        self.assertIn(receive_expect(100), d.wait_out())
+
+    def test_receive_direct(self):
+        """Run the receive example against the direct server, which acts as the sender"""
+        addr = "127.0.0.1:%s/examples" % (pick_port())
+        d = self.proc(["direct", "-a", addr])
+        self.assertEqual(receive_expect(100), self.retry(["receive", "-a", addr]))
+        self.assertIn("100 messages sent and acknowledged\n", d.wait_out())
if __name__ == "__main__":
unittest.main()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6291a75c/examples/exampletest.py
----------------------------------------------------------------------
diff --git a/examples/exampletest.py b/examples/exampletest.py
index d40b9cb..bf7ea9b 100644
--- a/examples/exampletest.py
+++ b/examples/exampletest.py
@@ -65,6 +65,7 @@ class Proc(Popen):
"""Start an example process"""
args = list(args)
self.args = args
+ self.kwargs = kwargs
self._out = os.tmpfile()
try:
Popen.__init__(self, self.env_args + self.args, stdout=self._out, stderr=STDOUT, **kwargs)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org