You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2016/04/12 15:55:35 UTC
qpid-proton git commit: NO-JIRA: Add reactor clients to C examples
Repository: qpid-proton
Updated Branches:
refs/heads/master 9d88f823c -> 6ea002d27
NO-JIRA: Add reactor clients to C examples
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6ea002d2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6ea002d2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6ea002d2
Branch: refs/heads/master
Commit: 6ea002d274692ac85e645041a988512c39c75c05
Parents: 9d88f82
Author: Ken Giusti <kg...@apache.org>
Authored: Mon Apr 11 11:42:06 2016 -0400
Committer: Ken Giusti <kg...@apache.org>
Committed: Tue Apr 12 09:54:57 2016 -0400
----------------------------------------------------------------------
examples/c/CMakeLists.txt | 1 +
examples/c/reactor/CMakeLists.txt | 27 ++++
examples/c/reactor/README | 30 ++++
examples/c/reactor/receiver.c | 237 +++++++++++++++++++++++++++
examples/c/reactor/sender.c | 286 +++++++++++++++++++++++++++++++++
5 files changed, 581 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea002d2/examples/c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt
index 2f74094..1612a86 100644
--- a/examples/c/CMakeLists.txt
+++ b/examples/c/CMakeLists.txt
@@ -19,3 +19,4 @@
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include)
add_subdirectory(messenger)
+add_subdirectory(reactor)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea002d2/examples/c/reactor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/reactor/CMakeLists.txt b/examples/c/reactor/CMakeLists.txt
new file mode 100644
index 0000000..99e5b4a
--- /dev/null
+++ b/examples/c/reactor/CMakeLists.txt
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+find_package(Proton REQUIRED)
+
+include_directories(${Proton_INCLUDE_DIRS})
+add_executable(sender sender.c)
+add_executable(receiver receiver.c)
+target_link_libraries(sender ${Proton_LIBRARIES})
+target_link_libraries(receiver ${Proton_LIBRARIES})
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea002d2/examples/c/reactor/README
----------------------------------------------------------------------
diff --git a/examples/c/reactor/README b/examples/c/reactor/README
new file mode 100644
index 0000000..8d61893
--- /dev/null
+++ b/examples/c/reactor/README
@@ -0,0 +1,30 @@
+These example clients require a broker or similar intermediary that
+supports the AMQP 1.0 protocol, allows anonymous connections and
+accepts links to and from a node named 'examples'.
+
+------------------------------------------------------------------
+
+sender.c
+
+A simple message sending client. This example sends all messages but
+the last as pre-settled (no ack required). It then pends waiting for
+an ack for the last message sent before exiting.
+
+Use the '-h' command line option for a list of supported parameters.
+
+------------------------------------------------------------------
+
+receiver.c
+
+A simple message consuming client. This example receives messages
+from a target (default 'examples'). Received messages are
+acknowledged if they are sent un-settled. The client will try to
+decode the message payload assuming it has been generated by the
+sender example.
+
+Use the '-h' command line option for a list of supported parameters.
+
+
+
+
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea002d2/examples/c/reactor/receiver.c
----------------------------------------------------------------------
diff --git a/examples/c/reactor/receiver.c b/examples/c/reactor/receiver.c
new file mode 100644
index 0000000..dd6a9e3
--- /dev/null
+++ b/examples/c/reactor/receiver.c
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "pncompat/misc_funcs.inc"
+
+#include "proton/reactor.h"
+#include "proton/message.h"
+#include "proton/connection.h"
+#include "proton/session.h"
+#include "proton/link.h"
+#include "proton/delivery.h"
+#include "proton/event.h"
+#include "proton/handlers.h"
+
+static int quiet = 0;
+
+// Credit batch if unlimited receive (-c 0)
+static const int CAPACITY = 100;
+#define MAX_SIZE 512
+
+// Example application data. This data will be instantiated in the event
+// handler, and is available during event processing. In this example it
+// holds configuration and state information.
+//
+typedef struct {
+ int count; // # of messages to receive before exiting
+ char *source; // name of the source node to receive from
+ pn_message_t *message; // holds the received message
+} app_data_t;
+
+// helper to pull pointer to app_data_t instance out of the pn_handler_t
+//
+#define GET_APP_DATA(handler) ((app_data_t *)pn_handler_mem(handler))
+
+// Called when reactor exits to clean up app_data
+//
+static void delete_handler(pn_handler_t *handler)
+{
+ app_data_t *d = GET_APP_DATA(handler);
+ if (d->message) {
+ pn_decref(d->message);
+ d->message = NULL;
+ }
+}
+
+
+/* Process each event posted by the reactor.
+ */
+static void event_handler(pn_handler_t *handler,
+ pn_event_t *event,
+ pn_event_type_t type)
+{
+ app_data_t *data = GET_APP_DATA(handler);
+
+ switch (type) {
+
+ case PN_CONNECTION_INIT: {
+ // Create and open all the endpoints needed to send a message
+ //
+ pn_connection_t *conn = pn_event_connection(event);
+ pn_connection_open(conn);
+ pn_session_t *ssn = pn_session(conn);
+ pn_session_open(ssn);
+ pn_link_t *receiver = pn_receiver(ssn, "MyReceiver");
+ pn_terminus_set_address(pn_link_source(receiver), data->source);
+ pn_link_open(receiver);
+ // cannot receive without granting credit:
+ pn_link_flow(receiver, data->count ? data->count : CAPACITY);
+ } break;
+
+ case PN_DELIVERY: {
+ // A message has been received
+ //
+ pn_delivery_t *dlv = pn_event_delivery(event);
+ if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
+ // A full message has arrived
+ if (!quiet && pn_delivery_pending(dlv) < MAX_SIZE) {
+ // try to decode the message body
+ pn_bytes_t bytes;
+ bool found = false;
+ static char buffer[MAX_SIZE];
+ size_t len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
+ pn_message_clear(data->message);
+ // decode the raw data into the message instance
+ if (pn_message_decode(data->message, buffer, len) == PN_OK) {
+ // Assuming the message came from the sender example, try
+ // to parse out a single string from the payload
+ //
+ int rc = pn_data_scan(pn_message_body(data->message)
+ , "?S", &found, &bytes);
+ if (!rc && found) {
+ fprintf(stdout, "Message: [%.*s]\n",
+ (int)bytes.size, bytes.start);
+ } else {
+ fprintf(stdout, "Message received!\n");
+ }
+ } else {
+ fprintf(stdout, "Message received!\n");
+ }
+ }
+
+ pn_link_t *link = pn_delivery_link(dlv);
+
+ if (!pn_delivery_settled(dlv)) {
+ // remote has not settled, so it is tracking the delivery. Ack
+ // it.
+ pn_delivery_update(dlv, PN_ACCEPTED);
+ }
+
+ // done with the delivery, move to the next and free it
+ pn_link_advance(link);
+ pn_delivery_settle(dlv); // dlv is now freed
+
+ if (data->count == 0) {
+ // send forever - see if more credit is needed
+ if (pn_link_credit(link) < CAPACITY/2) {
+ // Grant enough credit to bring it up to CAPACITY:
+ pn_link_flow(link, CAPACITY - pn_link_credit(link));
+ }
+ } else if (--data->count == 0) {
+ // done receiving, close the endpoints
+ pn_link_close(link);
+ pn_session_t *ssn = pn_link_session(link);
+ pn_session_close(ssn);
+ pn_connection_close(pn_session_connection(ssn));
+ }
+ }
+ } break;
+
+ default:
+ break;
+ }
+}
+
+static void usage(void)
+{
+ printf("Usage: receiver <options>\n");
+ printf("-a \tThe host address [localhost:5672]\n");
+ printf("-c \t# of messages to receive, 0=receive forever [1]\n");
+ printf("-s \tSource address [examples]\n");
+ printf("-i \tContainer name [ReceiveExample]\n");
+ printf("-q \tQuiet - turn off stdout\n");
+ exit(1);
+}
+
+int main(int argc, char** argv)
+{
+ char *address = "localhost";
+ char *container = "ReceiveExample";
+
+ /* create a handler for the connection's events.
+ * event_handler will be called for each event. The handler will allocate
+ * a app_data_t instance which can be accessed when the event_handler is
+ * called.
+ */
+ pn_handler_t *handler = pn_handler_new(event_handler,
+ sizeof(app_data_t),
+ delete_handler);
+
+ /* set up the application data with defaults */
+ app_data_t *app_data = GET_APP_DATA(handler);
+ memset(app_data, 0, sizeof(app_data_t));
+ app_data->count = 1;
+ app_data->source = "examples";
+ app_data->message = pn_message();
+
+ /* Attach the pn_handshaker() handler. This handler deals with endpoint
+ * events from the peer so we don't have to.
+ */
+ pn_handler_add(handler, pn_handshaker());
+
+ /* command line options */
+ opterr = 0;
+ int c;
+ while((c = getopt(argc, argv, "i:a:c:s:qh")) != -1) {
+ switch(c) {
+ case 'h': usage(); break;
+ case 'a': address = optarg; break;
+ case 'c':
+ app_data->count = atoi(optarg);
+ if (app_data->count < 0) usage();
+ break;
+ case 's': app_data->source = optarg; break;
+ case 'i': container = optarg; break;
+ case 'q': quiet = 1; break;
+ default:
+ usage();
+ break;
+ }
+ }
+
+ pn_reactor_t *reactor = pn_reactor();
+ pn_connection_t *conn = pn_reactor_connection(reactor, handler);
+
+ // the container name should be unique for each client
+ pn_connection_set_container(conn, container);
+ pn_connection_set_hostname(conn, address); // FIXME
+
+ // wait up to 5 seconds for activity before returning from
+ // pn_reactor_process()
+ pn_reactor_set_timeout(reactor, 5000);
+
+ pn_reactor_start(reactor);
+
+ while (pn_reactor_process(reactor)) {
+ /* Returns 'true' until the connection is shut down.
+ * pn_reactor_process() will return true at least once every 5 seconds
+ * (due to the timeout). If no timeout was configured,
+ * pn_reactor_process() returns as soon as it finishes processing all
+ * pending I/O and events. Once the connection has closed,
+ * pn_reactor_process() will return false.
+ */
+ }
+
+ return 0;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea002d2/examples/c/reactor/sender.c
----------------------------------------------------------------------
diff --git a/examples/c/reactor/sender.c b/examples/c/reactor/sender.c
new file mode 100644
index 0000000..9a07c9e
--- /dev/null
+++ b/examples/c/reactor/sender.c
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "pncompat/misc_funcs.inc"
+
+#include "proton/reactor.h"
+#include "proton/message.h"
+#include "proton/connection.h"
+#include "proton/session.h"
+#include "proton/link.h"
+#include "proton/delivery.h"
+#include "proton/event.h"
+#include "proton/handlers.h"
+
+static int quiet = 0;
+
+// Example application data. This data will be instantiated in the event
+// handler, and is available during event processing. In this example it
+// holds configuration and state information.
+//
+typedef struct {
+ int count; // # messages to send
+ int anon; // use anonymous link if true
+ char *target; // name of destination target
+ char *msg_data; // pre-encoded outbound message
+ int msg_len; // bytes in msg_data
+} app_data_t;
+
+// helper to pull pointer to app_data_t instance out of the pn_handler_t
+//
+#define GET_APP_DATA(handler) ((app_data_t *)pn_handler_mem(handler))
+
+// Called when reactor exits to clean up app_data
+//
+static void delete_handler(pn_handler_t *handler)
+{
+ app_data_t *d = GET_APP_DATA(handler);
+ if (d->msg_data) {
+ free(d->msg_data);
+ d->msg_data = NULL;
+ }
+}
+
+/* Process each event posted by the reactor.
+ */
+static void event_handler(pn_handler_t *handler,
+ pn_event_t *event,
+ pn_event_type_t type)
+{
+ app_data_t *data = GET_APP_DATA(handler);
+
+ switch (type) {
+
+ case PN_CONNECTION_INIT: {
+ // Create and open all the endpoints needed to send a message
+ //
+ pn_connection_t *conn = pn_event_connection(event);
+ pn_connection_open(conn);
+ pn_session_t *ssn = pn_session(conn);
+ pn_session_open(ssn);
+ pn_link_t *sender = pn_sender(ssn, "MySender");
+ // we do not wait for ack until the last message
+ pn_link_set_snd_settle_mode(sender, PN_SND_MIXED);
+ if (!data->anon) {
+ pn_terminus_set_address(pn_link_target(sender), data->target);
+ }
+ pn_link_open(sender);
+ } break;
+
+ case PN_LINK_FLOW: {
+ // the remote has given us some credit, now we can send messages
+ //
+ static long tag = 0; // a simple tag generator
+ pn_link_t *sender = pn_event_link(event);
+ int credit = pn_link_credit(sender);
+ while (credit > 0 && data->count > 0) {
+ --credit;
+ --data->count;
+ ++tag;
+ pn_delivery_t *delivery;
+ delivery = pn_delivery(sender,
+ pn_dtag((const char *)&tag, sizeof(tag)));
+ pn_link_send(sender, data->msg_data, data->msg_len);
+ pn_link_advance(sender);
+ if (data->count > 0) {
+ // send pre-settled until the last one, then wait for an ack on
+ // the last sent message. This allows the sender to send
+ // messages as fast as possible and then exit when the consumer
+ // has dealt with the last one.
+ //
+ pn_delivery_settle(delivery);
+ }
+ }
+ } break;
+
+ case PN_DELIVERY: {
+ // Since the example sends all messages but the last pre-settled
+ // (pre-acked), only the last message's delivery will get updated with
+ // the remote state (acked/nacked).
+ //
+ pn_delivery_t *dlv = pn_event_delivery(event);
+ if (pn_delivery_updated(dlv) && pn_delivery_remote_state(dlv)) {
+ uint64_t rs = pn_delivery_remote_state(dlv);
+ int done = 1;
+ switch (rs) {
+ case PN_RECEIVED:
+ // This is not a terminal state - it is informational, and the
+ // peer is still processing the message.
+ done = 0;
+ break;
+ case PN_ACCEPTED:
+ pn_delivery_settle(dlv);
+ if (!quiet) fprintf(stdout, "Send complete!\n");
+ break;
+ case PN_REJECTED:
+ case PN_RELEASED:
+ case PN_MODIFIED:
+ pn_delivery_settle(dlv);
+ fprintf(stderr, "Message not accepted - code:%lu\n", (unsigned long)rs);
+ break;
+ default:
+ // ??? no other terminal states defined, so ignore anything else
+ pn_delivery_settle(dlv);
+ fprintf(stderr, "Unknown delivery failure - code=%lu\n", (unsigned long)rs);
+ break;
+ }
+
+ if (done) {
+ // initiate clean shutdown of the endpoints
+ pn_link_t *link = pn_delivery_link(dlv);
+ pn_link_close(link);
+ pn_session_t *ssn = pn_link_session(link);
+ pn_session_close(ssn);
+ pn_connection_close(pn_session_connection(ssn));
+ }
+ }
+ } break;
+
+ default:
+ break;
+ }
+}
+
+static void usage(void)
+{
+ printf("Usage: send <options> <message>\n");
+ printf("-a \tThe host address [localhost:5672]\n");
+ printf("-c \t# of messages to send [1]\n");
+ printf("-t \tTarget address [examples]\n");
+ printf("-n \tUse an anonymous link [off]\n");
+ printf("-i \tContainer name [SendExample]\n");
+ printf("-q \tQuiet - turn off stdout\n");
+ printf("message \tA text string to send.\n");
+ exit(1);
+}
+
+int main(int argc, char** argv)
+{
+ char *address = "localhost";
+ char *msgtext = "Hello World!";
+ char *container = "SendExample";
+ int anon = 0;
+ int c;
+
+ /* Create a handler for the connection's events. event_handler() will be
+ * called for each event and delete_handler will be called when the
+ * connection is released. The handler will allocate an app_data_t
+ * instance which can be accessed when the event_handler is called.
+ */
+ pn_handler_t *handler = pn_handler_new(event_handler,
+ sizeof(app_data_t),
+ delete_handler);
+
+ /* set up the application data with defaults */
+ app_data_t *app_data = GET_APP_DATA(handler);
+ memset(app_data, 0, sizeof(app_data_t));
+ app_data->count = 1;
+ app_data->target = "examples";
+
+ /* Attach the pn_handshaker() handler. This handler deals with endpoint
+ * events from the peer so we don't have to.
+ */
+ pn_handler_add(handler, pn_handshaker());
+
+ /* command line options */
+ opterr = 0;
+ while((c = getopt(argc, argv, "i:a:c:t:nhq")) != -1) {
+ switch(c) {
+ case 'h': usage(); break;
+ case 'a': address = optarg; break;
+ case 'c':
+ app_data->count = atoi(optarg);
+ if (app_data->count < 1) usage();
+ break;
+ case 't': app_data->target = optarg; break;
+ case 'n': app_data->anon = 1; break;
+ case 'i': container = optarg; break;
+ case 'q': quiet = 1; break;
+ default:
+ usage();
+ break;
+ }
+ }
+ if (optind < argc) msgtext = argv[optind];
+
+
+ // create a single message and pre-encode it so we only have to do that
+ // once. All transmits will use the same pre-encoded message simply for
+ // speed.
+ //
+ pn_message_t *message = pn_message();
+ pn_message_set_address(message, app_data->target);
+ pn_data_t *body = pn_message_body(message);
+ pn_data_clear(body);
+
+ // This message's body contains a single string
+ if (pn_data_fill(body, "S", msgtext)) {
+ fprintf(stderr, "Error building message!\n");
+ exit(1);
+ }
+ pn_data_rewind(body);
+ {
+ // encode the message, expanding the encode buffer as needed
+ //
+ size_t len = 128;
+ char *buf = (char *)malloc(len);
+ int rc = 0;
+ do {
+ rc = pn_message_encode(message, buf, &len);
+ if (rc == PN_OVERFLOW) {
+ free(buf);
+ len *= 2;
+ buf = malloc(len);
+ }
+ } while (rc == PN_OVERFLOW);
+ app_data->msg_len = len;
+ app_data->msg_data = buf;
+ }
+ pn_decref(message); // message no longer needed
+
+ pn_reactor_t *reactor = pn_reactor();
+ pn_connection_t *conn = pn_reactor_connection(reactor, handler);
+
+ // the container name should be unique for each client
+ pn_connection_set_container(conn, container);
+ pn_connection_set_hostname(conn, address); // FIXME
+
+ // wait up to 5 seconds for activity before returning from
+ // pn_reactor_process()
+ pn_reactor_set_timeout(reactor, 5000);
+
+ pn_reactor_start(reactor);
+
+ while (pn_reactor_process(reactor)) {
+ /* Returns 'true' until the connection is shut down.
+ * pn_reactor_process() will return true at least once every 5 seconds
+ * (due to the timeout). If no timeout was configured,
+ * pn_reactor_process() returns as soon as it finishes processing all
+ * pending I/O and events. Once the connection has closed,
+ * pn_reactor_process() will return false.
+ */
+ }
+
+ return 0;
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org