You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by jr...@apache.org on 2017/09/11 23:18:16 UTC
[1/2] qpid-proton git commit: PROTON-1585: Remove old reactor and
messenger examples; promote the proactor examples to the top level
Repository: qpid-proton
Updated Branches:
refs/heads/master 6888ab5e5 -> 564e0ca4c
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c
deleted file mode 100644
index 6fd74a5..0000000
--- a/examples/c/proactor/receive.c
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <proton/connection.h>
-#include <proton/condition.h>
-#include <proton/delivery.h>
-#include <proton/link.h>
-#include <proton/message.h>
-#include <proton/proactor.h>
-#include <proton/session.h>
-#include <proton/transport.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-
-/* Configuration and running state shared by main(), run() and handle(). */
-typedef struct app_data_t {
-  /* Connection parameters, filled in from the command line by main() */
-  const char *host;
-  const char *port;
-  /* Address set on the receiving link's source terminus */
-  const char *amqp_address;
-  /* Container id set on the connection */
-  const char *container_id;
-  /* Number of messages to receive; 0 means receive forever */
-  int message_count;
-
-  /* Proactor that drives the connection */
-  pn_proactor_t *proactor;
-  /* Messages received so far */
-  int received;
-  /* NOTE(review): not referenced by the code in this file - confirm before relying on it */
-  bool finished;
-} app_data_t;
-
-static const int BATCH = 1000; /* Batch size for unlimited receive */
-
-static int exit_code = 0;
-
-/* If cond is set: report it on stderr together with the name of the event
- * that carried it, close the event's connection and make the process exit
- * with status 1. Does nothing when cond is not set. */
-static void check_condition(pn_event_t *e, pn_condition_t *cond) {
-  if (!pn_condition_is_set(cond)) {
-    return;
-  }
-  const char *event_name = pn_event_type_name(pn_event_type(e));
-  const char *cond_name = pn_condition_get_name(cond);
-  const char *cond_text = pn_condition_get_description(cond);
-  fprintf(stderr, "%s: %s: %s\n", event_name, cond_name, cond_text);
-  pn_connection_close(pn_event_connection(e));
-  exit_code = 1;
-}
-
-#define MAX_SIZE 1024
-
-/* Read the pending data of a delivery, decode it as an AMQP message and
- * print the message body on stdout. Deliveries whose pending data does not
- * fit in MAX_SIZE bytes, or that cannot be read or decoded, are ignored. */
-static void decode_message(pn_delivery_t *dlv) {
-  static char buffer[MAX_SIZE];
-
-  /* too large for the fixed buffer: skip it */
-  if (pn_delivery_pending(dlv) >= MAX_SIZE) {
-    return;
-  }
-
-  /* read in the raw data */
-  ssize_t len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
-  if (len <= 0) {
-    return;
-  }
-
-  /* decode it into a proton message and print a rendering of its body */
-  pn_message_t *m = pn_message();
-  if (pn_message_decode(m, buffer, len) == PN_OK) {
-    pn_string_t *text = pn_string(NULL);
-    pn_inspect(pn_message_body(m), text);
-    printf("%s\n", pn_string_get(text));
-    pn_free(text);
-  }
-  pn_message_free(m);
-}
-
-/* Handle one proactor event for the receiving connection.
- *
- * PN_CONNECTION_INIT opens the connection, a session and a receiver link on
- * app->amqp_address and grants the initial credit. PN_DELIVERY prints,
- * accepts and settles each complete message and either tops up credit
- * (message_count == 0) or closes the endpoints once message_count messages
- * have been received. Transport and remote close/detach events report any
- * error condition and close the connection.
- *
- * Return true to continue, false to exit (only for PN_PROACTOR_INACTIVE).
- */
-static bool handle(app_data_t* app, pn_event_t* event) {
- switch (pn_event_type(event)) {
-
- case PN_CONNECTION_INIT: {
- pn_connection_t* c = pn_event_connection(event);
- pn_connection_set_container(c, app->container_id);
- pn_connection_open(c);
- pn_session_t* s = pn_session(c);
- pn_session_open(s);
- pn_link_t* l = pn_receiver(s, "my_receiver");
- pn_terminus_set_address(pn_link_source(l), app->amqp_address);
- pn_link_open(l);
- /* cannot receive without granting credit: message_count credits for a
-  * bounded run, BATCH credits when receiving forever (message_count == 0) */
- pn_link_flow(l, app->message_count ? app->message_count : BATCH);
- } break;
-
- case PN_DELIVERY: {
- /* A message has been received */
- pn_link_t *link = NULL;
- pn_delivery_t *dlv = pn_event_delivery(event);
- /* ignore the event while the delivery is still partial */
- if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
- link = pn_delivery_link(dlv); /* fetched before dlv is settled below */
- decode_message(dlv);
- /* Accept the delivery */
- pn_delivery_update(dlv, PN_ACCEPTED);
- /* done with the delivery, move to the next and free it */
- pn_link_advance(link);
- pn_delivery_settle(dlv); /* dlv is now freed */
-
- if (app->message_count == 0) {
- /* receive forever - see if more credit is needed */
- if (pn_link_credit(link) < BATCH/2) {
- /* Grant enough credit to bring it up to BATCH: */
- pn_link_flow(link, BATCH - pn_link_credit(link));
- }
- } else if (++app->received >= app->message_count) {
- /* done receiving, close the endpoints */
- printf("%d messages received\n", app->received);
- pn_session_t *ssn = pn_link_session(link);
- pn_link_close(link);
- pn_session_close(ssn);
- pn_connection_close(pn_session_connection(ssn));
- }
- }
- } break;
-
- case PN_TRANSPORT_CLOSED:
- check_condition(event, pn_transport_condition(pn_event_transport(event)));
- break;
-
- case PN_CONNECTION_REMOTE_CLOSE:
- check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
- pn_connection_close(pn_event_connection(event));
- break;
-
- case PN_SESSION_REMOTE_CLOSE:
- check_condition(event, pn_session_remote_condition(pn_event_session(event)));
- pn_connection_close(pn_event_connection(event));
- break;
-
- case PN_LINK_REMOTE_CLOSE:
- case PN_LINK_REMOTE_DETACH:
- check_condition(event, pn_link_remote_condition(pn_event_link(event)));
- pn_connection_close(pn_event_connection(event));
- break;
-
- case PN_PROACTOR_INACTIVE: /* NOTE(review): presumably raised once no connections remain - confirm */
- return false;
- break;
-
- default:
- break;
- }
- return true;
-}
-
-/* Wait for batches of events from app->proactor and pass each event to
- * handle(). Returns as soon as handle() returns false; the batch being
- * processed at that point is not handed back with pn_proactor_done(). */
-void run(app_data_t *app) {
-  for (;;) {
-    pn_event_batch_t *batch = pn_proactor_wait(app->proactor);
-    pn_event_t *ev;
-    while ((ev = pn_event_batch_next(batch)) != NULL) {
-      if (!handle(app, ev)) {
-        return;
-      }
-    }
-    pn_proactor_done(app->proactor, batch);
-  }
-}
-
-/* Usage: receive [host [port [amqp-address [message-count]]]]
- *
- * argv[0] is used as the container id. Missing arguments default to
- * host "", port "amqp", address "examples" and 10 messages; a message
- * count of 0 means receive forever. Returns exit_code, which is non-zero
- * if an error condition was reported while running.
- */
-int main(int argc, char **argv) {
-  struct app_data_t app = {0};
-  int i = 0;
-  app.container_id = argv[i++];   /* Should be unique */
-  /* Every optional argument is tested against the running index i, so an
-   * argument is only consumed when it is really present. Testing the port
-   * against a fixed index picked up argv[argc] (NULL) when only the host
-   * was given, instead of falling back to "amqp". */
-  app.host = (argc > i) ? argv[i++] : "";
-  app.port = (argc > i) ? argv[i++] : "amqp";
-  app.amqp_address = (argc > i) ? argv[i++] : "examples";
-  app.message_count = (argc > i) ? atoi(argv[i++]) : 10;
-
-  /* Create the proactor and connect */
-  app.proactor = pn_proactor();
-  char addr[PN_MAX_ADDR];
-  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
-  pn_proactor_connect(app.proactor, pn_connection(), addr);
-  run(&app);
-  pn_proactor_free(app.proactor);
-  return exit_code;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/send.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c
deleted file mode 100644
index 43da8b0..0000000
--- a/examples/c/proactor/send.c
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <proton/connection.h>
-#include <proton/condition.h>
-#include <proton/delivery.h>
-#include <proton/link.h>
-#include <proton/message.h>
-#include <proton/proactor.h>
-#include <proton/session.h>
-#include <proton/transport.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-
-/* Configuration and running state shared by main(), run() and handle(). */
-typedef struct app_data_t {
-  /* Connection parameters, filled in from the command line by main() */
-  const char *host;
-  const char *port;
-  /* Address set on the sending link's target terminus */
-  const char *amqp_address;
-  /* Container id set on the connection */
-  const char *container_id;
-  /* Number of messages to send */
-  int message_count;
-
-  /* Proactor that drives the connection */
-  pn_proactor_t *proactor;
-  /* Encode buffer, allocated on first use by encode_message() and freed by main() */
-  pn_rwbytes_t message_buffer;
-  /* Messages sent so far; also used as sequence number and delivery tag */
-  int sent;
-  /* Messages the peer has accepted so far */
-  int acknowledged;
-} app_data_t;
-
-static int exit_code = 0;
-
-/* If cond is set: report it on stderr together with the name of the event
- * that carried it, close the event's connection and make the process exit
- * with status 1. Does nothing when cond is not set. */
-static void check_condition(pn_event_t *e, pn_condition_t *cond) {
-  if (!pn_condition_is_set(cond)) {
-    return;
-  }
-  const char *event_name = pn_event_type_name(pn_event_type(e));
-  const char *cond_name = pn_condition_get_name(cond);
-  const char *cond_text = pn_condition_get_description(cond);
-  fprintf(stderr, "%s: %s: %s\n", event_name, cond_name, cond_text);
-  pn_connection_close(pn_event_connection(e));
-  exit_code = 1;
-}
-
-/* Create a message with a map { "sequence" : number }, encode it and return
- * the encoded bytes.
- *
- * The message id and the map value are both app->sent. The encoding is
- * written into app->message_buffer, which is allocated on first use and
- * doubled until the message fits. The returned pn_bytes_t points into that
- * buffer, so it is only valid until the next call. Exits the process with
- * status 1 if memory cannot be allocated or encoding fails.
- */
-static pn_bytes_t encode_message(app_data_t* app) {
-  /* Construct a message with the map { "sequence": app.sent } */
-  pn_message_t* message = pn_message();
-  pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
-  pn_data_t* body = pn_message_body(message);
-  pn_data_put_map(body);
-  pn_data_enter(body);
-  pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
-  pn_data_put_int(body, app->sent); /* The sequence number */
-  pn_data_exit(body);
-
-  /* encode the message, expanding the encode buffer as needed */
-  if (app->message_buffer.start == NULL) {
-    static const size_t initial_size = 128;
-    /* the casts on malloc/realloc are kept: the examples may be built as C++ */
-    char *initial = (char*)malloc(initial_size);
-    if (initial == NULL) {
-      fprintf(stderr, "out of memory allocating message buffer\n");
-      exit(1);
-    }
-    app->message_buffer = pn_rwbytes(initial_size, initial);
-  }
-  /* app->message_buffer is the total buffer space available. */
-  /* mbuf will point at just the portion used by the encoded message */
-  pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
-  int status = 0;
-  while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
-    size_t grown_size = app->message_buffer.size * 2;
-    /* keep the old pointer until realloc is known to have succeeded */
-    char *grown = (char*)realloc(app->message_buffer.start, grown_size);
-    if (grown == NULL) {
-      fprintf(stderr, "out of memory growing message buffer\n");
-      exit(1);
-    }
-    app->message_buffer = pn_rwbytes(grown_size, grown);
-    /* realloc may have moved the block: refresh start as well as size,
-     * otherwise the retry would encode into freed memory */
-    mbuf = pn_rwbytes(grown_size, grown);
-  }
-  if (status != 0) {
-    fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
-    exit(1);
-  }
-  pn_message_free(message);
-  return pn_bytes(mbuf.size, mbuf.start);
-}
-
-/* Handle one proactor event for the sending connection.
- *
- * PN_CONNECTION_INIT opens the connection, a session and a sender link to
- * app->amqp_address. PN_LINK_FLOW sends messages while the link has credit
- * and fewer than message_count have been sent. PN_DELIVERY counts accepted
- * deliveries and closes the connection once all are acknowledged, or on any
- * other delivery state. Transport and remote close/detach events report any
- * error condition and close the connection.
- *
- * Returns true to continue, false if finished (only for PN_PROACTOR_INACTIVE).
- */
-static bool handle(app_data_t* app, pn_event_t* event) {
- switch (pn_event_type(event)) {
-
- case PN_CONNECTION_INIT: {
- pn_connection_t* c = pn_event_connection(event);
- pn_connection_set_container(c, app->container_id);
- pn_connection_open(c);
- pn_session_t* s = pn_session(pn_event_connection(event));
- pn_session_open(s);
- pn_link_t* l = pn_sender(s, "my_sender");
- pn_terminus_set_address(pn_link_target(l), app->amqp_address);
- pn_link_open(l);
- break;
- }
-
- case PN_LINK_FLOW: {
- /* The peer has given us some credit, now we can send messages */
- pn_link_t *sender = pn_event_link(event);
- while (pn_link_credit(sender) > 0 && app->sent < app->message_count) {
- ++app->sent;
- // Use sent counter as unique delivery tag.
- pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
- pn_bytes_t msgbuf = encode_message(app); /* points into app->message_buffer */
- pn_link_send(sender, msgbuf.start, msgbuf.size);
- pn_link_advance(sender);
- }
- break;
- }
-
- case PN_DELIVERY: {
- /* We received acknowledgement from the peer that a message was delivered. */
- pn_delivery_t* d = pn_event_delivery(event);
- if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
- if (++app->acknowledged == app->message_count) {
- printf("%d messages sent and acknowledged\n", app->acknowledged);
- pn_connection_close(pn_event_connection(event));
- /* Continue handling events till we receive TRANSPORT_CLOSED */
- }
- } else {
- /* any state other than PN_ACCEPTED is treated as a failure */
- fprintf(stderr, "unexpected delivery state %d\n", (int)pn_delivery_remote_state(d));
- pn_connection_close(pn_event_connection(event));
- exit_code=1;
- }
- break;
- }
-
- case PN_TRANSPORT_CLOSED:
- check_condition(event, pn_transport_condition(pn_event_transport(event)));
- break;
-
- case PN_CONNECTION_REMOTE_CLOSE:
- check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
- pn_connection_close(pn_event_connection(event));
- break;
-
- case PN_SESSION_REMOTE_CLOSE:
- check_condition(event, pn_session_remote_condition(pn_event_session(event)));
- pn_connection_close(pn_event_connection(event));
- break;
-
- case PN_LINK_REMOTE_CLOSE:
- case PN_LINK_REMOTE_DETACH:
- check_condition(event, pn_link_remote_condition(pn_event_link(event)));
- pn_connection_close(pn_event_connection(event));
- break;
-
- case PN_PROACTOR_INACTIVE: /* NOTE(review): presumably raised once no connections remain - confirm */
- return false;
-
- default: break;
- }
- return true;
-}
-
-/* Wait for batches of events from app->proactor and pass each event to
- * handle(). Returns as soon as handle() returns false; the batch being
- * processed at that point is not handed back with pn_proactor_done(). */
-void run(app_data_t *app) {
-  for (;;) {
-    pn_event_batch_t *batch = pn_proactor_wait(app->proactor);
-    pn_event_t *ev;
-    while ((ev = pn_event_batch_next(batch)) != NULL) {
-      if (!handle(app, ev)) {
-        return;
-      }
-    }
-    pn_proactor_done(app->proactor, batch);
-  }
-}
-
-/* Usage: send [host [port [amqp-address [message-count]]]]
- *
- * argv[0] is used as the container id. Missing arguments default to
- * host "", port "amqp", address "examples" and 10 messages. Returns
- * exit_code, which is non-zero if an error condition or an unexpected
- * delivery state was reported while running.
- */
-int main(int argc, char **argv) {
-  struct app_data_t app = {0};
-  int i = 0;
-  app.container_id = argv[i++];   /* Should be unique */
-  /* Every optional argument is tested against the running index i, so an
-   * argument is only consumed when it is really present. Testing the port
-   * against a fixed index picked up argv[argc] (NULL) when only the host
-   * was given, instead of falling back to "amqp". */
-  app.host = (argc > i) ? argv[i++] : "";
-  app.port = (argc > i) ? argv[i++] : "amqp";
-  app.amqp_address = (argc > i) ? argv[i++] : "examples";
-  app.message_count = (argc > i) ? atoi(argv[i++]) : 10;
-
-  /* Create the proactor and connect */
-  app.proactor = pn_proactor();
-  char addr[PN_MAX_ADDR];
-  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
-  pn_proactor_connect(app.proactor, pn_connection(), addr);
-  run(&app);
-  pn_proactor_free(app.proactor);
-  free(app.message_buffer.start); /* encode buffer; free(NULL) is a no-op if nothing was sent */
-  return exit_code;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/thread.h
----------------------------------------------------------------------
diff --git a/examples/c/proactor/thread.h b/examples/c/proactor/thread.h
deleted file mode 100644
index 3b9f19e..0000000
--- a/examples/c/proactor/thread.h
+++ /dev/null
@@ -1,49 +0,0 @@
-#ifndef _PROTON_EXAMPLES_C_PROACTOR_THREADS_H
-#define _PROTON_EXAMPLES_C_PROACTOR_THREADS_H
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/* EXAMPLE USE ONLY. Simulate the subset of POSIX threads used by examples for windows */
-
-#ifdef _WIN32
-
-#include <windows.h>
-#include <time.h>
-/* NOTE(review): this definition follows the first <windows.h> include, so it
- * probably has no effect on that header - confirm whether it is still needed. */
-#define _WIN32_WINNT 0x500 /* WINBASE.H - Enable SignalObjectAndWait */
-#include <process.h>
-
-/* Thread functions: declare as `pthread_function name(void *arg)` */
-#define pthread_function DWORD WINAPI
-#define pthread_function_return DWORD
-#define pthread_t HANDLE
-/* Both evaluate to 0 on success and non-zero on failure */
-#define pthread_create(thhandle,attr,thfunc,tharg) (int)((*(thhandle)=(HANDLE)_beginthreadex(NULL,0,(DWORD WINAPI(*)())thfunc,tharg,0,NULL))==NULL)
-#define pthread_join(thread, result) ((WaitForSingleObject((thread),INFINITE)!=WAIT_OBJECT_0) || !CloseHandle(thread))
-/* Mutexes are Win32 mutex handles. The type macro was spelled
- * pthread_mutex_T, which left pthread_mutex_t undefined on Windows;
- * the old spelling is kept so existing users still compile. */
-#define pthread_mutex_t HANDLE
-#define pthread_mutex_T HANDLE
-/* NOTE: unlike POSIX, these expand to the raw Win32 call results rather than 0 on success */
-#define pthread_mutex_init(pobject,pattr) (*(pobject)=CreateMutex(NULL,FALSE,NULL))
-#define pthread_mutex_destroy(pobject) CloseHandle(*(pobject))
-#define pthread_mutex_lock(pobject) WaitForSingleObject(*(pobject),INFINITE)
-#define pthread_mutex_unlock(pobject) ReleaseMutex(*(pobject))
-
-#else
-
-#include <pthread.h>
-
-#endif
-
-#endif
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/reactor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/reactor/CMakeLists.txt b/examples/c/reactor/CMakeLists.txt
deleted file mode 100644
index bd6163f..0000000
--- a/examples/c/reactor/CMakeLists.txt
+++ /dev/null
@@ -1,45 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Build the two reactor example clients against an installed Proton.
-find_package(Proton REQUIRED)
-
-set (reactor-examples sender.c receiver.c)
-
-# Same warning/language/LTO flags for every example source
-set_source_files_properties (
-  ${reactor-examples}
-  PROPERTIES
-  COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${COMPILE_LANGUAGE_FLAGS} ${LINK_TIME_OPTIMIZATION}"
-  )
-
-# Optionally compile the C sources as C++
-if (BUILD_WITH_CXX)
-  set_source_files_properties (
-    ${reactor-examples}
-    PROPERTIES LANGUAGE CXX
-    )
-endif (BUILD_WITH_CXX)
-
-include_directories(${Proton_INCLUDE_DIRS})
-
-# One executable per example, named after its source file
-foreach (example sender receiver)
-  add_executable(${example} ${example}.c)
-  target_link_libraries(${example} ${Proton_LIBRARIES})
-endforeach ()
-
-
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/reactor/README
----------------------------------------------------------------------
diff --git a/examples/c/reactor/README b/examples/c/reactor/README
deleted file mode 100644
index 8d61893..0000000
--- a/examples/c/reactor/README
+++ /dev/null
@@ -1,30 +0,0 @@
-These example clients require a broker or similar intermediary that
-supports the AMQP 1.0 protocol, allows anonymous connections and
-accepts links to and from a node named 'examples'.
-
-------------------------------------------------------------------
-
-sender.c
-
-A simple message sending client. This example sends all messages but
-the last as pre-settled (no ack required). It then waits for an ack
-for the last message sent before exiting.
-
-Use the '-h' command line option for a list of supported parameters.
-
-------------------------------------------------------------------
-
-receiver.c
-
-A simple message consuming client. This example receives messages
-from a source (default 'examples'). Received messages are
-acknowledged if they are sent unsettled. The client will try to
-decode the message payload, assuming it was generated by the
-sender example.
-
-Use the '-h' command line option for a list of supported parameters.
-
-
-
-
-
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/reactor/receiver.c
----------------------------------------------------------------------
diff --git a/examples/c/reactor/receiver.c b/examples/c/reactor/receiver.c
deleted file mode 100644
index e72a6d9..0000000
--- a/examples/c/reactor/receiver.c
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-
-#include "pncompat/misc_funcs.inc"
-
-#include "proton/reactor.h"
-#include "proton/message.h"
-#include "proton/connection.h"
-#include "proton/session.h"
-#include "proton/link.h"
-#include "proton/delivery.h"
-#include "proton/event.h"
-#include "proton/handlers.h"
-#include "proton/transport.h"
-#include "proton/url.h"
-
-static int quiet = 0;
-
-// Credit batch if unlimited receive (-c 0)
-static const int CAPACITY = 100;
-#define MAX_SIZE 512
-
-// Application data for the receiver example. An instance lives in the
-// memory of the event handler (see GET_APP_DATA) and holds configuration
-// and state that is available during event processing.
-//
-typedef struct {
-  // number of messages still to receive before exiting; 0 = receive forever
-  int count;
-  // name of the source node to receive from
-  const char *source;
-  // message object reused to decode each received message
-  pn_message_t *message;
-} app_data_t;
-
-// helper to pull pointer to app_data_t instance out of the pn_handler_t
-//
-#define GET_APP_DATA(handler) ((app_data_t *)pn_handler_mem(handler))
-
-// Cleanup callback registered with pn_handler_new(): drops the reference
-// on the message held in the handler's app_data, if there is one.
-//
-static void delete_handler(pn_handler_t *handler)
-{
-  app_data_t *app = GET_APP_DATA(handler);
-  if (app->message == NULL) {
-    return;
-  }
-  pn_decref(app->message);
-  app->message = NULL;
-}
-
-
-/* Process each event posted by the reactor.
- *
- * PN_CONNECTION_INIT opens the connection, a session and a receiver link on
- * data->source and grants the initial credit. PN_DELIVERY prints each
- * complete message (unless quiet), accepts it if the remote has not settled
- * it, settles it, and then either tops up credit (count == 0) or closes the
- * endpoints when the remaining count reaches zero. PN_TRANSPORT_ERROR
- * reports the transport condition on stderr. Other events are ignored here.
- */
-static void event_handler(pn_handler_t *handler,
- pn_event_t *event,
- pn_event_type_t type)
-{
- app_data_t *data = GET_APP_DATA(handler);
-
- switch (type) {
-
- case PN_CONNECTION_INIT: {
- // Create and open all the endpoints needed to receive messages
- //
- pn_connection_t *conn;
- pn_session_t *ssn;
- pn_link_t *receiver;
-
- conn = pn_event_connection(event);
- pn_connection_open(conn);
- ssn = pn_session(conn);
- pn_session_open(ssn);
- receiver = pn_receiver(ssn, "MyReceiver");
- pn_terminus_set_address(pn_link_source(receiver), data->source);
- pn_link_open(receiver);
- // cannot receive without granting credit: count credits for a bounded
- // run, CAPACITY credits when receiving forever (count == 0)
- pn_link_flow(receiver, data->count ? data->count : CAPACITY);
- } break;
-
- case PN_DELIVERY: {
- // A message has been received
- //
- pn_link_t *link = NULL;
- pn_delivery_t *dlv = pn_event_delivery(event);
- if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
- // A full message has arrived
- if (!quiet) {
- ssize_t len;
- pn_bytes_t bytes; // only read when found is true
- bool found = false;
-
- // try to decode the message body; messages that do not fit in
- // MAX_SIZE bytes are not decoded
- if (pn_delivery_pending(dlv) < MAX_SIZE) {
- static char buffer[MAX_SIZE];
- // read in the raw data
- len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
- if (len > 0) {
- // decode it into a proton message
- pn_message_clear(data->message);
- if (PN_OK == pn_message_decode(data->message, buffer,
- len)) {
- // Assuming the message came from the sender
- // example, try to parse out a single string from
- // the payload
- //
- pn_data_scan(pn_message_body(data->message), "?S",
- &found, &bytes);
- }
- }
- }
- if (found) {
- fprintf(stdout, "Message: [%.*s]\n", (int)bytes.size,
- bytes.start);
- } else {
- fprintf(stdout, "Message received!\n");
- }
- }
-
- link = pn_delivery_link(dlv); // fetched before dlv is settled below
-
- if (!pn_delivery_settled(dlv)) {
- // remote has not settled, so it is tracking the delivery. Ack
- // it.
- pn_delivery_update(dlv, PN_ACCEPTED);
- }
-
- // done with the delivery, move to the next and free it
- pn_link_advance(link);
- pn_delivery_settle(dlv); // dlv is now freed
-
- if (data->count == 0) {
- // receive forever - see if more credit is needed
- if (pn_link_credit(link) < CAPACITY/2) {
- // Grant enough credit to bring it up to CAPACITY:
- pn_link_flow(link, CAPACITY - pn_link_credit(link));
- }
- } else if (--data->count == 0) {
- // done receiving, close the endpoints
- pn_session_t *ssn = pn_link_session(link);
- pn_link_close(link);
- pn_session_close(ssn);
- pn_connection_close(pn_session_connection(ssn));
- }
- }
- } break;
-
- case PN_TRANSPORT_ERROR: {
- // The connection to the peer failed.
- //
- pn_transport_t *tport = pn_event_transport(event);
- pn_condition_t *cond = pn_transport_condition(tport);
- fprintf(stderr, "Network transport failed!\n");
- if (pn_condition_is_set(cond)) {
- const char *name = pn_condition_get_name(cond);
- const char *desc = pn_condition_get_description(cond);
- // name and description may be missing; print placeholders then
- fprintf(stderr, " Error: %s Description: %s\n",
- (name) ? name : "<error name not provided>",
- (desc) ? desc : "<no description provided>");
- }
- // pn_reactor_process() will exit with a false return value, stopping
- // the main loop.
- } break;
-
- default:
- break;
- }
-}
-
-/* Print the command line help on stdout and terminate the process with
- * exit status 1. Does not return. */
-static void usage(void)
-{
-  static const char *const help[] = {
-    "Usage: receiver <options>\n",
-    "-a \tThe host address [localhost:5672]\n",
-    "-c \t# of messages to receive, 0=receive forever [1]\n",
-    "-s \tSource address [examples]\n",
-    "-i \tContainer name [ReceiveExample]\n",
-    "-q \tQuiet - turn off stdout\n"
-  };
-  size_t line;
-  for (line = 0; line < sizeof(help) / sizeof(help[0]); ++line) {
-    fputs(help[line], stdout);
-  }
-  exit(1);
-}
-
-int main(int argc, char** argv)
-{ /* Entry point of the receiver example: builds the event handler and its
-   * app_data, parses the command line, connects through a reactor and runs
-   * the reactor until pn_reactor_process() returns false. Returns 0;
-   * usage() and an invalid address end the process with exit(1). */
- const char *address = "localhost";
- const char *container = "ReceiveExample";
- int c;
- pn_reactor_t *reactor = NULL;
- pn_url_t *url = NULL;
- pn_connection_t *conn = NULL;
-
- /* create a handler for the connection's events.
- * event_handler will be called for each event. The handler will allocate
- * an app_data_t instance which can be accessed when the event_handler is
- * called.
- */
- pn_handler_t *handler = pn_handler_new(event_handler,
- sizeof(app_data_t),
- delete_handler);
-
- /* set up the application data with defaults */
- app_data_t *app_data = GET_APP_DATA(handler);
- memset(app_data, 0, sizeof(app_data_t));
- app_data->count = 1;
- app_data->source = "examples";
- app_data->message = pn_message();
-
- /* Attach the pn_handshaker() handler. This handler deals with endpoint
- * events from the peer so we don't have to.
- */
- {
- pn_handler_t *handshaker = pn_handshaker();
- pn_handler_add(handler, handshaker);
- pn_decref(handshaker);
- }
-
- /* command line options */
- opterr = 0;
- while((c = getopt(argc, argv, "i:a:c:s:qh")) != -1) {
- switch(c) {
- case 'h': usage(); break;
- case 'a': address = optarg; break;
- case 'c':
- app_data->count = atoi(optarg);
- if (app_data->count < 0) usage(); /* negative counts are rejected; 0 = receive forever */
- break;
- case 's': app_data->source = optarg; break;
- case 'i': container = optarg; break;
- case 'q': quiet = 1; break;
- default:
- usage();
- break;
- }
- }
-
- reactor = pn_reactor();
-
- url = pn_url_parse(address);
- if (url == NULL) {
- fprintf(stderr, "Invalid host address %s\n", address);
- exit(1);
- }
- conn = pn_reactor_connection_to_host(reactor,
- pn_url_get_host(url),
- pn_url_get_port(url),
- handler);
- /* drop the local references to url and handler now that the connection has been created */
- pn_decref(url);
- pn_decref(handler);
-
- // the container name should be unique for each client
- pn_connection_set_container(conn, container);
-
- // wait up to 5 seconds for activity before returning from
- // pn_reactor_process()
- pn_reactor_set_timeout(reactor, 5000);
-
- pn_reactor_start(reactor);
-
- while (pn_reactor_process(reactor)) {
- /* Returns 'true' until the connection is shut down.
- * pn_reactor_process() will return true at least once every 5 seconds
- * (due to the timeout). If no timeout was configured,
- * pn_reactor_process() returns as soon as it finishes processing all
- * pending I/O and events. Once the connection has closed,
- * pn_reactor_process() will return false.
- */
- }
- pn_decref(reactor);
-
- return 0;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/reactor/sender.c
----------------------------------------------------------------------
diff --git a/examples/c/reactor/sender.c b/examples/c/reactor/sender.c
deleted file mode 100644
index 6c3cdb3..0000000
--- a/examples/c/reactor/sender.c
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-
-#include "pncompat/misc_funcs.inc"
-
-#include "proton/reactor.h"
-#include "proton/message.h"
-#include "proton/connection.h"
-#include "proton/session.h"
-#include "proton/link.h"
-#include "proton/delivery.h"
-#include "proton/event.h"
-#include "proton/handlers.h"
-#include "proton/transport.h"
-#include "proton/url.h"
-
-
-static int quiet = 0;
-
-// Example application data. This data will be instantiated in the event
-// handler, and is available during event processing. In this example it
-// holds configuration and state information.
-//
-typedef struct {
- int count; // # messages to send
- int anon; // use anonymous link if true
- const char *target; // name of destination target
- char *msg_data; // pre-encoded outbound message
- size_t msg_len; // bytes in msg_data
-} app_data_t;
-
-// helper to pull pointer to app_data_t instance out of the pn_handler_t
-//
-#define GET_APP_DATA(handler) ((app_data_t *)pn_handler_mem(handler))
-
-// Called when reactor exits to clean up app_data
-//
-static void delete_handler(pn_handler_t *handler)
-{
- app_data_t *d = GET_APP_DATA(handler);
- if (d->msg_data) {
- free(d->msg_data);
- d->msg_data = NULL;
- }
-}
-
-/* Process each event posted by the reactor.
- */
-static void event_handler(pn_handler_t *handler,
- pn_event_t *event,
- pn_event_type_t type)
-{
- app_data_t *data = GET_APP_DATA(handler);
-
- switch (type) {
-
- case PN_CONNECTION_INIT: {
- // Create and open all the endpoints needed to send a message
- //
- pn_connection_t *conn;
- pn_session_t *ssn;
- pn_link_t *sender;
-
- conn = pn_event_connection(event);
- pn_connection_open(conn);
- ssn = pn_session(conn);
- pn_session_open(ssn);
- sender = pn_sender(ssn, "MySender");
- // we do not wait for ack until the last message
- pn_link_set_snd_settle_mode(sender, PN_SND_MIXED);
- if (!data->anon) {
- pn_terminus_set_address(pn_link_target(sender), data->target);
- }
- pn_link_open(sender);
- } break;
-
- case PN_LINK_FLOW: {
- // the remote has given us some credit, now we can send messages
- //
- static long tag = 0; // a simple tag generator
- pn_delivery_t *delivery;
- pn_link_t *sender = pn_event_link(event);
- int credit = pn_link_credit(sender);
- while (credit > 0 && data->count > 0) {
- --credit;
- --data->count;
- ++tag;
- delivery = pn_delivery(sender,
- pn_dtag((const char *)&tag, sizeof(tag)));
- pn_link_send(sender, data->msg_data, data->msg_len);
- pn_link_advance(sender);
- if (data->count > 0) {
- // send pre-settled until the last one, then wait for an ack on
- // the last sent message. This allows the sender to send
- // messages as fast as possible and then exit when the consumer
- // has dealt with the last one.
- //
- pn_delivery_settle(delivery);
- }
- }
- } break;
-
- case PN_DELIVERY: {
- // Since the example sends all messages but the last pre-settled
- // (pre-acked), only the last message's delivery will get updated with
- // the remote state (acked/nacked).
- //
- pn_delivery_t *dlv = pn_event_delivery(event);
- if (pn_delivery_updated(dlv) && pn_delivery_remote_state(dlv)) {
- uint64_t rs = pn_delivery_remote_state(dlv);
- int done = 1;
- switch (rs) {
- case PN_RECEIVED:
- // This is not a terminal state - it is informational, and the
- // peer is still processing the message.
- done = 0;
- break;
- case PN_ACCEPTED:
- pn_delivery_settle(dlv);
- if (!quiet) fprintf(stdout, "Send complete!\n");
- break;
- case PN_REJECTED:
- case PN_RELEASED:
- case PN_MODIFIED:
- pn_delivery_settle(dlv);
- fprintf(stderr, "Message not accepted - code:%lu\n", (unsigned long)rs);
- break;
- default:
- // ??? no other terminal states defined, so ignore anything else
- pn_delivery_settle(dlv);
- fprintf(stderr, "Unknown delivery failure - code=%lu\n", (unsigned long)rs);
- break;
- }
-
- if (done) {
- // initiate clean shutdown of the endpoints
- pn_link_t *link = pn_delivery_link(dlv);
- pn_session_t *ssn = pn_link_session(link);
- pn_link_close(link);
- pn_session_close(ssn);
- pn_connection_close(pn_session_connection(ssn));
- }
- }
- } break;
-
- case PN_TRANSPORT_ERROR: {
- // The connection to the peer failed.
- //
- pn_transport_t *tport = pn_event_transport(event);
- pn_condition_t *cond = pn_transport_condition(tport);
- fprintf(stderr, "Network transport failed!\n");
- if (pn_condition_is_set(cond)) {
- const char *name = pn_condition_get_name(cond);
- const char *desc = pn_condition_get_description(cond);
- fprintf(stderr, " Error: %s Description: %s\n",
- (name) ? name : "<error name not provided>",
- (desc) ? desc : "<no description provided>");
- }
- // pn_reactor_process() will exit with a false return value, stopping
- // the main loop.
- } break;
-
- default:
- break;
- }
-}
-
-static void usage(void)
-{
- printf("Usage: send <options> <message>\n");
- printf("-a \tThe host address [localhost:5672]\n");
- printf("-c \t# of messages to send [1]\n");
- printf("-t \tTarget address [examples]\n");
- printf("-n \tUse an anonymous link [off]\n");
- printf("-i \tContainer name [SendExample]\n");
- printf("-q \tQuiet - turn off stdout\n");
- printf("message \tA text string to send.\n");
- exit(1);
-}
-
-int main(int argc, char** argv)
-{
- const char *address = "localhost";
- const char *msgtext = "Hello World!";
- const char *container = "SendExample";
- int c;
- pn_message_t *message = NULL;
- pn_data_t *body = NULL;
- pn_reactor_t *reactor = NULL;
- pn_url_t *url = NULL;
- pn_connection_t *conn = NULL;
-
- /* Create a handler for the connection's events. event_handler() will be
- * called for each event and delete_handler will be called when the
- * connection is released. The handler will allocate an app_data_t
- * instance which can be accessed when the event_handler is called.
- */
- pn_handler_t *handler = pn_handler_new(event_handler,
- sizeof(app_data_t),
- delete_handler);
-
- /* set up the application data with defaults */
- app_data_t *app_data = GET_APP_DATA(handler);
- memset(app_data, 0, sizeof(app_data_t));
- app_data->count = 1;
- app_data->target = "examples";
-
- /* Attach the pn_handshaker() handler. This handler deals with endpoint
- * events from the peer so we don't have to.
- */
- {
- pn_handler_t *handshaker = pn_handshaker();
- pn_handler_add(handler, handshaker);
- pn_decref(handshaker);
- }
-
- /* command line options */
- opterr = 0;
- while((c = getopt(argc, argv, "i:a:c:t:nhq")) != -1) {
- switch(c) {
- case 'h': usage(); break;
- case 'a': address = optarg; break;
- case 'c':
- app_data->count = atoi(optarg);
- if (app_data->count < 1) usage();
- break;
- case 't': app_data->target = optarg; break;
- case 'n': app_data->anon = 1; break;
- case 'i': container = optarg; break;
- case 'q': quiet = 1; break;
- default:
- usage();
- break;
- }
- }
- if (optind < argc) msgtext = argv[optind];
-
-
- // create a single message and pre-encode it so we only have to do that
- // once. All transmits will use the same pre-encoded message simply for
- // speed.
- //
- message = pn_message();
- pn_message_set_address(message, app_data->target);
- body = pn_message_body(message);
- pn_data_clear(body);
-
- // This message's body contains a single string
- if (pn_data_fill(body, "S", msgtext)) {
- fprintf(stderr, "Error building message!\n");
- exit(1);
- }
- pn_data_rewind(body);
- {
- // encode the message, expanding the encode buffer as needed
- //
- size_t len = 128;
- char *buf = (char *)malloc(len);
- int rc = 0;
- do {
- rc = pn_message_encode(message, buf, &len);
- if (rc == PN_OVERFLOW) {
- free(buf);
- len *= 2;
- buf = (char *)malloc(len);
- }
- } while (rc == PN_OVERFLOW);
- app_data->msg_len = len;
- app_data->msg_data = buf;
- }
- pn_decref(message); // message no longer needed
-
- reactor = pn_reactor();
-
- url = pn_url_parse(address);
- if (url == NULL) {
- fprintf(stderr, "Invalid host address %s\n", address);
- exit(1);
- }
- conn = pn_reactor_connection_to_host(reactor,
- pn_url_get_host(url),
- pn_url_get_port(url),
- handler);
- pn_decref(url);
- pn_decref(handler);
-
- // the container name should be unique for each client
- pn_connection_set_container(conn, container);
-
- // wait up to 5 seconds for activity before returning from
- // pn_reactor_process()
- pn_reactor_set_timeout(reactor, 5000);
-
- pn_reactor_start(reactor);
-
- while (pn_reactor_process(reactor)) {
- /* Returns 'true' until the connection is shut down.
- * pn_reactor_process() will return true at least once every 5 seconds
- * (due to the timeout). If no timeout was configured,
- * pn_reactor_process() returns as soon as it finishes processing all
- * pending I/O and events. Once the connection has closed,
- * pn_reactor_process() will return false.
- */
- }
- pn_decref(reactor);
-
- return 0;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/receive.c b/examples/c/receive.c
new file mode 100644
index 0000000..6fd74a5
--- /dev/null
+++ b/examples/c/receive.c
@@ -0,0 +1,188 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.h>
+#include <proton/condition.h>
+#include <proton/delivery.h>
+#include <proton/link.h>
+#include <proton/message.h>
+#include <proton/proactor.h>
+#include <proton/session.h>
+#include <proton/transport.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+
+/* Application state shared by main(), run() and handle() in this example. */
+typedef struct app_data_t {
+ const char *host, *port; /* Passed to pn_proactor_addr() in main() to build the connect address */
+ const char *amqp_address; /* Address set on the receiver link's source terminus */
+ const char *container_id; /* Container ID set on the connection; main() uses argv[0] */
+ int message_count; /* Number of messages to receive; 0 means keep receiving without limit */
+
+ pn_proactor_t *proactor; /* Created in main(), waited on in run() */
+ int received; /* Messages received so far (only counted when message_count != 0) */
+ bool finished; /* NOTE(review): not referenced by main(), run() or handle() below */
+} app_data_t;
+
+static const int BATCH = 1000; /* Batch size for unlimited receive */
+
+static int exit_code = 0;
+
+static void check_condition(pn_event_t *e, pn_condition_t *cond) {
+ if (pn_condition_is_set(cond)) {
+ fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
+ pn_condition_get_name(cond), pn_condition_get_description(cond));
+ pn_connection_close(pn_event_connection(e));
+ exit_code = 1;
+ }
+}
+
+#define MAX_SIZE 1024
+
+/* Read the delivery's payload from its link, decode it as a proton message
+ * and print the message body to stdout.
+ *
+ * Nothing is printed when the delivery has MAX_SIZE or more bytes pending,
+ * when no data could be read, or when the payload does not decode.
+ */
+static void decode_message(pn_delivery_t *dlv) {
+  static char buffer[MAX_SIZE];
+
+  if (pn_delivery_pending(dlv) >= MAX_SIZE) {
+    return; /* does not fit in the fixed-size buffer */
+  }
+  /* read in the raw data */
+  ssize_t nread = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
+  if (nread <= 0) {
+    return;
+  }
+  /* decode it into a proton message */
+  pn_message_t *msg = pn_message();
+  if (pn_message_decode(msg, buffer, nread) == PN_OK) {
+    pn_string_t *text = pn_string(NULL);
+    pn_inspect(pn_message_body(msg), text);
+    printf("%s\n", pn_string_get(text));
+    pn_free(text);
+  }
+  pn_message_free(msg);
+}
+
+/* Handle one proactor event for the receiver.
+ *
+ * app   - application state (addresses, message limit, receive counter)
+ * event - the event to process
+ *
+ * Return true to continue, false to exit. false is returned only for
+ * PN_PROACTOR_INACTIVE.
+ */
+static bool handle(app_data_t* app, pn_event_t* event) {
+ switch (pn_event_type(event)) {
+
+ case PN_CONNECTION_INIT: {
+ /* Open the connection, one session and one receiver link whose source is app->amqp_address */
+ pn_connection_t* c = pn_event_connection(event);
+ pn_connection_set_container(c, app->container_id);
+ pn_connection_open(c);
+ pn_session_t* s = pn_session(c);
+ pn_session_open(s);
+ pn_link_t* l = pn_receiver(s, "my_receiver");
+ pn_terminus_set_address(pn_link_source(l), app->amqp_address);
+ pn_link_open(l);
+ /* cannot receive without granting credit: */
+ /* grant exactly message_count credits, or BATCH when message_count is 0 (unlimited) */
+ pn_link_flow(l, app->message_count ? app->message_count : BATCH);
+ } break;
+
+ case PN_DELIVERY: {
+ /* A message has been received */
+ pn_link_t *link = NULL;
+ pn_delivery_t *dlv = pn_event_delivery(event);
+ /* Only act once the delivery is readable and no longer partial */
+ if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
+ link = pn_delivery_link(dlv);
+ decode_message(dlv);
+ /* Accept the delivery */
+ pn_delivery_update(dlv, PN_ACCEPTED);
+ /* done with the delivery, move to the next and free it */
+ pn_link_advance(link);
+ pn_delivery_settle(dlv); /* dlv is now freed */
+
+ if (app->message_count == 0) {
+ /* receive forever - see if more credit is needed */
+ if (pn_link_credit(link) < BATCH/2) {
+ /* Grant enough credit to bring it up to BATCH: */
+ pn_link_flow(link, BATCH - pn_link_credit(link));
+ }
+ } else if (++app->received >= app->message_count) {
+ /* done receiving, close the endpoints */
+ printf("%d messages received\n", app->received);
+ pn_session_t *ssn = pn_link_session(link);
+ pn_link_close(link);
+ pn_session_close(ssn);
+ pn_connection_close(pn_session_connection(ssn));
+ }
+ }
+ } break;
+
+ case PN_TRANSPORT_CLOSED:
+ /* Report a transport error condition, if any */
+ check_condition(event, pn_transport_condition(pn_event_transport(event)));
+ break;
+
+ /* For each remote close/detach below: report the peer's error condition if
+  * one is set, then close our side of the connection in every case. */
+ case PN_CONNECTION_REMOTE_CLOSE:
+ check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
+ pn_connection_close(pn_event_connection(event));
+ break;
+
+ case PN_SESSION_REMOTE_CLOSE:
+ check_condition(event, pn_session_remote_condition(pn_event_session(event)));
+ pn_connection_close(pn_event_connection(event));
+ break;
+
+ case PN_LINK_REMOTE_CLOSE:
+ case PN_LINK_REMOTE_DETACH:
+ check_condition(event, pn_link_remote_condition(pn_event_link(event)));
+ pn_connection_close(pn_event_connection(event));
+ break;
+
+ case PN_PROACTOR_INACTIVE:
+ /* Tell run() to stop; the break after the return is never reached */
+ return false;
+ break;
+
+ default:
+ /* All other event types are ignored */
+ break;
+ }
+ return true;
+}
+
+/* Run the proactor event loop on the calling thread.
+ *
+ * Waits for batches of events and passes each event to handle() until
+ * handle() returns false. Every batch obtained from pn_proactor_wait() is
+ * handed back with pn_proactor_done(), including the batch in which handle()
+ * asked to stop; events remaining in that final batch are not handled.
+ */
+void run(app_data_t *app) {
+  bool keep_running = true;
+  while (keep_running) {
+    pn_event_batch_t *events = pn_proactor_wait(app->proactor);
+    pn_event_t *e;
+    /* Stop pulling events from the batch as soon as handle() says we are finished */
+    while (keep_running && (e = pn_event_batch_next(events)) != NULL) {
+      keep_running = handle(app, e);
+    }
+    /* Each pn_proactor_wait() must be matched by a pn_proactor_done() */
+    pn_proactor_done(app->proactor, events);
+  }
+}
+
+/* Usage: receive [host [port [amqp_address [message_count]]]]
+ *
+ * Defaults: host "", port "amqp", amqp_address "examples", message_count 10.
+ * argv[0] is used as the AMQP container ID. Returns the process exit code:
+ * 0 unless check_condition() recorded an error.
+ */
+int main(int argc, char **argv) {
+  struct app_data_t app = {0};
+  int i = 0;
+  app.container_id = argv[i++]; /* Should be unique */
+  /* Each optional argument is tested against the running index i, so giving a
+   * host without a port leaves the port at its default instead of reading
+   * argv[argc] (NULL). */
+  app.host = (argc > i) ? argv[i++] : "";
+  app.port = (argc > i) ? argv[i++] : "amqp";
+  app.amqp_address = (argc > i) ? argv[i++] : "examples";
+  app.message_count = (argc > i) ? atoi(argv[i++]) : 10;
+
+  /* Create the proactor and connect */
+  app.proactor = pn_proactor();
+  char addr[PN_MAX_ADDR];
+  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
+  pn_proactor_connect(app.proactor, pn_connection(), addr);
+  run(&app);
+  pn_proactor_free(app.proactor);
+  return exit_code;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/send.c
----------------------------------------------------------------------
diff --git a/examples/c/send.c b/examples/c/send.c
new file mode 100644
index 0000000..43da8b0
--- /dev/null
+++ b/examples/c/send.c
@@ -0,0 +1,196 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.h>
+#include <proton/condition.h>
+#include <proton/delivery.h>
+#include <proton/link.h>
+#include <proton/message.h>
+#include <proton/proactor.h>
+#include <proton/session.h>
+#include <proton/transport.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+
+/* Application state shared by main(), run(), handle() and encode_message(). */
+typedef struct app_data_t {
+ const char *host, *port; /* Passed to pn_proactor_addr() in main() to build the connect address */
+ const char *amqp_address; /* Address set on the sender link's target terminus */
+ const char *container_id; /* Container ID set on the connection; main() uses argv[0] */
+ int message_count; /* Number of messages to send */
+
+ pn_proactor_t *proactor; /* Created in main(), waited on in run() */
+ pn_rwbytes_t message_buffer; /* Encode buffer, allocated/grown in encode_message(), freed in main() */
+ int sent; /* Messages sent so far; also used as message id, sequence number and delivery tag */
+ int acknowledged; /* Deliveries the peer has reported as PN_ACCEPTED */
+} app_data_t;
+
+static int exit_code = 0;
+
+static void check_condition(pn_event_t *e, pn_condition_t *cond) {
+ if (pn_condition_is_set(cond)) {
+ fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
+ pn_condition_get_name(cond), pn_condition_get_description(cond));
+ pn_connection_close(pn_event_connection(e));
+ exit_code = 1;
+ }
+}
+
+/* Create a message with a map { "sequence" : number }, encode it and return the encoded buffer.
+ *
+ * The sequence number and the message id are both app->sent. The message is
+ * encoded into app->message_buffer, which is allocated on first use and
+ * doubled until the encoding fits. The returned bytes point into
+ * app->message_buffer, so they are only valid until the next call.
+ *
+ * Exits the process with status 1 if memory cannot be allocated or the
+ * message cannot be encoded.
+ */
+static pn_bytes_t encode_message(app_data_t* app) {
+  /* Construct a message with the map { "sequence": app.sent } */
+  pn_message_t* message = pn_message();
+  pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
+  pn_data_t* body = pn_message_body(message);
+  pn_data_put_map(body);
+  pn_data_enter(body);
+  pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
+  pn_data_put_int(body, app->sent); /* The sequence number */
+  pn_data_exit(body);
+
+  /* encode the message, expanding the encode buffer as needed */
+  if (app->message_buffer.start == NULL) {
+    static const size_t initial_size = 128;
+    char *initial = (char*)malloc(initial_size);
+    if (initial == NULL) {
+      fprintf(stderr, "error encoding message: out of memory\n");
+      exit(1);
+    }
+    app->message_buffer = pn_rwbytes(initial_size, initial);
+  }
+  /* app->message_buffer is the total buffer space available. */
+  /* mbuf will point at just the portion used by the encoded message */
+  pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
+  int status = 0;
+  while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
+    size_t grown_size = app->message_buffer.size * 2;
+    /* Keep the old pointer until realloc succeeds so it is never lost */
+    char *grown = (char*)realloc(app->message_buffer.start, grown_size);
+    if (grown == NULL) {
+      fprintf(stderr, "error encoding message: out of memory\n");
+      exit(1);
+    }
+    app->message_buffer = pn_rwbytes(grown_size, grown);
+    /* realloc may move the block: refresh both start and size before retrying */
+    mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
+  }
+  if (status != 0) {
+    fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
+    exit(1);
+  }
+  pn_message_free(message);
+  return pn_bytes(mbuf.size, mbuf.start);
+}
+
+/* Handle one proactor event for the sender.
+ *
+ * app   - application state (addresses, message count, sent/acknowledged counters)
+ * event - the event to process
+ *
+ * Returns true to continue, false if finished. false is returned only for
+ * PN_PROACTOR_INACTIVE.
+ */
+static bool handle(app_data_t* app, pn_event_t* event) {
+ switch (pn_event_type(event)) {
+
+ case PN_CONNECTION_INIT: {
+ /* Open the connection, one session and one sender link whose target is app->amqp_address */
+ pn_connection_t* c = pn_event_connection(event);
+ pn_connection_set_container(c, app->container_id);
+ pn_connection_open(c);
+ pn_session_t* s = pn_session(pn_event_connection(event));
+ pn_session_open(s);
+ pn_link_t* l = pn_sender(s, "my_sender");
+ pn_terminus_set_address(pn_link_target(l), app->amqp_address);
+ pn_link_open(l);
+ break;
+ }
+
+ case PN_LINK_FLOW: {
+ /* The peer has given us some credit, now we can send messages */
+ pn_link_t *sender = pn_event_link(event);
+ /* Send while there is credit and fewer than message_count messages have been sent */
+ while (pn_link_credit(sender) > 0 && app->sent < app->message_count) {
+ ++app->sent;
+ // Use sent counter as unique delivery tag.
+ pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
+ pn_bytes_t msgbuf = encode_message(app);
+ pn_link_send(sender, msgbuf.start, msgbuf.size);
+ pn_link_advance(sender);
+ }
+ break;
+ }
+
+ case PN_DELIVERY: {
+ /* We received acknowledgment from the peer that a message was delivered. */
+ pn_delivery_t* d = pn_event_delivery(event);
+ if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
+ if (++app->acknowledged == app->message_count) {
+ printf("%d messages sent and acknowledged\n", app->acknowledged);
+ pn_connection_close(pn_event_connection(event));
+ /* Continue handling events till we receive TRANSPORT_CLOSED */
+ }
+ } else {
+ /* Any remote state other than PN_ACCEPTED is treated as a failure */
+ fprintf(stderr, "unexpected delivery state %d\n", (int)pn_delivery_remote_state(d));
+ pn_connection_close(pn_event_connection(event));
+ exit_code=1;
+ }
+ break;
+ }
+
+ case PN_TRANSPORT_CLOSED:
+ /* Report a transport error condition, if any */
+ check_condition(event, pn_transport_condition(pn_event_transport(event)));
+ break;
+
+ /* For each remote close/detach below: report the peer's error condition if
+  * one is set, then close our side of the connection in every case. */
+ case PN_CONNECTION_REMOTE_CLOSE:
+ check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
+ pn_connection_close(pn_event_connection(event));
+ break;
+
+ case PN_SESSION_REMOTE_CLOSE:
+ check_condition(event, pn_session_remote_condition(pn_event_session(event)));
+ pn_connection_close(pn_event_connection(event));
+ break;
+
+ case PN_LINK_REMOTE_CLOSE:
+ case PN_LINK_REMOTE_DETACH:
+ check_condition(event, pn_link_remote_condition(pn_event_link(event)));
+ pn_connection_close(pn_event_connection(event));
+ break;
+
+ case PN_PROACTOR_INACTIVE:
+ /* Tell run() to stop */
+ return false;
+
+ default: break;
+ }
+ return true;
+}
+
+/* Run the proactor event loop on the calling thread.
+ *
+ * Waits for batches of events and passes each event to handle() until
+ * handle() returns false. Every batch obtained from pn_proactor_wait() is
+ * handed back with pn_proactor_done(), including the batch in which handle()
+ * asked to stop; events remaining in that final batch are not handled.
+ */
+void run(app_data_t *app) {
+  bool keep_running = true;
+  while (keep_running) {
+    pn_event_batch_t *events = pn_proactor_wait(app->proactor);
+    pn_event_t *e;
+    /* Stop pulling events from the batch as soon as handle() says we are finished */
+    while (keep_running && (e = pn_event_batch_next(events)) != NULL) {
+      keep_running = handle(app, e);
+    }
+    /* Each pn_proactor_wait() must be matched by a pn_proactor_done() */
+    pn_proactor_done(app->proactor, events);
+  }
+}
+
+/* Usage: send [host [port [amqp_address [message_count]]]]
+ *
+ * Defaults: host "", port "amqp", amqp_address "examples", message_count 10.
+ * argv[0] is used as the AMQP container ID. Returns the process exit code:
+ * 0 unless an error was recorded while handling events.
+ */
+int main(int argc, char **argv) {
+  struct app_data_t app = {0};
+  int i = 0;
+  app.container_id = argv[i++]; /* Should be unique */
+  /* Each optional argument is tested against the running index i, so giving a
+   * host without a port leaves the port at its default instead of reading
+   * argv[argc] (NULL). */
+  app.host = (argc > i) ? argv[i++] : "";
+  app.port = (argc > i) ? argv[i++] : "amqp";
+  app.amqp_address = (argc > i) ? argv[i++] : "examples";
+  app.message_count = (argc > i) ? atoi(argv[i++]) : 10;
+
+  /* Create the proactor and connect */
+  app.proactor = pn_proactor();
+  char addr[PN_MAX_ADDR];
+  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
+  pn_proactor_connect(app.proactor, pn_connection(), addr);
+  run(&app);
+  pn_proactor_free(app.proactor);
+  /* The encode buffer is owned by app; free(NULL) is harmless if nothing was sent */
+  free(app.message_buffer.start);
+  return exit_code;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/thread.h
----------------------------------------------------------------------
diff --git a/examples/c/thread.h b/examples/c/thread.h
new file mode 100644
index 0000000..1bd5595
--- /dev/null
+++ b/examples/c/thread.h
@@ -0,0 +1,49 @@
+#ifndef _PROTON_EXAMPLES_C_THREADS_H
+#define _PROTON_EXAMPLES_C_THREADS_H 1
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* EXAMPLE USE ONLY. Simulate the subset of POSIX threads used by examples for windows */
+
+#ifdef _WIN32
+
+#include <windows.h>
+#include <time.h>
+/* NOTE(review): this define follows the first <windows.h> include above;
+ * confirm it still has the intended effect. */
+#define _WIN32_WINNT 0x500 /* WINBASE.H - Enable SignalObjectAndWait */
+#include <process.h>
+#include <windows.h>
+
+/* Thread functions: declare as `pthread_function name(void *arg)` */
+#define pthread_function DWORD WINAPI
+#define pthread_function_return DWORD
+#define pthread_t HANDLE
+/* Evaluates to 0 when the thread was created, non-zero otherwise */
+#define pthread_create(thhandle,attr,thfunc,tharg) (int)((*thhandle=(HANDLE)_beginthreadex(NULL,0,(DWORD WINAPI(*)())thfunc,tharg,0,NULL))==NULL)
+/* Evaluates to 0 when the wait and the handle close both succeed; `result` is ignored */
+#define pthread_join(thread, result) ((WaitForSingleObject((thread),INFINITE)!=WAIT_OBJECT_0) || !CloseHandle(thread))
+/* Mutex type under its POSIX name, as used by code that declares pthread_mutex_t */
+#define pthread_mutex_t HANDLE
+#define pthread_mutex_T HANDLE /* Original misspelling, kept so existing users still compile */
+#define pthread_mutex_init(pobject,pattr) (*pobject=CreateMutex(NULL,FALSE,NULL))
+#define pthread_mutex_destroy(pobject) CloseHandle(*pobject)
+#define pthread_mutex_lock(pobject) WaitForSingleObject(*pobject,INFINITE)
+#define pthread_mutex_unlock(pobject) ReleaseMutex(*pobject)
+
+#else
+
+#include <pthread.h>
+
+#endif
+
+#endif /* thread.h */
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/proton-c/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/CMakeLists.txt b/proton-c/src/tests/CMakeLists.txt
index 50ea677..c562025 100644
--- a/proton-c/src/tests/CMakeLists.txt
+++ b/proton-c/src/tests/CMakeLists.txt
@@ -63,9 +63,9 @@ if(HAS_PROACTOR)
endif(WIN32)
if(WIN32)
- # set(path "$<TARGET_FILE_DIR:proactor-broker>;$<TARGET_FILE_DIR:qpid-proton>")
+ # set(path "$<TARGET_FILE_DIR:c-broker>;$<TARGET_FILE_DIR:qpid-proton>")
else(WIN32)
- set(path "${CMAKE_CURRENT_BINARY_DIR}:${CMAKE_BINARY_DIR}/examples/c/proactor:$ENV{PATH}")
+ set(path "${CMAKE_CURRENT_BINARY_DIR}:${CMAKE_BINARY_DIR}/examples/c:$ENV{PATH}")
endif(WIN32)
# Add the tools directory for the 'proctest' module
set_search_path(pypath "${CMAKE_SOURCE_DIR}/tools/py" "$ENV{PYTHON_PATH}")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-proton git commit: PROTON-1585: Remove old reactor and
messenger examples; promote the proactor examples to the top level
Posted by jr...@apache.org.
PROTON-1585: Remove old reactor and messenger examples; promote the proactor examples to the top level
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/564e0ca4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/564e0ca4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/564e0ca4
Branch: refs/heads/master
Commit: 564e0ca4c3367f66824e5ecf417afe70c41fb2d9
Parents: 6888ab5
Author: Justin Ross <jr...@apache.org>
Authored: Mon Sep 11 15:31:15 2017 -0700
Committer: Justin Ross <jr...@apache.org>
Committed: Mon Sep 11 15:44:30 2017 -0700
----------------------------------------------------------------------
examples/c/CMakeLists.txt | 35 ++-
examples/c/README.dox | 21 ++
examples/c/broker.c | 424 +++++++++++++++++++++++++++++++
examples/c/direct.c | 326 ++++++++++++++++++++++++
examples/c/example_test.py | 88 +++++++
examples/c/messenger/CMakeLists.txt | 52 ----
examples/c/messenger/recv-async.c | 193 --------------
examples/c/messenger/recv.c | 154 -----------
examples/c/messenger/send-async.c | 170 -------------
examples/c/messenger/send.c | 111 --------
examples/c/proactor/CMakeLists.txt | 44 ----
examples/c/proactor/README.dox | 21 --
examples/c/proactor/broker.c | 424 -------------------------------
examples/c/proactor/direct.c | 326 ------------------------
examples/c/proactor/example_test.py | 88 -------
examples/c/proactor/receive.c | 188 --------------
examples/c/proactor/send.c | 196 --------------
examples/c/proactor/thread.h | 49 ----
examples/c/reactor/CMakeLists.txt | 45 ----
examples/c/reactor/README | 30 ---
examples/c/reactor/receiver.c | 286 ---------------------
examples/c/reactor/sender.c | 329 ------------------------
examples/c/receive.c | 188 ++++++++++++++
examples/c/send.c | 196 ++++++++++++++
examples/c/thread.h | 49 ++++
proton-c/src/tests/CMakeLists.txt | 4 +-
26 files changed, 1320 insertions(+), 2717 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt
index 12867be..c300b00 100644
--- a/examples/c/CMakeLists.txt
+++ b/examples/c/CMakeLists.txt
@@ -17,16 +17,33 @@
# under the License.
#
-find_package(Proton REQUIRED)
+find_package(Proton REQUIRED Core Proactor)
+set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
+find_package(Threads REQUIRED)
+
include(CheckCCompilerFlag)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include)
+include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${Proton_INCLUDE_DIRS})
+
+add_definitions(${COMPILE_LANGUAGE_FLAGS} ${COMPILE_WARNING_FLAGS} ${WERROR} ${COMPILE_PLATFORM_FLAGS} ${LINK_TIME_OPTIMIZATION})
+
+# Add a test with the correct environment to find test executables and valgrind.
+if(WIN32)
+ set(test_path "$<TARGET_FILE_DIR:broker>;$<TARGET_FILE_DIR:qpid-proton>")
+else()
+ set(test_path "${CMAKE_CURRENT_BINARY_DIR}:$ENV{PATH}")
+endif()
-if(Proton_Proactor_FOUND)
- if(WIN32)
- message(STATUS "Windows IOCP proactor examples temporarily disabled for build")
- else(WIN32)
- add_subdirectory(proactor)
- endif(WIN32)
+if(WIN32)
+ message(STATUS "Windows IOCP proactor examples temporarily disabled for build")
+else()
+ foreach (name broker send receive direct)
+ add_executable(c-${name} ${name}.c)
+ target_link_libraries(c-${name} ${Proton_Proactor_LIBRARIES} ${Proton_Core_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
+ set_target_properties(c-${name} PROPERTIES OUTPUT_NAME ${name})
+ endforeach()
endif()
-add_subdirectory(messenger)
-add_subdirectory(reactor)
+
+set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV})
+
+add_test(c-example-tests ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/README.dox
----------------------------------------------------------------------
diff --git a/examples/c/README.dox b/examples/c/README.dox
new file mode 100644
index 0000000..a548d35
--- /dev/null
+++ b/examples/c/README.dox
@@ -0,0 +1,21 @@
+/**
+ * @example send.c
+ *
+ * Send a fixed number of messages to the "examples" node.
+ * Can be used with @ref broker.c, @ref direct.c or an external AMQP broker.
+ *
+ * @example receive.c
+ *
+ * Subscribes to the "examples" node and prints the message bodies received.
+ * Can be used with @ref broker.c, @ref direct.c or an external AMQP broker.
+ *
+ * @example direct.c
+ *
+ * A server that can be used to demonstrate direct (no broker) peer-to-peer communication.
+ * It can accept an incoming connection from either the @ref send.c or @ref receive.c examples
+ * and will act as the directly-connected counterpart (receive or send).
+ *
+ * @example broker.c
+ *
+ * A simple multithreaded broker that works with the @ref send.c and @ref receive.c examples.
+ */
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/broker.c b/examples/c/broker.c
new file mode 100644
index 0000000..e0d9672
--- /dev/null
+++ b/examples/c/broker.c
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "thread.h"
+
+#include <proton/engine.h>
+#include <proton/listener.h>
+#include <proton/proactor.h>
+#include <proton/sasl.h>
+#include <proton/transport.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+/* Simple re-sizable vector that acts as a queue.
+ *
+ * VEC(T) declares an anonymous struct with a heap array of T (data), the number
+ * of elements in use (len) and the number of elements allocated (cap).
+ * The V argument of the other macros is the struct itself (an lvalue), not a
+ * pointer to it. V is evaluated more than once, so it must have no side effects.
+ * The macros abort() if memory cannot be allocated.
+ */
+#define VEC(T) struct { T* data; size_t len, cap; }
+
+/* Make V an empty vector with room for 16 elements.
+   The array is assigned through a void** so that no cast naming the element
+   type is needed. */
+#define VEC_INIT(V) \
+  do { \
+    void **vp = (void**)&(V).data; \
+    (V).len = 0; \
+    (V).cap = 16; \
+    *vp = malloc((V).cap * sizeof(*(V).data)); \
+    if (!*vp) abort(); \
+  } while(0)
+
+/* Release the array owned by V. The elements themselves are not released. */
+#define VEC_FINAL(V) free((V).data)
+
+/* Append X to the end of V, doubling the capacity first if V is full. */
+#define VEC_PUSH(V, X) \
+  do { \
+    if ((V).len == (V).cap) { \
+      void **vp = (void**)&(V).data; \
+      void *grown = realloc((V).data, 2 * (V).cap * sizeof(*(V).data)); \
+      if (!grown) abort(); /* the old array is still owned by V here */ \
+      *vp = grown; \
+      (V).cap *= 2; \
+    } \
+    (V).data[(V).len++] = (X); \
+  } while(0)
+
+/* Remove the first element of V by shifting the rest down one slot.
+   Does nothing if V is empty. */
+#define VEC_POP(V) \
+  do { \
+    if ((V).len > 0) \
+      memmove((V).data, (V).data+1, (--(V).len)*sizeof(*(V).data)); \
+  } while(0)
+
+/* Simple thread-safe queue implementation: one named queue in a singly linked chain of queues */
+typedef struct queue_t {
+ pthread_mutex_t lock; /* Held by queue_send/queue_receive/queue_unsub while they touch messages, waiting and sent */
+ char name[256]; /* Queue name, filled in by queue_init() and matched with strcmp() in queues_get() */
+ VEC(pn_rwbytes_t) messages; /* Messages on the queue_t, oldest first; each buffer was malloc'ed by the receiver */
+ VEC(pn_connection_t*) waiting; /* Connections waiting to send messages from this queue */
+ struct queue_t *next; /* Next queue in chain */
+ size_t sent; /* Count of messages sent, used as delivery tag */
+} queue_t;
+
+/* Initialize the queue storage at q.
+   name is copied into q->name and silently truncated if it does not fit.
+   next is the queue that follows q in the chain (may be NULL).
+*/
+static void queue_init(queue_t *q, const char* name, queue_t *next) {
+  pthread_mutex_init(&q->lock, NULL);
+  /* strncpy() leaves the destination unterminated when the source fills it,
+     so copy at most size-1 bytes and terminate explicitly. */
+  strncpy(q->name, name, sizeof(q->name) - 1);
+  q->name[sizeof(q->name) - 1] = '\0';
+  VEC_INIT(q->messages);
+  VEC_INIT(q->waiting);
+  q->next = next;
+  q->sent = 0;
+}
+
+/* Release everything owned by q: the mutex, any undelivered message buffers
+   and both vectors. The queue_t itself is NOT freed, that is up to the caller.
+*/
+static void queue_destroy(queue_t *q) {
+  pthread_mutex_destroy(&q->lock);
+  /* q->name is an array inside queue_t, not a separate allocation, so it must
+     not be passed to free(). */
+  for (size_t i = 0; i < q->messages.len; ++i)
+    free(q->messages.data[i].start);
+  VEC_FINAL(q->messages);
+  for (size_t i = 0; i < q->waiting.len; ++i)
+    pn_decref(q->waiting.data[i]);
+  VEC_FINAL(q->waiting);
+}
+
+/* Send the oldest message of q on link s, or, if q is empty, record the
+   connection of s as waiting so that queue_receive() can wake it later.
+   Called in the dispatch loop of s, assumes s has credit.
+*/
+static void queue_send(queue_t *q, pn_link_t *s) {
+  pn_rwbytes_t msg = { 0 };
+  size_t tag = 0;
+
+  pthread_mutex_lock(&q->lock);
+  if (q->messages.len > 0) {
+    /* Take the message off the queue; the next value of the sent counter is its tag */
+    msg = q->messages.data[0];
+    VEC_POP(q->messages);
+    tag = ++q->sent;
+  } else {
+    /* Nothing to send: remember the connection for wake-up, once only */
+    pn_connection_t *conn = pn_session_connection(pn_link_session(s));
+    size_t pos = 0;
+    while (pos < q->waiting.len && q->waiting.data[pos] != conn) {
+      ++pos;
+    }
+    if (pos == q->waiting.len) {
+      VEC_PUSH(q->waiting, conn);
+    }
+  }
+  pthread_mutex_unlock(&q->lock);
+
+  if (!msg.start) {
+    return;                     /* No message was taken */
+  }
+  /* The proton calls are made outside the lock */
+  pn_delivery_t *dlv = pn_delivery(s, pn_dtag((char*)&tag, sizeof(tag)));
+  pn_link_send(s, msg.start, msg.size);
+  pn_link_advance(s);
+  pn_delivery_settle(dlv);      /* Pre-settled: unreliable, there will be no ack */
+  free(msg.start);
+}
+
+/* Data associated with each broker connection. NOTE(review): nothing visible in this file uses this struct; the flag is kept in the connection context pointer instead - confirm it is still needed. */
+typedef struct broker_data_t {
+ bool check_queues; /* Check senders on the connection for available data in queues. */
+} broker_data_t;
+
+/* Use the context pointer as a boolean flag to indicate we need to check queues.
+   A NULL context means false, any non-NULL context means true.
+*/
+void pn_connection_set_check_queues(pn_connection_t *c, bool check) {
+  /* Store a real pointer (the connection's own address) as the "true" marker
+     rather than converting a bool to a pointer. */
+  pn_connection_set_context(c, check ? (void*)c : NULL);
+}
+
+/* Return the flag stored by pn_connection_set_check_queues(). */
+bool pn_connection_get_check_queues(pn_connection_t *c) {
+  return pn_connection_get_context(c) != NULL;
+}
+
+/* Append message m to q, called in receiver dispatch loop.
+   If q held no messages before, flag and wake every connection recorded as
+   waiting on it, then clear the waiting list.
+   The proactor argument d is not used by this function.
+*/
+static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) {
+  pthread_mutex_lock(&q->lock);
+  VEC_PUSH(q->messages, m);
+  const bool was_empty = (q->messages.len == 1);
+  if (was_empty) {
+    size_t k = 0;
+    while (k < q->waiting.len) {
+      pn_connection_t *waiter = q->waiting.data[k++];
+      pn_connection_set_check_queues(waiter, true);
+      pn_connection_wake(waiter);
+    }
+    q->waiting.len = 0;
+  }
+  pthread_mutex_unlock(&q->lock);
+}
+
+/* Thread safe set of queues */
+typedef struct queues_t {
+ pthread_mutex_t lock; /* Held by queues_get() while it searches or extends the chain */
+ queue_t *queues; /* Head of the chain of queues, NULL when there are none */
+ size_t sent; /* NOTE(review): not read or written by any code visible here - confirm it is needed */
+} queues_t;
+
+/* Prepare qs for use: an empty chain of queues and an initialized lock. */
+void queues_init(queues_t *qs) {
+  qs->queues = NULL;
+  pthread_mutex_init(&qs->lock, NULL);
+}
+
+/* Destroy and free every queue in qs, then destroy the lock.
+   qs itself is not freed. */
+void queues_destroy(queues_t *qs) {
+  queue_t *q = qs->queues;
+  while (q) {
+    queue_t *next = q->next;    /* Must be read before q is freed */
+    queue_destroy(q);
+    free(q);
+    q = next;
+  }
+  qs->queues = NULL;
+  pthread_mutex_destroy(&qs->lock);
+}
+
+/** Get or create the named queue.
+ *
+ * A NULL name (a terminus with no address set) is treated as the empty name "".
+ * The returned queue is owned by qs. Aborts if a new queue cannot be allocated.
+ */
+queue_t* queues_get(queues_t *qs, const char* name) {
+  if (!name) name = "";         /* strcmp() must not be given NULL */
+  pthread_mutex_lock(&qs->lock);
+  queue_t *q;
+  for (q = qs->queues; q && strcmp(q->name, name) != 0; q = q->next)
+    ;
+  if (!q) {
+    /* Not found: create it and make it the new head of the chain */
+    q = (queue_t*)malloc(sizeof(queue_t));
+    if (!q) abort();
+    queue_init(q, name, qs->queues);
+    qs->queues = q;
+  }
+  pthread_mutex_unlock(&qs->lock);
+  return q;
+}
+
+/* The broker implementation: state shared by all broker threads */
+typedef struct broker_t {
+ pn_proactor_t *proactor; /* Created in main(); every broker thread waits on it for events */
+ size_t threads; /* Number of threads running broker_thread(), counting the main thread */
+ const char *container_id; /* AMQP container-id */
+ queues_t queues; /* Queues, created on demand by queues_get() */
+ bool finished; /* Set on PN_PROACTOR_INTERRUPT; broker_thread() leaves its loop once it is true */
+} broker_t;
+
+/* Begin shutting the broker down.
+   The interrupt is delivered as a PN_PROACTOR_INTERRUPT event, which handle()
+   uses to mark the broker finished and to pass the interrupt on.
+*/
+void broker_stop(broker_t *b) {
+  pn_proactor_t *proactor = b->proactor;
+  pn_proactor_interrupt(proactor);
+}
+
+/* Try to send if link is sender and has credit.
+   The queue is chosen by the link's source address; a link with no source
+   address is ignored, as link_unsub() does.
+*/
+static void link_send(broker_t *b, pn_link_t *s) {
+  if (pn_link_is_sender(s) && pn_link_credit(s) > 0) {
+    const char *qname = pn_terminus_get_address(pn_link_source(s));
+    if (qname) {                /* No address set: there is no queue to read from */
+      queue_t *q = queues_get(&b->queues, qname);
+      queue_send(q, s);
+    }
+  }
+}
+
+/* Remove connection c from the list of connections waiting on q, if present. */
+static void queue_unsub(queue_t *q, pn_connection_t *c) {
+  pthread_mutex_lock(&q->lock);
+  size_t pos = 0;
+  while (pos < q->waiting.len && q->waiting.data[pos] != c) {
+    ++pos;
+  }
+  if (pos < q->waiting.len) {
+    /* Overwrite c with the first entry, then drop the first slot */
+    q->waiting.data[pos] = q->waiting.data[0];
+    VEC_POP(q->waiting);
+  }
+  pthread_mutex_unlock(&q->lock);
+}
+
+/* Stop waiting on the queue named by the source address of link s.
+   Does nothing for receiver links or links without a source address.
+*/
+static void link_unsub(broker_t *b, pn_link_t *s) {
+  if (!pn_link_is_sender(s)) {
+    return;
+  }
+  const char *qname = pn_terminus_get_address(pn_link_source(s));
+  if (!qname) {
+    return;
+  }
+  pn_connection_t *conn = pn_session_connection(pn_link_session(s));
+  queue_unsub(queues_get(&b->queues, qname), conn);
+}
+
+/* Unsubscribe every link of connection c from its queue.
+   Called from handle() when the connection's transport has closed.
+*/
+static void connection_unsub(broker_t *b, pn_connection_t *c) {
+  pn_link_t *link = pn_link_head(c, 0);
+  while (link != NULL) {
+    link_unsub(b, link);
+    link = pn_link_next(link, 0);
+  }
+}
+
+/* Unsubscribe every link that belongs to session ssn from its queue. */
+static void session_unsub(broker_t *b, pn_session_t *ssn) {
+  /* Walk all links of the session's connection and pick out those of ssn */
+  pn_link_t *link = pn_link_head(pn_session_connection(ssn), 0);
+  while (link != NULL) {
+    if (pn_link_session(link) == ssn) {
+      link_unsub(b, link);
+    }
+    link = pn_link_next(link, 0);
+  }
+}
+
+static int exit_code = 0;
+
+/* If cond is set, print it to stderr prefixed with the name of event e's type
+   and make the process exit code non-zero.
+*/
+static void check_condition(pn_event_t *e, pn_condition_t *cond) {
+  if (!pn_condition_is_set(cond)) {
+    return;
+  }
+  const char *event_name = pn_event_type_name(pn_event_type(e));
+  fprintf(stderr, "%s: %s: %s\n", event_name,
+          pn_condition_get_name(cond), pn_condition_get_description(cond));
+  exit_code = 1;                /* Remember there was an unexpected error */
+}
+
+const int WINDOW=10; /* Incoming credit window */
+
+/* Handle one proactor event for the broker.
+   Completes opens and closes started by the peer, stores received messages in
+   the queue named by the link target and sends queued messages on sender
+   links that have credit.
+*/
+static void handle(broker_t* b, pn_event_t* e) {
+  pn_connection_t *c = pn_event_connection(e);
+
+  switch (pn_event_type(e)) {
+
+  case PN_LISTENER_OPEN:
+    printf("listening\n");
+    fflush(stdout);
+    break;
+
+  case PN_LISTENER_ACCEPT:
+    pn_listener_accept(pn_event_listener(e), pn_connection());
+    break;
+
+  case PN_CONNECTION_INIT:
+    pn_connection_set_container(c, b->container_id);
+    break;
+
+  case PN_CONNECTION_BOUND: {
+    /* Turn off security */
+    pn_transport_t *t = pn_connection_transport(c);
+    pn_transport_require_auth(t, false);
+    pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
+    break;
+  }
+  case PN_CONNECTION_REMOTE_OPEN: {
+    pn_connection_open(pn_event_connection(e)); /* Complete the open */
+    break;
+  }
+  case PN_CONNECTION_WAKE: {
+    /* Woken by queue_receive(): a queue this connection waited on has data */
+    if (pn_connection_get_check_queues(c)) {
+      pn_connection_set_check_queues(c, false);
+      /* Only visit links that are open at both ends. The state bits must be
+         combined with |; combining them with & gives 0, which matches every link. */
+      int flags = PN_LOCAL_ACTIVE|PN_REMOTE_ACTIVE;
+      for (pn_link_t *l = pn_link_head(c, flags); l != NULL; l = pn_link_next(l, flags))
+        link_send(b, l);
+    }
+    break;
+  }
+  case PN_SESSION_REMOTE_OPEN: {
+    pn_session_open(pn_event_session(e));
+    break;
+  }
+  case PN_LINK_REMOTE_OPEN: {
+    /* Mirror the address requested by the peer and complete the open */
+    pn_link_t *l = pn_event_link(e);
+    if (pn_link_is_sender(l)) {
+      const char *source = pn_terminus_get_address(pn_link_remote_source(l));
+      pn_terminus_set_address(pn_link_source(l), source);
+    } else {
+      const char* target = pn_terminus_get_address(pn_link_remote_target(l));
+      pn_terminus_set_address(pn_link_target(l), target);
+      pn_link_flow(l, WINDOW);
+    }
+    pn_link_open(l);
+    break;
+  }
+  case PN_LINK_FLOW: {
+    link_send(b, pn_event_link(e));
+    break;
+  }
+  case PN_DELIVERY: {
+    pn_delivery_t *d = pn_event_delivery(e);
+    pn_link_t *r = pn_delivery_link(d);
+    if (pn_link_is_receiver(r) &&
+        pn_delivery_readable(d) && !pn_delivery_partial(d))
+    {
+      size_t size = pn_delivery_pending(d);
+      /* The broker does not decode the message, just forwards it. */
+      pn_rwbytes_t m = { size, (char*)malloc(size) };
+      pn_link_recv(r, m.start, m.size);
+      const char *qname = pn_terminus_get_address(pn_link_target(r));
+      /* The queue takes ownership of the buffer in m */
+      queue_receive(b->proactor, queues_get(&b->queues, qname), m);
+      pn_delivery_update(d, PN_ACCEPTED);
+      pn_delivery_settle(d);
+      /* Top the credit back up to WINDOW */
+      pn_link_flow(r, WINDOW - pn_link_credit(r));
+    }
+    break;
+  }
+
+  case PN_TRANSPORT_CLOSED:
+    check_condition(e, pn_transport_condition(pn_event_transport(e)));
+    connection_unsub(b, pn_event_connection(e));
+    break;
+
+  case PN_CONNECTION_REMOTE_CLOSE:
+    check_condition(e, pn_connection_remote_condition(pn_event_connection(e)));
+    pn_connection_close(pn_event_connection(e));
+    break;
+
+  case PN_SESSION_REMOTE_CLOSE:
+    check_condition(e, pn_session_remote_condition(pn_event_session(e)));
+    session_unsub(b, pn_event_session(e));
+    pn_session_close(pn_event_session(e));
+    pn_session_free(pn_event_session(e));
+    break;
+
+  case PN_LINK_REMOTE_CLOSE:
+    check_condition(e, pn_link_remote_condition(pn_event_link(e)));
+    link_unsub(b, pn_event_link(e));
+    pn_link_close(pn_event_link(e));
+    pn_link_free(pn_event_link(e));
+    break;
+
+  case PN_LISTENER_CLOSE:
+    check_condition(e, pn_listener_condition(pn_event_listener(e)));
+    broker_stop(b);
+    break;
+
+  case PN_PROACTOR_INACTIVE:   /* listener and all connections closed */
+    broker_stop(b);
+    break;
+
+  case PN_PROACTOR_INTERRUPT:
+    b->finished = true;
+    pn_proactor_interrupt(b->proactor); /* Pass along the interrupt to the other threads */
+    break;
+
+  default:
+    break;
+  }
+}
+
+/* Thread entry point: wait for batches of events from the proactor and handle
+   them until the broker is marked finished. void_broker is the broker_t.
+   Always returns NULL.
+*/
+static void* broker_thread(void *void_broker) {
+  broker_t *b = (broker_t*)void_broker;
+  for (;;) {
+    pn_event_batch_t *batch = pn_proactor_wait(b->proactor);
+    for (pn_event_t *ev = pn_event_batch_next(batch); ev; ev = pn_event_batch_next(batch)) {
+      handle(b, ev);
+    }
+    pn_proactor_done(b->proactor, batch);
+    if (b->finished) {
+      break;
+    }
+  }
+  return NULL;
+}
+
+/* Usage: broker [host [port]]
+   Listens on host:port (default "" and "amqp") and serves connections with
+   four threads, the main thread included. Returns non-zero if an error
+   condition was reported while running.
+*/
+int main(int argc, char **argv) {
+  broker_t b = {0};
+  b.proactor = pn_proactor();
+  queues_init(&b.queues);
+  b.container_id = argv[0];
+  b.threads = 4;
+
+  /* Optional positional arguments */
+  int argi = 1;
+  const char *host = "";
+  const char *port = "amqp";
+  if (argc > argi) host = argv[argi++];
+  if (argc > argi) port = argv[argi++];
+
+  /* Listen on addr */
+  char addr[PN_MAX_ADDR];
+  pn_proactor_addr(addr, sizeof(addr), host, port);
+  pn_proactor_listen(b.proactor, pn_listener(), addr, 16);
+
+  /* Start n-1 threads, the main thread is the n-th */
+  const size_t workers = b.threads - 1;
+  pthread_t* threads = (pthread_t*)calloc(sizeof(pthread_t), b.threads);
+  for (size_t t = 0; t < workers; ++t) {
+    pthread_create(&threads[t], NULL, broker_thread, &b);
+  }
+  broker_thread(&b);
+
+  /* The main thread is done, wait for the others */
+  for (size_t t = 0; t < workers; ++t) {
+    pthread_join(threads[t], NULL);
+  }
+  pn_proactor_free(b.proactor);
+  free(threads);
+  return exit_code;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/direct.c b/examples/c/direct.c
new file mode 100644
index 0000000..15550e6
--- /dev/null
+++ b/examples/c/direct.c
@@ -0,0 +1,326 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/condition.h>
+#include <proton/connection.h>
+#include <proton/delivery.h>
+#include <proton/link.h>
+#include <proton/listener.h>
+#include <proton/message.h>
+#include <proton/proactor.h>
+#include <proton/sasl.h>
+#include <proton/session.h>
+#include <proton/transport.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+
+typedef struct app_data_t {
+ const char *host, *port; /* Address to listen on, from the command line or defaults */
+ const char *amqp_address; /* Set as the target address of sender links in handle_send() */
+ const char *container_id; /* Set on each connection at PN_CONNECTION_INIT */
+ int message_count; /* Messages to send or receive; 0 makes the receiving side run forever */
+
+ pn_proactor_t *proactor;
+ pn_listener_t *listener; /* Closed by handle() when a transport closes */
+ pn_rwbytes_t message_buffer; /* Encode buffer reused and grown by encode_message(), freed in main() */
+
+ /* Sender values */
+ int sent; /* Messages sent so far; its bytes are also used as the delivery tag */
+ int acknowledged; /* Deliveries the peer reported as PN_ACCEPTED */
+ pn_link_t *sender; /* NOTE(review): never assigned in the code visible here - confirm */
+
+ /* Receiver values */
+ int received; /* Messages received so far */
+} app_data_t;
+
+static const int BATCH = 1000; /* Batch size for unlimited receive */
+
+static int exit_code = 0;
+
+/* If cond is set, report it on stderr, close the connection of event e and
+   make the process exit code non-zero.
+*/
+static void check_condition(pn_event_t *e, pn_condition_t *cond) {
+  if (!pn_condition_is_set(cond)) {
+    return;
+  }
+  const char *event_name = pn_event_type_name(pn_event_type(e));
+  fprintf(stderr, "%s: %s: %s\n", event_name,
+          pn_condition_get_name(cond), pn_condition_get_description(cond));
+  pn_connection_close(pn_event_connection(e));
+  exit_code = 1;
+}
+
+/* Create a message with a map { "sequence" : number }, encode it and return the encoded buffer.
+   The returned bytes point into app->message_buffer, so they are only valid
+   until the next call. Exits the process if encoding or allocation fails.
+*/
+static pn_bytes_t encode_message(app_data_t* app) {
+  /* Construct a message with the map { "sequence": app.sent } */
+  pn_message_t* message = pn_message();
+  pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
+  pn_data_t* body = pn_message_body(message);
+  pn_data_put_map(body);
+  pn_data_enter(body);
+  pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
+  pn_data_put_int(body, app->sent); /* The sequence number */
+  pn_data_exit(body);
+
+  /* encode the message, expanding the encode buffer as needed */
+  if (app->message_buffer.start == NULL) {
+    static const size_t initial_size = 128;
+    app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
+    if (app->message_buffer.start == NULL) {
+      fprintf(stderr, "error encoding message: out of memory\n");
+      exit(1);
+    }
+  }
+  /* app->message_buffer is the total buffer space available. */
+  /* mbuf will point at just the portion used by the encoded message */
+  pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
+  int status = 0;
+  while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
+    size_t bigger = app->message_buffer.size * 2;
+    char *grown = (char*)realloc(app->message_buffer.start, bigger);
+    if (grown == NULL) {
+      fprintf(stderr, "error encoding message: out of memory\n");
+      exit(1);
+    }
+    app->message_buffer.start = grown;
+    app->message_buffer.size = bigger;
+    /* realloc() may have moved the buffer: mbuf must follow it, otherwise the
+       next encode would write into memory that has been freed. */
+    mbuf.start = app->message_buffer.start;
+    mbuf.size = app->message_buffer.size;
+  }
+  if (status != 0) {
+    fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
+    exit(1);
+  }
+  pn_message_free(message);
+  return pn_bytes(mbuf.size, mbuf.start);
+}
+
+#define MAX_SIZE 1024
+
+/* Read the message of delivery dlv from its link and print the message body,
+   followed by a newline, on stdout. Messages of MAX_SIZE bytes or more, and
+   messages that cannot be read or decoded, are skipped without any output.
+*/
+static void decode_message(pn_delivery_t *dlv) {
+  static char buffer[MAX_SIZE];
+
+  if (pn_delivery_pending(dlv) >= MAX_SIZE) {
+    return;                     /* Does not fit in the buffer */
+  }
+  /* read in the raw data */
+  ssize_t len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
+  if (len <= 0) {
+    return;
+  }
+  /* decode it into a proton message and print the body */
+  pn_message_t *m = pn_message();
+  if (pn_message_decode(m, buffer, len) == PN_OK) {
+    pn_string_t *text = pn_string(NULL);
+    pn_inspect(pn_message_body(m), text);
+    printf("%s\n", pn_string_get(text));
+    pn_free(text);
+  }
+  pn_message_free(m);
+}
+
+/* Handle link events when we are acting as the receiver: grant credit when the
+   link opens, then print, accept and settle each complete message. When the
+   expected number of messages has arrived the link, session and connection
+   are closed; with a message_count of 0 credit is topped up forever instead.
+*/
+static void handle_receive(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+  case PN_LINK_REMOTE_OPEN: {
+    pn_link_t *l = pn_event_link(event);
+    pn_link_open(l);
+    if (app->message_count) {
+      pn_link_flow(l, app->message_count);
+    } else {
+      pn_link_flow(l, BATCH);
+    }
+    break;
+  }
+
+  case PN_DELIVERY: {
+    pn_delivery_t *dlv = pn_event_delivery(event);
+    if (!pn_delivery_readable(dlv) || pn_delivery_partial(dlv)) {
+      break;                    /* Not a complete incoming message yet */
+    }
+    /* A message has been received */
+    pn_link_t *link = pn_delivery_link(dlv);
+    decode_message(dlv);
+    pn_delivery_update(dlv, PN_ACCEPTED);
+    /* done with the delivery: move the link on, then settle it */
+    pn_link_advance(link);
+    pn_delivery_settle(dlv);    /* dlv is now freed */
+
+    if (app->message_count == 0) {
+      /* receive forever: bring the credit back up to BATCH when it runs low */
+      if (pn_link_credit(link) < BATCH/2) {
+        pn_link_flow(link, BATCH - pn_link_credit(link));
+      }
+      break;
+    }
+    if (++app->received < app->message_count) {
+      break;                    /* More messages expected */
+    }
+    /* done receiving, close the endpoints */
+    printf("%d messages received\n", app->received);
+    pn_session_t *ssn = pn_link_session(link);
+    pn_link_close(link);
+    pn_session_close(ssn);
+    pn_connection_close(pn_session_connection(ssn));
+    break;
+  }
+
+  default:
+    break;
+  }
+}
+
+/* Handle link events when we are acting as the sender: send messages while
+   there is credit and count the acknowledgements. The connection is closed
+   once every message has been accepted by the peer.
+*/
+static void handle_send(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+  case PN_LINK_REMOTE_OPEN: {
+    pn_link_t* l = pn_event_link(event);
+    pn_terminus_set_address(pn_link_target(l), app->amqp_address);
+    pn_link_open(l);
+    break;
+  }
+
+  case PN_LINK_FLOW: {
+    /* The peer has given us some credit, now we can send messages */
+    pn_link_t *sender = pn_event_link(event);
+    for (;;) {
+      if (pn_link_credit(sender) <= 0 || app->sent >= app->message_count) {
+        break;
+      }
+      ++app->sent;
+      /* Use sent counter as unique delivery tag. */
+      pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
+      pn_bytes_t encoded = encode_message(app);
+      pn_link_send(sender, encoded.start, encoded.size);
+      pn_link_advance(sender);
+    }
+    break;
+  }
+
+  case PN_DELIVERY: {
+    /* We received acknowledgement from the peer that a message was delivered. */
+    pn_delivery_t* d = pn_event_delivery(event);
+    if (pn_delivery_remote_state(d) == PN_ACCEPTED &&
+        ++app->acknowledged == app->message_count)
+    {
+      printf("%d messages sent and acknowledged\n", app->acknowledged);
+      pn_connection_close(pn_event_connection(event));
+      /* Continue handling events till we receive TRANSPORT_CLOSED */
+    }
+    break;
+  }
+
+  default:
+    break;
+  }
+}
+
+/* Handle all events, delegate to handle_send or handle_receive depending on link mode.
+   Return true to continue, false to exit
+*/
+static bool handle(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+  case PN_LISTENER_OPEN:
+    printf("listening\n");
+    fflush(stdout);
+    break;
+
+  case PN_LISTENER_ACCEPT:
+    pn_listener_accept(pn_event_listener(event), pn_connection());
+    break;
+
+  case PN_CONNECTION_INIT:
+    pn_connection_set_container(pn_event_connection(event), app->container_id);
+    break;
+
+  case PN_CONNECTION_BOUND: {
+    /* Turn off security */
+    pn_transport_t *t = pn_event_transport(event);
+    pn_transport_require_auth(t, false);
+    pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
+    break; /* Do not fall through: the open is completed on PN_CONNECTION_REMOTE_OPEN */
+  }
+  case PN_CONNECTION_REMOTE_OPEN: {
+    pn_connection_open(pn_event_connection(event)); /* Complete the open */
+    break;
+  }
+
+  case PN_SESSION_REMOTE_OPEN: {
+    pn_session_open(pn_event_session(event));
+    break;
+  }
+
+  case PN_TRANSPORT_CLOSED:
+    check_condition(event, pn_transport_condition(pn_event_transport(event)));
+    pn_listener_close(app->listener); /* Finished */
+    break;
+
+  case PN_CONNECTION_REMOTE_CLOSE:
+    check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+  case PN_SESSION_REMOTE_CLOSE:
+    check_condition(event, pn_session_remote_condition(pn_event_session(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+  case PN_LINK_REMOTE_CLOSE:
+  case PN_LINK_REMOTE_DETACH:
+    check_condition(event, pn_link_remote_condition(pn_event_link(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+  case PN_PROACTOR_TIMEOUT:
+    /* Wake the sender's connection, if a sender link has been recorded */
+    if (app->sender) {
+      pn_connection_wake(pn_session_connection(pn_link_session(app->sender)));
+    }
+    break;
+
+  case PN_LISTENER_CLOSE:
+    check_condition(event, pn_listener_condition(pn_event_listener(event)));
+    break;
+
+  case PN_PROACTOR_INACTIVE:
+    return false;
+
+  default: {
+    pn_link_t *l = pn_event_link(event);
+    if (l) {                    /* Only delegate link-related events */
+      if (pn_link_is_sender(l)) {
+        handle_send(app, event);
+      } else {
+        handle_receive(app, event);
+      }
+    }
+  }
+  }
+  return true;
+}
+
+/* Wait for batches of events from the proactor and pass each event to
+   handle(). Returns as soon as handle() returns false.
+*/
+void run(app_data_t *app) {
+  for (;;) {
+    pn_event_batch_t *batch = pn_proactor_wait(app->proactor);
+    pn_event_t *ev;
+    while ((ev = pn_event_batch_next(batch)) != NULL) {
+      if (!handle(app, ev)) {
+        return;
+      }
+    }
+    pn_proactor_done(app->proactor, batch);
+  }
+}
+
+/* Usage: direct [host [port [amqp-address [message-count]]]]
+   Listens on host:port and acts as sender or receiver for the peer that
+   connects. Each argument is optional; a missing argument and all those after
+   it take their defaults ("", "amqp", "examples", 10).
+*/
+int main(int argc, char **argv) {
+  struct app_data_t app = {0};
+  int i = 0;
+  app.container_id = argv[i++];   /* Should be unique */
+  /* Test against i, not a fixed index, so that an argument that is absent is
+     never read from argv. */
+  app.host = (argc > i) ? argv[i++] : "";
+  app.port = (argc > i) ? argv[i++] : "amqp";
+  app.amqp_address = (argc > i) ? argv[i++] : "examples";
+  app.message_count = (argc > i) ? atoi(argv[i++]) : 10;
+
+  /* Create the proactor and listen */
+  app.proactor = pn_proactor();
+  app.listener = pn_listener();
+  char addr[PN_MAX_ADDR];
+  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
+  pn_proactor_listen(app.proactor, app.listener, addr, 16);
+  run(&app);
+  pn_proactor_free(app.proactor);
+  free(app.message_buffer.start);
+  return exit_code;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/example_test.py
----------------------------------------------------------------------
diff --git a/examples/c/example_test.py b/examples/c/example_test.py
new file mode 100644
index 0000000..02bb1fd
--- /dev/null
+++ b/examples/c/example_test.py
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+
+# This is a test script to run the examples and verify that they behave as expected.
+
+import unittest, sys, time
+from proctest import *
+
+def python_cmd(name):
+ dir = os.path.dirname(__file__)
+ return [sys.executable, os.path.join(dir, "..", "..", "python", name)]
+
+def receive_expect(n):
+ return ''.join('{"sequence"=%s}\n'%i for i in xrange(1, n+1)) + "%s messages received\n"%n
+
+class Broker(object):
+ def __init__(self, test):
+ self.test = test
+
+ def __enter__(self):
+ with TestPort() as tp:
+ self.port = tp.port
+ self.host = tp.host
+ self.addr = tp.addr
+ self.proc = self.test.proc(["broker", "", self.port])
+ self.proc.wait_re("listening")
+ return self
+
+ def __exit__(self, *args):
+ b = getattr(self, "proc")
+ if b:
+ if b.poll() != None: # Broker crashed
+ raise ProcError(b, "broker crash")
+ b.kill()
+
+class CExampleTest(ProcTestCase):
+
+ def test_send_receive(self):
+ """Send first then receive"""
+ with Broker(self) as b:
+ s = self.proc(["send", "", b.port])
+ self.assertEqual("10 messages sent and acknowledged\n", s.wait_exit())
+ r = self.proc(["receive", "", b.port])
+ self.assertEqual(receive_expect(10), r.wait_exit())
+
+ def test_receive_send(self):
+ """Start receiving first, then send."""
+ with Broker(self) as b:
+ r = self.proc(["receive", "", b.port]);
+ s = self.proc(["send", "", b.port]);
+ self.assertEqual("10 messages sent and acknowledged\n", s.wait_exit())
+ self.assertEqual(receive_expect(10), r.wait_exit())
+
+ def test_send_direct(self):
+ """Send to direct server"""
+ with TestPort() as tp:
+ d = self.proc(["direct", "", tp.port])
+ d.wait_re("listening")
+ self.assertEqual("10 messages sent and acknowledged\n", self.proc(["send", "", tp.port]).wait_exit())
+ self.assertIn(receive_expect(10), d.wait_exit())
+
+ def test_receive_direct(self):
+ """Receive from direct server"""
+ with TestPort() as tp:
+ d = self.proc(["direct", "", tp.port])
+ d.wait_re("listening")
+ self.assertEqual(receive_expect(10), self.proc(["receive", "", tp.port]).wait_exit())
+ self.assertIn("10 messages sent and acknowledged\n", d.wait_exit())
+
+
+if __name__ == "__main__":
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/messenger/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/messenger/CMakeLists.txt b/examples/c/messenger/CMakeLists.txt
deleted file mode 100644
index d4fec71..0000000
--- a/examples/c/messenger/CMakeLists.txt
+++ /dev/null
@@ -1,52 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-find_package(Proton REQUIRED)
-
-set (messenger-examples
- recv.c
- send.c
- recv-async.c
- send-async.c
- )
-
-set_source_files_properties (
- ${messenger-examples}
- PROPERTIES
- COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${COMPILE_LANGUAGE_FLAGS} ${LINK_TIME_OPTIMIZATION}"
- )
-
-if (BUILD_WITH_CXX)
- set_source_files_properties (
- ${messenger-examples}
- PROPERTIES LANGUAGE CXX
- )
-endif (BUILD_WITH_CXX)
-
-add_executable(recv recv.c)
-add_executable(send send.c)
-add_executable(recv-async recv-async.c)
-add_executable(send-async send-async.c)
-
-include_directories(${Proton_INCLUDE_DIRS})
-
-target_link_libraries(recv ${Proton_LIBRARIES})
-target_link_libraries(send ${Proton_LIBRARIES})
-target_link_libraries(recv-async ${Proton_LIBRARIES})
-target_link_libraries(send-async ${Proton_LIBRARIES})
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/messenger/recv-async.c
----------------------------------------------------------------------
diff --git a/examples/c/messenger/recv-async.c b/examples/c/messenger/recv-async.c
deleted file mode 100644
index 1f49166..0000000
--- a/examples/c/messenger/recv-async.c
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-// This is a re-implementation of recv.c using non-blocking/asynchronous calls.
-
-#include "proton/message.h"
-#include "proton/messenger.h"
-
-#include "pncompat/misc_funcs.inc"
-#include <stdio.h>
-#include <stdlib.h>
-#include <ctype.h>
-
-#if EMSCRIPTEN
-#include <emscripten.h>
-#endif
-
-pn_message_t * message;
-pn_messenger_t * messenger;
-
-#define check(messenger) \
- { \
- if(pn_messenger_errno(messenger)) \
- { \
- die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
- } \
- } \
-
-void die(const char *file, int line, const char *message)
-{
- fprintf(stderr, "%s:%i: %s\n", file, line, message);
- exit(1);
-}
-
-void usage(void)
-{
- printf("Usage: recv [options] <addr>\n");
- printf("-c \tPath to the certificate file.\n");
- printf("-k \tPath to the private key file.\n");
- printf("-p \tPassword for the private key.\n");
- printf("<addr>\tAn address.\n");
- exit(0);
-}
-
-void process(void) {
- while(pn_messenger_incoming(messenger))
- {
- pn_messenger_get(messenger, message);
- check(messenger);
-
- {
- pn_tracker_t tracker = pn_messenger_incoming_tracker(messenger);
- char buffer[1024];
- size_t buffsize = sizeof(buffer);
- const char* subject = pn_message_get_subject(message);
- pn_data_t* body = pn_message_body(message);
- pn_data_format(body, buffer, &buffsize);
-
- printf("Address: %s\n", pn_message_get_address(message));
- printf("Subject: %s\n", subject ? subject : "(no subject)");
- printf("Content: %s\n", buffer);
-
- pn_messenger_accept(messenger, tracker, 0);
- }
- }
-}
-
-#if EMSCRIPTEN // For emscripten C/C++ to JavaScript compiler.
-void pump(int fd, void* userData) {
- while (pn_messenger_work(messenger, 0) >= 0) {
- process();
- }
-}
-
-void onclose(int fd, void* userData) {
- process();
-}
-
-void onerror(int fd, int errno, const char* msg, void* userData) {
- printf("error callback fd = %d, errno = %d, msg = %s\n", fd, errno, msg);
-}
-#endif
-
-int main(int argc, char** argv)
-{
- char* certificate = NULL;
- char* privatekey = NULL;
- char* password = NULL;
- char* address = (char *) "amqp://~0.0.0.0";
- int c;
-
- message = pn_message();
- messenger = pn_messenger(NULL);
- pn_messenger_set_blocking(messenger, false); // Needs to be set non-blocking to behave asynchronously.
-
- opterr = 0;
-
- while((c = getopt(argc, argv, "hc:k:p:")) != -1)
- {
- switch(c)
- {
- case 'h':
- usage();
- break;
-
- case 'c': certificate = optarg; break;
- case 'k': privatekey = optarg; break;
- case 'p': password = optarg; break;
-
- case '?':
- if (optopt == 'c' ||
- optopt == 'k' ||
- optopt == 'p')
- {
- fprintf(stderr, "Option -%c requires an argument.\n", optopt);
- }
- else if(isprint(optopt))
- {
- fprintf(stderr, "Unknown option `-%c'.\n", optopt);
- }
- else
- {
- fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
- }
- return 1;
- default:
- abort();
- }
- }
-
- if (optind < argc)
- {
- address = argv[optind];
- }
-
- /* load the various command line options if they're set */
- if(certificate)
- {
- pn_messenger_set_certificate(messenger, certificate);
- }
-
- if(privatekey)
- {
- pn_messenger_set_private_key(messenger, privatekey);
- }
-
- if(password)
- {
- pn_messenger_set_password(messenger, password);
- }
-
- pn_messenger_start(messenger);
- check(messenger);
-
- pn_messenger_subscribe(messenger, address);
- check(messenger);
-
- pn_messenger_recv(messenger, -1); // Set to receive as many messages as messenger can buffer.
-
-#if EMSCRIPTEN // For emscripten C/C++ to JavaScript compiler.
- emscripten_set_socket_error_callback(NULL, onerror);
-
- emscripten_set_socket_open_callback(NULL, pump);
- emscripten_set_socket_connection_callback(NULL, pump);
- emscripten_set_socket_message_callback(NULL, pump);
- emscripten_set_socket_close_callback(NULL, onclose);
-#else // For native compiler.
- while (1) {
- pn_messenger_work(messenger, -1); // Block indefinitely until there has been socket activity.
- process();
- }
-#endif
-
- return 0;
-}
-
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/messenger/recv.c
----------------------------------------------------------------------
diff --git a/examples/c/messenger/recv.c b/examples/c/messenger/recv.c
deleted file mode 100644
index 16e8321..0000000
--- a/examples/c/messenger/recv.c
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "proton/message.h"
-#include "proton/messenger.h"
-
-#include "pncompat/misc_funcs.inc"
-#include <stdio.h>
-#include <stdlib.h>
-#include <ctype.h>
-
-#define check(messenger) \
- { \
- if(pn_messenger_errno(messenger)) \
- { \
- die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
- } \
- } \
-
-/* Report "<file>:<line>: <message>" on stderr, then exit with status 1. */
-void die(const char *file, int line, const char *message) {
-  fprintf(stderr, "%s:%i: %s\n", file, line, message);
-  exit(1);
-}
-
-/* Write the command-line synopsis to stdout and exit with status 0. */
-void usage(void) {
-  static const char help[] =
-    "Usage: recv [options] <addr>\n"
-    "-c \tPath to the certificate file.\n"
-    "-k \tPath to the private key file.\n"
-    "-p \tPassword for the private key.\n"
-    "<addr>\tAn address.\n";
-
-  fputs(help, stdout);
-  exit(0);
-}
-
-/*
- * Subscribe to an address and print every message received, forever.
- *
- * Usage: recv [-c cert] [-k key] [-p password] [-h] [<addr>]
- * <addr> defaults to "amqp://~0.0.0.0".
- *
- * Returns 1 on a bad command line or when the message/messenger objects
- * cannot be created; messenger errors terminate the process via check().
- */
-int main(int argc, char** argv)
-{
-  const char *certificate = NULL;
-  const char *privatekey = NULL;
-  const char *password = NULL;
-  const char *address = "amqp://~0.0.0.0";
-  int c;
-
-  pn_message_t *message = pn_message();
-  pn_messenger_t *messenger = pn_messenger(NULL);
-  if (!message || !messenger)
-  {
-    fprintf(stderr, "recv: out of memory\n");
-    return 1;
-  }
-
-  opterr = 0; /* We print our own diagnostics in the '?' case below. */
-
-  while ((c = getopt(argc, argv, "hc:k:p:")) != -1)
-  {
-    switch (c)
-    {
-    case 'h':
-      usage();
-      break;
-
-    case 'c': certificate = optarg; break;
-    case 'k': privatekey = optarg; break;
-    case 'p': password = optarg; break;
-
-    case '?':
-      if (optopt == 'c' ||
-          optopt == 'k' ||
-          optopt == 'p')
-      {
-        fprintf(stderr, "Option -%c requires an argument.\n", optopt);
-      }
-      else if (isprint(optopt))
-      {
-        fprintf(stderr, "Unknown option `-%c'.\n", optopt);
-      }
-      else
-      {
-        fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
-      }
-      return 1;
-    default:
-      abort();
-    }
-  }
-
-  if (optind < argc)
-  {
-    address = argv[optind];
-  }
-
-  /* load the various command line options if they're set */
-  if (certificate)
-  {
-    pn_messenger_set_certificate(messenger, certificate);
-  }
-
-  if (privatekey)
-  {
-    pn_messenger_set_private_key(messenger, privatekey);
-  }
-
-  if (password)
-  {
-    pn_messenger_set_password(messenger, password);
-  }
-
-  pn_messenger_start(messenger);
-  check(messenger);
-
-  pn_messenger_subscribe(messenger, address);
-  check(messenger);
-
-  for (;;)
-  {
-    pn_messenger_recv(messenger, 1024);
-    check(messenger);
-
-    while (pn_messenger_incoming(messenger))
-    {
-      pn_messenger_get(messenger, message);
-      check(messenger);
-
-      {
-        char buffer[1024];
-        size_t buffsize = sizeof(buffer);
-        const char *subject = pn_message_get_subject(message);
-        const char *msg_address = pn_message_get_address(message);
-        pn_data_t *body = pn_message_body(message);
-        int err;
-
-        /* Start from an empty string: the buffer must never be printed
-           uninitialised if formatting fails (e.g. the body does not fit). */
-        buffer[0] = '\0';
-        err = pn_data_format(body, buffer, &buffsize);
-
-        printf("Address: %s\n", msg_address ? msg_address : "(no address)");
-        printf("Subject: %s\n", subject ? subject : "(no subject)");
-        if (err)
-        {
-          printf("Content: (unprintable, error %d)\n", err);
-        }
-        else
-        {
-          printf("Content: %s\n", buffer);
-        }
-      }
-    }
-  }
-
-  return 0;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/messenger/send-async.c
----------------------------------------------------------------------
diff --git a/examples/c/messenger/send-async.c b/examples/c/messenger/send-async.c
deleted file mode 100644
index de9b023..0000000
--- a/examples/c/messenger/send-async.c
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-// This is a re-implementation of send.c using non-blocking/asynchronous calls.
-
-#include "proton/message.h"
-#include "proton/messenger.h"
-
-#include "pncompat/misc_funcs.inc"
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <ctype.h>
-
-#if EMSCRIPTEN
-#include <emscripten.h>
-#endif
-
-pn_message_t * message; /* Created in main(); freed and reset to NULL by process() */
-pn_messenger_t * messenger; /* Created in main(); freed and reset to NULL by process() */
-pn_tracker_t tracker; /* Tracker of the outgoing message, polled by process() */
-int running = 1; /* Cleared by process() once it has called pn_messenger_stop() */
-
-#define check(messenger) \
- { \
- if(pn_messenger_errno(messenger)) \
- { \
- die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
- } \
- } \
-
-/* Report "<file>:<line>: <message>" on stderr, then exit with status 1. */
-void die(const char *file, int line, const char *message) {
-  fprintf(stderr, "%s:%i: %s\n", file, line, message);
-  exit(1);
-}
-
-/* Write the command-line synopsis to stdout and exit with status 0. */
-void usage(void) {
-  static const char help[] =
-    "Usage: send [-a addr] [message]\n"
-    "-a \tThe target address [amqp[s]://domain[/name]]\n"
-    "message\tA text string to send.\n";
-
-  fputs(help, stdout);
-  exit(0);
-}
-
-/*
- * Advance the asynchronous send; call after each round of messenger work.
- *
- * Once the tracked message is no longer pending, a stop is requested (once).
- * When the messenger reports that it has stopped, the message and messenger
- * are freed and the globals reset to NULL.  Calls made after that point are
- * no-ops, so late callbacks cannot touch freed objects.
- */
-void process(void) {
-  pn_status_t status;
-
-  if (!messenger) {
-    return; /* Already torn down by an earlier call. */
-  }
-
-  status = pn_messenger_status(messenger, tracker);
-  if (status != PN_STATUS_PENDING) {
-    if (running) {
-      pn_messenger_stop(messenger);
-      running = 0;
-    }
-  }
-
-  if (pn_messenger_stopped(messenger)) {
-    pn_message_free(message);
-    pn_messenger_free(messenger);
-    message = NULL;
-    messenger = NULL;
-  }
-}
-
-#if EMSCRIPTEN // For emscripten C/C++ to JavaScript compiler.
-/* Socket open/connection/message callback: run all outstanding messenger
-   work.  process() may free the messenger, so it is re-checked every turn. */
-void pump(int fd, void* userData) {
-  while (messenger && pn_messenger_work(messenger, 0) >= 0) {
-    process();
-  }
-}
-
-/* Socket close callback: give process() a chance to finish the shutdown. */
-void onclose(int fd, void* userData) {
-  process();
-}
-
-/* Socket error callback: log the failure.  The error-code parameter is not
-   called `errno` because that name is a macro once <errno.h> is included. */
-void onerror(int fd, int error, const char* msg, void* userData) {
-  printf("error callback fd = %d, errno = %d, msg = %s\n", fd, error, msg);
-}
-#endif
-
-/*
- * Send one text message without blocking in the messenger.
- *
- * Usage: send [-a addr] [-h] [message]
- * addr defaults to "amqp://0.0.0.0", message to "Hello World!".
- *
- * Returns 1 on a bad command line or when the message/messenger objects
- * cannot be created, 0 otherwise.
- */
-int main(int argc, char** argv)
-{
-  int c;
-  const char *address = "amqp://0.0.0.0";
-  const char *msgtext = "Hello World!";
-  pn_data_t* body;
-
-  opterr = 0; /* We print our own diagnostics in the '?' case below. */
-
-  /* Only -h and -a are implemented.  Listing other letters here would make
-     getopt() accept them and drop into the abort() below. */
-  while ((c = getopt(argc, argv, "ha:")) != -1)
-  {
-    switch (c)
-    {
-    case 'a': address = optarg; break;
-    case 'h': usage(); break;
-
-    case '?':
-      if (optopt == 'a')
-      {
-        fprintf(stderr, "Option -%c requires an argument.\n", optopt);
-      }
-      else if (isprint(optopt))
-      {
-        fprintf(stderr, "Unknown option `-%c'.\n", optopt);
-      }
-      else
-      {
-        fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
-      }
-      return 1;
-    default:
-      abort();
-    }
-  }
-
-  if (optind < argc) msgtext = argv[optind];
-
-  message = pn_message();
-  messenger = pn_messenger(NULL);
-  if (!message || !messenger)
-  {
-    fprintf(stderr, "send: out of memory\n");
-    return 1;
-  }
-  pn_messenger_set_blocking(messenger, false); // Needs to be set non-blocking to behave asynchronously.
-  pn_messenger_set_outgoing_window(messenger, 1024);
-
-  pn_messenger_start(messenger);
-
-  pn_message_set_address(message, address);
-  body = pn_message_body(message);
-  pn_data_put_string(body, pn_bytes(strlen(msgtext), msgtext));
-
-  pn_messenger_put(messenger, message);
-  check(messenger);
-
-  tracker = pn_messenger_outgoing_tracker(messenger);
-
-#if EMSCRIPTEN // For emscripten C/C++ to JavaScript compiler.
-  emscripten_set_socket_error_callback(NULL, onerror);
-
-  emscripten_set_socket_open_callback(NULL, pump);
-  emscripten_set_socket_connection_callback(NULL, pump);
-  emscripten_set_socket_message_callback(NULL, pump);
-  emscripten_set_socket_close_callback(NULL, onclose);
-#else // For native compiler.
-  while (running) {
-    pn_messenger_work(messenger, -1); // Block indefinitely until there has been socket activity.
-    process();
-  }
-
-  /* Drain the shutdown: process() frees the messenger and sets it to NULL. */
-  while (messenger && !pn_messenger_stopped(messenger)) {
-    pn_messenger_work(messenger, 0);
-    process();
-  }
-#endif
-
-  return 0;
-}
-
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/messenger/send.c
----------------------------------------------------------------------
diff --git a/examples/c/messenger/send.c b/examples/c/messenger/send.c
deleted file mode 100644
index 11b47ff..0000000
--- a/examples/c/messenger/send.c
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "proton/message.h"
-#include "proton/messenger.h"
-
-#include "pncompat/misc_funcs.inc"
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <ctype.h>
-
-#define check(messenger) \
- { \
- if(pn_messenger_errno(messenger)) \
- { \
- die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
- } \
- } \
-
-/* Report "<file>:<line>: <message>" on stderr, then exit with status 1. */
-void die(const char *file, int line, const char *message) {
-  fprintf(stderr, "%s:%i: %s\n", file, line, message);
-  exit(1);
-}
-
-/* Write the command-line synopsis to stdout and exit with status 0. */
-void usage(void) {
-  static const char help[] =
-    "Usage: send [-a addr] [message]\n"
-    "-a \tThe target address [amqp[s]://domain[/name]]\n"
-    "message\tA text string to send.\n";
-
-  fputs(help, stdout);
-  exit(0);
-}
-
-/*
- * Send one text message and wait until the messenger has sent it.
- *
- * Usage: send [-a addr] [-h] [message]
- * addr defaults to "amqp://0.0.0.0", message to "Hello World!".
- *
- * Returns 1 on a bad command line or when the message/messenger objects
- * cannot be created; messenger errors terminate the process via check().
- */
-int main(int argc, char** argv)
-{
-  int c;
-  const char *address = "amqp://0.0.0.0";
-  const char *msgtext = "Hello World!";
-  opterr = 0; /* We print our own diagnostics in the '?' case below. */
-
-  /* Only -h and -a are implemented.  Listing other letters here would make
-     getopt() accept them and drop into the abort() below. */
-  while ((c = getopt(argc, argv, "ha:")) != -1)
-  {
-    switch (c)
-    {
-    case 'a': address = optarg; break;
-    case 'h': usage(); break;
-
-    case '?':
-      if (optopt == 'a')
-      {
-        fprintf(stderr, "Option -%c requires an argument.\n", optopt);
-      }
-      else if (isprint(optopt))
-      {
-        fprintf(stderr, "Unknown option `-%c'.\n", optopt);
-      }
-      else
-      {
-        fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
-      }
-      return 1;
-    default:
-      abort();
-    }
-  }
-
-  if (optind < argc) msgtext = argv[optind];
-
-  {
-    pn_message_t *message = pn_message();
-    pn_messenger_t *messenger = pn_messenger(NULL);
-    pn_data_t *body;
-
-    if (!message || !messenger)
-    {
-      fprintf(stderr, "send: out of memory\n");
-      return 1;
-    }
-
-    pn_messenger_start(messenger);
-
-    pn_message_set_address(message, address);
-    body = pn_message_body(message);
-    pn_data_put_string(body, pn_bytes(strlen(msgtext), msgtext));
-    pn_messenger_put(messenger, message);
-    check(messenger);
-    pn_messenger_send(messenger, -1);
-    check(messenger);
-
-    pn_messenger_stop(messenger);
-    pn_messenger_free(messenger);
-    pn_message_free(message);
-  }
-
-  return 0;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/proactor/CMakeLists.txt b/examples/c/proactor/CMakeLists.txt
deleted file mode 100644
index 6ea8aaf..0000000
--- a/examples/c/proactor/CMakeLists.txt
+++ /dev/null
@@ -1,44 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-find_package(Proton REQUIRED Core Proactor) # Both components are linked into every example below
-set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
-find_package(Threads REQUIRED)
-
-include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include)
-include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${Proton_INCLUDE_DIRS})
-
-add_definitions(${COMPILE_LANGUAGE_FLAGS} ${COMPILE_WARNING_FLAGS} ${WERROR} ${COMPILE_PLATFORM_FLAGS} ${LINK_TIME_OPTIMIZATION})
-
-# Build the search path that the example test is run with (see run_env below).
-if(WIN32)
- set(test_path "$<TARGET_FILE_DIR:broker>;$<TARGET_FILE_DIR:qpid-proton>") # NOTE(review): targets created below are named proactor-<name>; confirm a target called "broker" exists
-else(WIN32)
- set(test_path "${CMAKE_CURRENT_BINARY_DIR}:$ENV{PATH}")
-endif(WIN32)
-
-foreach(name broker send receive direct) # One executable per example source file
- add_executable(proactor-${name} ${name}.c)
- target_link_libraries(proactor-${name} ${Proton_Proactor_LIBRARIES} ${Proton_Core_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
- set_target_properties(proactor-${name} PROPERTIES OUTPUT_NAME ${name}) # Target is proactor-<name>, binary on disk is <name>
-endforeach()
-
-set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV})
-
-add_test(c-example-proactor ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/README.dox
----------------------------------------------------------------------
diff --git a/examples/c/proactor/README.dox b/examples/c/proactor/README.dox
deleted file mode 100644
index a548d35..0000000
--- a/examples/c/proactor/README.dox
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * @example send.c
- *
- * Send a fixed number of messages to the "examples" node.
- * Can be used with @ref broker.c, @ref direct.c or an external AMQP broker.
- *
- * @example receive.c
- *
- * Subscribes to the "examples" node and prints the message bodies received.
- * Can be used with @ref broker.c, @ref direct.c or an external AMQP broker.
- *
- * @example direct.c
- *
- * A server that can be used to demonstrate direct (no broker) peer-to-peer communication.
- * It can accept an incoming connection from either the @ref send.c or @ref receive.c examples
- * and will act as the directly-connected counterpart (receive or send).
- *
- * @example broker.c
- *
- * A simple multithreaded broker that works with the @ref send.c and @ref receive.c examples.
- */
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
deleted file mode 100644
index e0d9672..0000000
--- a/examples/c/proactor/broker.c
+++ /dev/null
@@ -1,424 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "thread.h"
-
-#include <proton/engine.h>
-#include <proton/listener.h>
-#include <proton/proactor.h>
-#include <proton/sasl.h>
-#include <proton/transport.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-/* Simple re-sizable vector that acts as a queue.
- *
- * VEC(T) declares the storage; VEC_INIT/VEC_FINAL create and release it;
- * VEC_PUSH appends at the back (doubling the capacity when full) and VEC_POP
- * drops the front element.  Allocation failure aborts the process instead of
- * storing a null data pointer.  The freshly allocated block is copied into
- * the typed `data` member with memcpy so that no pointer cast is needed.
- */
-#define VEC(T) struct { T* data; size_t len, cap; }
-
-#define VEC_INIT(V) \
-  do { \
-    void *vec_mem_ = malloc(16 * sizeof(*(V).data)); \
-    if (!vec_mem_) abort(); \
-    memcpy(&(V).data, &vec_mem_, sizeof(vec_mem_)); \
-    (V).len = 0; \
-    (V).cap = 16; \
-  } while(0)
-
-#define VEC_FINAL(V) free((V).data)
-
-#define VEC_PUSH(V, X) \
-  do { \
-    if ((V).len == (V).cap) { \
-      void *vec_mem_ = realloc((V).data, 2 * (V).cap * sizeof(*(V).data)); \
-      if (!vec_mem_) abort(); \
-      memcpy(&(V).data, &vec_mem_, sizeof(vec_mem_)); \
-      (V).cap *= 2; \
-    } \
-    (V).data[(V).len++] = (X); \
-  } while(0)
-
-#define VEC_POP(V) \
-  do { \
-    if ((V).len > 0) \
-      memmove((V).data, (V).data+1, (--(V).len)*sizeof(*(V).data)); \
-  } while(0)
-
-/* Simple thread-safe queue implementation; queues form a singly linked list through `next` */
-typedef struct queue_t {
- pthread_mutex_t lock; /* Held while messages, waiting or sent are accessed */
- char name[256]; /* Queue name, compared with strcmp() in queues_get() */
- VEC(pn_rwbytes_t) messages; /* Messages on the queue_t */
- VEC(pn_connection_t*) waiting; /* Connections waiting to send messages from this queue */
- struct queue_t *next; /* Next queue in chain */
- size_t sent; /* Count of messages sent, used as delivery tag */
-} queue_t;
-
-/* Initialise q as an empty queue called `name` and chain it in front of
-   `next`.  Names longer than the name buffer are truncated. */
-static void queue_init(queue_t *q, const char* name, queue_t *next) {
-  pthread_mutex_init(&q->lock, NULL);
-  /* strncpy() does not terminate the destination when the source fills it,
-     so copy one byte less and terminate explicitly. */
-  strncpy(q->name, name, sizeof(q->name) - 1);
-  q->name[sizeof(q->name) - 1] = '\0';
-  VEC_INIT(q->messages);
-  VEC_INIT(q->waiting);
-  q->next = next;
-  q->sent = 0;
-}
-
-/* Release everything owned by q.  The queue_t itself is not freed here;
-   its name is an embedded array and therefore must not be passed to free(). */
-static void queue_destroy(queue_t *q) {
-  pthread_mutex_destroy(&q->lock);
-  for (size_t i = 0; i < q->messages.len; ++i)
-    free(q->messages.data[i].start);
-  VEC_FINAL(q->messages);
-  for (size_t i = 0; i < q->waiting.len; ++i)
-    pn_decref(q->waiting.data[i]);
-  VEC_FINAL(q->waiting);
-}
-
-/* Send a message on s, or record the connection of s as waiting if there are no messages.
- Called in s dispatch loop, assumes s has credit.
-*/
-static void queue_send(queue_t *q, pn_link_t *s) {
- pn_rwbytes_t m = { 0 };
- size_t tag = 0;
- pthread_mutex_lock(&q->lock);
- if (q->messages.len == 0) { /* Empty, record connection as waiting */
- /* Record connection for wake-up if not already on the list. */
- pn_connection_t *c = pn_session_connection(pn_link_session(s));
- size_t i = 0;
- for (; i < q->waiting.len && q->waiting.data[i] != c; ++i)
- ;
- if (i == q->waiting.len) {
- VEC_PUSH(q->waiting, c);
- }
- } else {
- m = q->messages.data[0];
- VEC_POP(q->messages);
- tag = ++q->sent;
- }
- pthread_mutex_unlock(&q->lock); /* The link calls below run without the queue lock */
- if (m.start) { /* A message was taken off the queue above */
- pn_delivery_t *d = pn_delivery(s, pn_dtag((char*)&tag, sizeof(tag)));
- pn_link_send(s, m.start, m.size);
- pn_link_advance(s);
- pn_delivery_settle(d); /* Pre-settled: unreliable, there will be no ack */
- free(m.start); /* The buffer was removed from the queue, so release it here */
- }
-}
-
-/* Data associated with each broker connection. NOTE(review): not referenced by the code visible in this chunk, which keeps the flag in the connection context instead - confirm it is still needed */
-typedef struct broker_data_t {
- bool check_queues; /* Check senders on the connection for available data in queues. */
-} broker_data_t;
-
-/* The connection's context pointer doubles as a boolean: store `check`
-   there to record whether the connection's queues need to be checked. */
-void pn_connection_set_check_queues(pn_connection_t *c, bool check) {
-  void *flag = (void*)check;
-  pn_connection_set_context(c, flag);
-}
-
-/* Read back the flag kept in the connection's context pointer. */
-bool pn_connection_get_check_queues(pn_connection_t *c) {
-  void *flag = pn_connection_get_context(c);
-  return (bool)flag;
-}
-
-/* Append message m to queue q; called in the receiver's dispatch loop.
-   When the queue goes from empty to non-empty, every connection on the
-   waiting list is flagged, woken and then removed from the list.
-   The proactor argument d is not used.
-*/
-static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) {
-  pthread_mutex_lock(&q->lock);
-  bool was_empty = (q->messages.len == 0);
-  VEC_PUSH(q->messages, m);
-  if (was_empty) {
-    size_t n = q->waiting.len;
-    size_t k = 0;
-    while (k < n) {
-      pn_connection_t *conn = q->waiting.data[k++];
-      pn_connection_set_check_queues(conn, true);
-      pn_connection_wake(conn); /* Wake the connection */
-    }
-    q->waiting.len = 0;
-  }
-  pthread_mutex_unlock(&q->lock);
-}
-
-/* Thread safe set of queues */
-typedef struct queues_t {
- pthread_mutex_t lock; /* Held by queues_get() while the list is searched or extended */
- queue_t *queues; /* Head of the list; new queues are pushed at the front */
- size_t sent; /* NOTE(review): not read or written by the code visible in this chunk */
-} queues_t;
-
-/* Initialise qs as an empty set of queues.  Every member is set, so the
-   function also works on storage that was not zeroed beforehand. */
-void queues_init(queues_t *qs) {
-  pthread_mutex_init(&qs->lock, NULL);
-  qs->queues = NULL;
-  qs->sent = 0;
-}
-
-/* Destroy and free every queue in qs, then destroy the set's lock. */
-void queues_destroy(queues_t *qs) {
-  queue_t *q = qs->queues;
-  while (q) {
-    queue_t *next = q->next; /* Read the link before q is freed. */
-    queue_destroy(q);
-    free(q);
-    q = next;
-  }
-  qs->queues = NULL;
-  pthread_mutex_destroy(&qs->lock);
-}
-
-/** Get or create the named queue.
- *
- * The list is searched and, if necessary, extended while qs->lock is held.
- * A new queue is inserted at the head of the list.  Running out of memory
- * aborts the process rather than initialising a null pointer.
- */
-queue_t* queues_get(queues_t *qs, const char* name) {
-  pthread_mutex_lock(&qs->lock);
-  queue_t *q;
-  for (q = qs->queues; q && strcmp(q->name, name) != 0; q = q->next)
-    ;
-  if (!q) {
-    q = (queue_t*)malloc(sizeof(queue_t));
-    if (!q) {
-      perror("malloc");
-      abort();
-    }
-    queue_init(q, name, qs->queues);
-    qs->queues = q;
-  }
-  pthread_mutex_unlock(&qs->lock);
-  return q;
-}
-
-/* The broker implementation */
-typedef struct broker_t {
- pn_proactor_t *proactor; /* Event source that every broker thread waits on */
- size_t threads; /* Total number of threads running broker_thread(), main thread included */
- const char *container_id; /* AMQP container-id */
- queues_t queues; /* All queues known to the broker */
- bool finished; /* Set on PN_PROACTOR_INTERRUPT; ends the broker_thread() loops */
-} broker_t;
-
-/* Ask the broker to shut down by interrupting its proactor; the interrupt
-   event is what makes the worker threads stop. */
-void broker_stop(broker_t *b) {
-  pn_proactor_t *p = b->proactor;
-  pn_proactor_interrupt(p);
-}
-
-/* Try to send if link is a sender and has credit.
-   A link whose source has no address is ignored, as in link_unsub():
-   queues_get() compares names with strcmp() and cannot take NULL. */
-static void link_send(broker_t *b, pn_link_t *s) {
-  if (pn_link_is_sender(s) && pn_link_credit(s) > 0) {
-    const char *qname = pn_terminus_get_address(pn_link_source(s));
-    if (qname) {
-      queue_t *q = queues_get(&b->queues, qname);
-      queue_send(q, s);
-    }
-  }
-}
-
-static void queue_unsub(queue_t *q, pn_connection_t *c) { /* Remove c from q's waiting list, if it is on it */
- pthread_mutex_lock(&q->lock);
- for (size_t i = 0; i < q->waiting.len; ++i) {
- if (q->waiting.data[i] == c){
- q->waiting.data[i] = q->waiting.data[0]; /* save old [0] in the vacated slot, VEC_POP then drops slot 0 */
- VEC_POP(q->waiting);
- break;
- }
- }
- pthread_mutex_unlock(&q->lock);
-}
-
-/* Take the link's connection off the waiting list of the queue that the
-   link reads from.  Receiver links and links without a source address are
-   left alone. */
-static void link_unsub(broker_t *b, pn_link_t *s) {
-  if (!pn_link_is_sender(s))
-    return;
-
-  const char *qname = pn_terminus_get_address(pn_link_source(s));
-  if (!qname)
-    return;
-
-  queue_t *q = queues_get(&b->queues, qname);
-  pn_connection_t *conn = pn_session_connection(pn_link_session(s));
-  queue_unsub(q, conn);
-}
-
-/* Unsubscribe every link of connection c from its queue. */
-static void connection_unsub(broker_t *b, pn_connection_t *c) {
-  pn_link_t *link = pn_link_head(c, 0);
-  while (link != NULL) {
-    link_unsub(b, link);
-    link = pn_link_next(link, 0);
-  }
-}
-
-/* Unsubscribe the links that belong to session ssn.  All links of the
-   session's connection are visited; those of other sessions are skipped. */
-static void session_unsub(broker_t *b, pn_session_t *ssn) {
-  pn_connection_t *conn = pn_session_connection(ssn);
-  pn_link_t *link = pn_link_head(conn, 0);
-  while (link != NULL) {
-    if (pn_link_session(link) == ssn) {
-      link_unsub(b, link);
-    }
-    link = pn_link_next(link, 0);
-  }
-}
-
-static int exit_code = 0;
-
-static void check_condition(pn_event_t *e, pn_condition_t *cond) {
- if (pn_condition_is_set(cond)) {
- fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
- pn_condition_get_name(cond), pn_condition_get_description(cond));
- exit_code = 1; /* Remeber there was an unexpected error */
- }
-}
-
-const int WINDOW=10; /* Incoming credit window */
-
-/*
- * Handle one proactor event on behalf of broker b.
- *
- * Incoming connections, sessions and links are accepted by mirroring the
- * remote open.  Receiver links are given WINDOW credit and every complete
- * delivery is copied into the queue named by the link target.  Sender links
- * are fed from the queue named by the link source whenever they have credit
- * or their connection is woken.  Remote closes are mirrored and the affected
- * links are removed from the queue waiting lists.  A closed listener or an
- * inactive proactor stops the broker.
- */
-static void handle(broker_t* b, pn_event_t* e) {
-  pn_connection_t *c = pn_event_connection(e);
-
-  switch (pn_event_type(e)) {
-
-   case PN_LISTENER_OPEN:
-    printf("listening\n");
-    fflush(stdout);
-    break;
-
-   case PN_LISTENER_ACCEPT:
-    pn_listener_accept(pn_event_listener(e), pn_connection());
-    break;
-
-   case PN_CONNECTION_INIT:
-    pn_connection_set_container(c, b->container_id);
-    break;
-
-   case PN_CONNECTION_BOUND: {
-     /* Turn off security */
-     pn_transport_t *t = pn_connection_transport(c);
-     pn_transport_require_auth(t, false);
-     pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
-     break;
-   }
-   case PN_CONNECTION_REMOTE_OPEN: {
-     pn_connection_open(pn_event_connection(e)); /* Complete the open */
-     break;
-   }
-   case PN_CONNECTION_WAKE: {
-     if (pn_connection_get_check_queues(c)) {
-       pn_connection_set_check_queues(c, false);
-       /* Combine the state bits with |.  The two constants are distinct
-          bits, so & would yield 0 and switch the state filter off. */
-       int flags = PN_LOCAL_ACTIVE|PN_REMOTE_ACTIVE;
-       for (pn_link_t *l = pn_link_head(c, flags); l != NULL; l = pn_link_next(l, flags))
-         link_send(b, l);
-     }
-     break;
-   }
-   case PN_SESSION_REMOTE_OPEN: {
-     pn_session_open(pn_event_session(e));
-     break;
-   }
-   case PN_LINK_REMOTE_OPEN: {
-     pn_link_t *l = pn_event_link(e);
-     if (pn_link_is_sender(l)) {
-       const char *source = pn_terminus_get_address(pn_link_remote_source(l));
-       pn_terminus_set_address(pn_link_source(l), source);
-     } else {
-       const char* target = pn_terminus_get_address(pn_link_remote_target(l));
-       pn_terminus_set_address(pn_link_target(l), target);
-       pn_link_flow(l, WINDOW);
-     }
-     pn_link_open(l);
-     break;
-   }
-   case PN_LINK_FLOW: {
-     link_send(b, pn_event_link(e));
-     break;
-   }
-   case PN_DELIVERY: {
-     pn_delivery_t *d = pn_event_delivery(e);
-     pn_link_t *r = pn_delivery_link(d);
-     if (pn_link_is_receiver(r) &&
-         pn_delivery_readable(d) && !pn_delivery_partial(d))
-     {
-       size_t size = pn_delivery_pending(d);
-       /* The broker does not decode the message, just forwards it. */
-       pn_rwbytes_t m = { size, (char*)malloc(size) };
-       pn_link_recv(r, m.start, m.size);
-       const char *qname = pn_terminus_get_address(pn_link_target(r));
-       queue_receive(b->proactor, queues_get(&b->queues, qname), m);
-       pn_delivery_update(d, PN_ACCEPTED);
-       pn_delivery_settle(d);
-       /* Top the credit back up to WINDOW. */
-       pn_link_flow(r, WINDOW - pn_link_credit(r));
-     }
-     break;
-   }
-
-   case PN_TRANSPORT_CLOSED:
-    check_condition(e, pn_transport_condition(pn_event_transport(e)));
-    connection_unsub(b, pn_event_connection(e));
-    break;
-
-   case PN_CONNECTION_REMOTE_CLOSE:
-    check_condition(e, pn_connection_remote_condition(pn_event_connection(e)));
-    pn_connection_close(pn_event_connection(e));
-    break;
-
-   case PN_SESSION_REMOTE_CLOSE:
-    check_condition(e, pn_session_remote_condition(pn_event_session(e)));
-    session_unsub(b, pn_event_session(e));
-    pn_session_close(pn_event_session(e));
-    pn_session_free(pn_event_session(e));
-    break;
-
-   case PN_LINK_REMOTE_CLOSE:
-    check_condition(e, pn_link_remote_condition(pn_event_link(e)));
-    link_unsub(b, pn_event_link(e));
-    pn_link_close(pn_event_link(e));
-    pn_link_free(pn_event_link(e));
-    break;
-
-   case PN_LISTENER_CLOSE:
-    check_condition(e, pn_listener_condition(pn_event_listener(e)));
-    broker_stop(b);
-    break;
-
-   case PN_PROACTOR_INACTIVE: /* listener and all connections closed */
-    broker_stop(b);
-    break;
-
-   case PN_PROACTOR_INTERRUPT:
-    b->finished = true;
-    pn_proactor_interrupt(b->proactor); /* Pass along the interrupt to the other threads */
-    break;
-
-   default:
-    break;
-  }
-}
-
-/* Thread body: wait for a batch of events, pass each event to handle(),
-   hand the batch back, and repeat until the broker is marked finished.
-   void_broker is the broker_t; the return value is always NULL. */
-static void* broker_thread(void *void_broker) {
-  broker_t *b = (broker_t*)void_broker;
-  for (;;) {
-    pn_event_batch_t *batch = pn_proactor_wait(b->proactor);
-    pn_event_t *ev = pn_event_batch_next(batch);
-    while (ev) {
-      handle(b, ev);
-      ev = pn_event_batch_next(batch);
-    }
-    pn_proactor_done(b->proactor, batch);
-    if (b->finished)
-      break;
-  }
-  return NULL;
-}
-
-/*
- * Run the broker: broker [host [port]]
- *
- * host defaults to "" and port to "amqp".  The proactor is served by
- * b.threads threads in total, the main thread being one of them.  Returns
- * exit_code (1 if any error condition was reported), or 1 if the thread
- * table cannot be allocated.
- */
-int main(int argc, char **argv) {
-  broker_t b = {0};
-  b.proactor = pn_proactor();
-  queues_init(&b.queues);
-  b.container_id = argv[0];
-  b.threads = 4;
-  int arg = 1;
-  const char *host = (argc > arg) ? argv[arg++] : "";
-  const char *port = (argc > arg) ? argv[arg++] : "amqp";
-
-  /* Listen on addr */
-  char addr[PN_MAX_ADDR];
-  pn_proactor_addr(addr, sizeof(addr), host, port);
-  pn_proactor_listen(b.proactor, pn_listener(), addr, 16);
-
-  /* Start n-1 threads; calloc takes the element count first, then the size */
-  pthread_t* threads = (pthread_t*)calloc(b.threads, sizeof(pthread_t));
-  if (!threads) {
-    perror("calloc");
-    pn_proactor_free(b.proactor);
-    return 1;
-  }
-  size_t started = 0;
-  while (started < b.threads-1) {
-    if (pthread_create(&threads[started], NULL, broker_thread, &b) != 0) {
-      /* Carry on with the threads that did start. */
-      fprintf(stderr, "pthread_create failed, running with fewer threads\n");
-      break;
-    }
-    ++started;
-  }
-  broker_thread(&b); /* Use the main thread too. */
-  /* Join the other threads; only those that were actually created */
-  for (size_t t = 0; t < started; ++t) {
-    pthread_join(threads[t], NULL);
-  }
-  pn_proactor_free(b.proactor);
-  free(threads);
-  return exit_code;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/direct.c b/examples/c/proactor/direct.c
deleted file mode 100644
index 15550e6..0000000
--- a/examples/c/proactor/direct.c
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <proton/condition.h>
-#include <proton/connection.h>
-#include <proton/delivery.h>
-#include <proton/link.h>
-#include <proton/listener.h>
-#include <proton/message.h>
-#include <proton/proactor.h>
-#include <proton/sasl.h>
-#include <proton/session.h>
-#include <proton/transport.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-
-typedef struct app_data_t {
- const char *host, *port;
- const char *amqp_address;
- const char *container_id;
- int message_count;
-
- pn_proactor_t *proactor;
- pn_listener_t *listener;
- pn_rwbytes_t message_buffer;
-
- /* Sender values */
- int sent;
- int acknowledged;
- pn_link_t *sender;
-
- /* Receiver values */
- int received;
-} app_data_t;
-
-static const int BATCH = 1000; /* Batch size for unlimited receive */
-
-static int exit_code = 0;
-
-static void check_condition(pn_event_t *e, pn_condition_t *cond) {
- if (pn_condition_is_set(cond)) {
- fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
- pn_condition_get_name(cond), pn_condition_get_description(cond));
- pn_connection_close(pn_event_connection(e));
- exit_code = 1;
- }
-}
-
-/* Create a message with a map { "sequence" : number }, encode it, and return the encoded buffer. */
-static pn_bytes_t encode_message(app_data_t* app) {
- /* Construct a message with the map { "sequence": app.sent } */
- pn_message_t* message = pn_message();
- pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
- pn_data_t* body = pn_message_body(message);
- pn_data_put_map(body);
- pn_data_enter(body);
- pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
- pn_data_put_int(body, app->sent); /* The sequence number */
- pn_data_exit(body);
-
- /* encode the message, expanding the encode buffer as needed */
- if (app->message_buffer.start == NULL) {
- static const size_t initial_size = 128;
- app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
- }
- /* app->message_buffer is the total buffer space available. */
- /* mbuf will point at just the portion used by the encoded message */
- pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
- int status = 0;
- while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
- app->message_buffer.size *= 2;
- app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size);
- mbuf.size = app->message_buffer.size;
- }
- if (status != 0) {
- fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
- exit(1);
- }
- pn_message_free(message);
- return pn_bytes(mbuf.size, mbuf.start);
-}
-
-#define MAX_SIZE 1024
-
-static void decode_message(pn_delivery_t *dlv) {
- static char buffer[MAX_SIZE];
- ssize_t len;
- // try to decode the message body
- if (pn_delivery_pending(dlv) < MAX_SIZE) {
- // read in the raw data
- len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
- if (len > 0) {
- // decode it into a proton message
- pn_message_t *m = pn_message();
- if (PN_OK == pn_message_decode(m, buffer, len)) {
- pn_string_t *s = pn_string(NULL);
- pn_inspect(pn_message_body(m), s);
- printf("%s\n", pn_string_get(s));
- pn_free(s);
- }
- pn_message_free(m);
- }
- }
-}
-
-/* This function handles events when we are acting as the receiver */
-static void handle_receive(app_data_t* app, pn_event_t* event) {
- switch (pn_event_type(event)) {
-
- case PN_LINK_REMOTE_OPEN: {
- pn_link_t *l = pn_event_link(event);
- pn_link_open(l);
- pn_link_flow(l, app->message_count ? app->message_count : BATCH);
- } break;
-
- case PN_DELIVERY: {
- /* A message has been received */
- pn_link_t *link = NULL;
- pn_delivery_t *dlv = pn_event_delivery(event);
- if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
- link = pn_delivery_link(dlv);
- decode_message(dlv);
- /* Accept the delivery */
- pn_delivery_update(dlv, PN_ACCEPTED);
- /* done with the delivery, move to the next and free it */
- pn_link_advance(link);
- pn_delivery_settle(dlv); /* dlv is now freed */
-
- if (app->message_count == 0) {
- /* receive forever - see if more credit is needed */
- if (pn_link_credit(link) < BATCH/2) {
- /* Grant enough credit to bring it up to BATCH: */
- pn_link_flow(link, BATCH - pn_link_credit(link));
- }
- } else if (++app->received >= app->message_count) {
- /* done receiving, close the endpoints */
- printf("%d messages received\n", app->received);
- pn_session_t *ssn = pn_link_session(link);
- pn_link_close(link);
- pn_session_close(ssn);
- pn_connection_close(pn_session_connection(ssn));
- }
- }
- } break;
-
- default:
- break;
- }
-}
-
-/* This function handles events when we are acting as the sender */
-static void handle_send(app_data_t* app, pn_event_t* event) {
- switch (pn_event_type(event)) {
-
- case PN_LINK_REMOTE_OPEN: {
- pn_link_t* l = pn_event_link(event);
- pn_terminus_set_address(pn_link_target(l), app->amqp_address);
- pn_link_open(l);
- } break;
-
- case PN_LINK_FLOW: {
- /* The peer has given us some credit, now we can send messages */
- pn_link_t *sender = pn_event_link(event);
- while (pn_link_credit(sender) > 0 && app->sent < app->message_count) {
- ++app->sent;
- // Use sent counter as unique delivery tag.
- pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
- pn_bytes_t msgbuf = encode_message(app);
- pn_link_send(sender, msgbuf.start, msgbuf.size);
- pn_link_advance(sender);
- }
- break;
- }
-
- case PN_DELIVERY: {
- /* We received acknowledgement from the peer that a message was delivered. */
- pn_delivery_t* d = pn_event_delivery(event);
- if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
- if (++app->acknowledged == app->message_count) {
- printf("%d messages sent and acknowledged\n", app->acknowledged);
- pn_connection_close(pn_event_connection(event));
- /* Continue handling events till we receive TRANSPORT_CLOSED */
- }
- }
- } break;
-
- default:
- break;
- }
-}
-
-/* Handle all events, delegate to handle_send or handle_receive depending on link mode.
- Return true to continue, false to exit
-*/
-static bool handle(app_data_t* app, pn_event_t* event) {
- switch (pn_event_type(event)) {
-
- case PN_LISTENER_OPEN:
- printf("listening\n");
- fflush(stdout);
- break;
-
- case PN_LISTENER_ACCEPT:
- pn_listener_accept(pn_event_listener(event), pn_connection());
- break;
-
- case PN_CONNECTION_INIT:
- pn_connection_set_container(pn_event_connection(event), app->container_id);
- break;
-
- case PN_CONNECTION_BOUND: {
- /* Turn off security */
- pn_transport_t *t = pn_event_transport(event);
- pn_transport_require_auth(t, false);
- pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
- }
- case PN_CONNECTION_REMOTE_OPEN: {
- pn_connection_open(pn_event_connection(event)); /* Complete the open */
- break;
- }
-
- case PN_SESSION_REMOTE_OPEN: {
- pn_session_open(pn_event_session(event));
- break;
- }
-
- case PN_TRANSPORT_CLOSED:
- check_condition(event, pn_transport_condition(pn_event_transport(event)));
- pn_listener_close(app->listener); /* Finished */
- break;
-
- case PN_CONNECTION_REMOTE_CLOSE:
- check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
- pn_connection_close(pn_event_connection(event));
- break;
-
- case PN_SESSION_REMOTE_CLOSE:
- check_condition(event, pn_session_remote_condition(pn_event_session(event)));
- pn_connection_close(pn_event_connection(event));
- break;
-
- case PN_LINK_REMOTE_CLOSE:
- case PN_LINK_REMOTE_DETACH:
- check_condition(event, pn_link_remote_condition(pn_event_link(event)));
- pn_connection_close(pn_event_connection(event));
- break;
-
- case PN_PROACTOR_TIMEOUT:
- /* Wake the sender's connection */
- pn_connection_wake(pn_session_connection(pn_link_session(app->sender)));
- break;
-
- case PN_LISTENER_CLOSE:
- check_condition(event, pn_listener_condition(pn_event_listener(event)));
- break;
-
- case PN_PROACTOR_INACTIVE:
- return false;
- break;
-
- default: {
- pn_link_t *l = pn_event_link(event);
- if (l) { /* Only delegate link-related events */
- if (pn_link_is_sender(l)) {
- handle_send(app, event);
- } else {
- handle_receive(app, event);
- }
- }
- }
- }
- return true;
-}
-
-void run(app_data_t *app) {
- /* Loop and handle events */
- do {
- pn_event_batch_t *events = pn_proactor_wait(app->proactor);
- for (pn_event_t *e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
- if (!handle(app, e)) {
- return;
- }
- }
- pn_proactor_done(app->proactor, events);
- } while(true);
-}
-
-int main(int argc, char **argv) {
- struct app_data_t app = {0};
- int i = 0;
- app.container_id = argv[i++]; /* Should be unique */
- app.host = (argc > 1) ? argv[i++] : "";
- app.port = (argc > 1) ? argv[i++] : "amqp";
- app.amqp_address = (argc > i) ? argv[i++] : "examples";
- app.message_count = (argc > i) ? atoi(argv[i++]) : 10;
-
- /* Create the proactor and connect */
- app.proactor = pn_proactor();
- app.listener = pn_listener();
- char addr[PN_MAX_ADDR];
- pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
- pn_proactor_listen(app.proactor, app.listener, addr, 16);
- run(&app);
- pn_proactor_free(app.proactor);
- free(app.message_buffer.start);
- return exit_code;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/example_test.py
----------------------------------------------------------------------
diff --git a/examples/c/proactor/example_test.py b/examples/c/proactor/example_test.py
deleted file mode 100644
index 02bb1fd..0000000
--- a/examples/c/proactor/example_test.py
+++ /dev/null
@@ -1,88 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License
-#
-
-# This is a test script to run the examples and verify that they behave as expected.
-
-import unittest, sys, time
-from proctest import *
-
-def python_cmd(name):
- dir = os.path.dirname(__file__)
- return [sys.executable, os.path.join(dir, "..", "..", "python", name)]
-
-def receive_expect(n):
- return ''.join('{"sequence"=%s}\n'%i for i in xrange(1, n+1)) + "%s messages received\n"%n
-
-class Broker(object):
- def __init__(self, test):
- self.test = test
-
- def __enter__(self):
- with TestPort() as tp:
- self.port = tp.port
- self.host = tp.host
- self.addr = tp.addr
- self.proc = self.test.proc(["broker", "", self.port])
- self.proc.wait_re("listening")
- return self
-
- def __exit__(self, *args):
- b = getattr(self, "proc")
- if b:
- if b.poll() != None: # Broker crashed
- raise ProcError(b, "broker crash")
- b.kill()
-
-class CExampleTest(ProcTestCase):
-
- def test_send_receive(self):
- """Send first then receive"""
- with Broker(self) as b:
- s = self.proc(["send", "", b.port])
- self.assertEqual("10 messages sent and acknowledged\n", s.wait_exit())
- r = self.proc(["receive", "", b.port])
- self.assertEqual(receive_expect(10), r.wait_exit())
-
- def test_receive_send(self):
- """Start receiving first, then send."""
- with Broker(self) as b:
- r = self.proc(["receive", "", b.port]);
- s = self.proc(["send", "", b.port]);
- self.assertEqual("10 messages sent and acknowledged\n", s.wait_exit())
- self.assertEqual(receive_expect(10), r.wait_exit())
-
- def test_send_direct(self):
- """Send to direct server"""
- with TestPort() as tp:
- d = self.proc(["direct", "", tp.port])
- d.wait_re("listening")
- self.assertEqual("10 messages sent and acknowledged\n", self.proc(["send", "", tp.port]).wait_exit())
- self.assertIn(receive_expect(10), d.wait_exit())
-
- def test_receive_direct(self):
- """Receive from direct server"""
- with TestPort() as tp:
- d = self.proc(["direct", "", tp.port])
- d.wait_re("listening")
- self.assertEqual(receive_expect(10), self.proc(["receive", "", tp.port]).wait_exit())
- self.assertIn("10 messages sent and acknowledged\n", d.wait_exit())
-
-
-if __name__ == "__main__":
- unittest.main()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org