You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by jr...@apache.org on 2017/09/11 23:18:16 UTC

[1/2] qpid-proton git commit: PROTON-1585: Remove old reactor and messenger examples; promote the proactor examples to the top level

Repository: qpid-proton
Updated Branches:
  refs/heads/master 6888ab5e5 -> 564e0ca4c


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c
deleted file mode 100644
index 6fd74a5..0000000
--- a/examples/c/proactor/receive.c
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <proton/connection.h>
-#include <proton/condition.h>
-#include <proton/delivery.h>
-#include <proton/link.h>
-#include <proton/message.h>
-#include <proton/proactor.h>
-#include <proton/session.h>
-#include <proton/transport.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-
-/* Command line configuration plus run-time state of the receiving client. */
-typedef struct app_data_t {
-  /* Configuration, filled in by main() */
-  const char *host;          /* host part of the address to connect to */
-  const char *port;          /* port part of the address to connect to */
-  const char *amqp_address;  /* address set on the receiving link's source */
-  const char *container_id;  /* container id set on the connection */
-  int message_count;         /* messages to receive; 0 means receive forever */
-
-  /* Run-time state */
-  pn_proactor_t *proactor;   /* proactor that supplies the event batches */
-  int received;              /* messages received so far */
-  bool finished;             /* not read or written by this example's code */
-} app_data_t;
-
-static const int BATCH = 1000; /* Batch size for unlimited receive */
-
-static int exit_code = 0;
-
-/*
- * Report an error condition, if one is set, and start shutting down.
- *
- * When cond is set: print the event type name, the condition name and the
- * condition description to stderr, close the connection the event belongs
- * to and set exit_code to 1.  When cond is not set this does nothing.
- */
-static void check_condition(pn_event_t *e, pn_condition_t *cond) {
-  if (pn_condition_is_set(cond)) {
-    /* The name and description are optional; passing a null pointer to a
-     * "%s" conversion is undefined behavior, so substitute placeholders. */
-    const char *name = pn_condition_get_name(cond);
-    const char *desc = pn_condition_get_description(cond);
-    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
-            name ? name : "<error name not provided>",
-            desc ? desc : "<no description provided>");
-    pn_connection_close(pn_event_connection(e));
-    exit_code = 1;
-  }
-}
-
-#define MAX_SIZE 1024
-
-/* Print the body of a completely received delivery on stdout.
- * Nothing is printed when the pending data is MAX_SIZE bytes or more, when
- * no bytes could be read, or when the bytes do not decode as a message. */
-static void decode_message(pn_delivery_t *dlv) {
-  static char raw[MAX_SIZE];  /* receives the encoded message bytes */
-  ssize_t nread;
-  pn_message_t *msg;
-
-  if (pn_delivery_pending(dlv) >= MAX_SIZE) {
-    return;  /* does not fit in the buffer */
-  }
-  /* read in the raw data */
-  nread = pn_link_recv(pn_delivery_link(dlv), raw, MAX_SIZE);
-  if (nread <= 0) {
-    return;  /* nothing to decode */
-  }
-  /* decode it into a proton message and print a rendering of the body */
-  msg = pn_message();
-  if (pn_message_decode(msg, raw, nread) == PN_OK) {
-    pn_string_t *text = pn_string(NULL);
-    pn_inspect(pn_message_body(msg), text);
-    printf("%s\n", pn_string_get(text));
-    pn_free(text);
-  }
-  pn_message_free(msg);
-}
-
-/* Handle one proactor event for the receiving client.
- * app holds the configuration and the received-message counter.
- * Return true to continue, false to exit */
-static bool handle(app_data_t* app, pn_event_t* event) {
-  switch (pn_event_type(event)) {
-
-   case PN_CONNECTION_INIT: { /* open the connection, a session and the receiving link */
-     pn_connection_t* c = pn_event_connection(event);
-     pn_connection_set_container(c, app->container_id);
-     pn_connection_open(c);
-     pn_session_t* s = pn_session(c);
-     pn_session_open(s);
-     pn_link_t* l = pn_receiver(s, "my_receiver");
-     pn_terminus_set_address(pn_link_source(l), app->amqp_address);
-     pn_link_open(l);
-     /* cannot receive without granting credit: grant message_count credits,
-      * or BATCH credits when message_count is 0 (receive forever) */
-     pn_link_flow(l, app->message_count ? app->message_count : BATCH);
-   } break;
-
-   case PN_DELIVERY: {
-     /* A message has been received */
-     pn_link_t *link = NULL;
-     pn_delivery_t *dlv = pn_event_delivery(event);
-     if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) { /* only act on complete messages */
-       link = pn_delivery_link(dlv);
-       decode_message(dlv);
-       /* Accept the delivery */
-       pn_delivery_update(dlv, PN_ACCEPTED);
-       /* done with the delivery, move to the next and free it */
-       pn_link_advance(link);
-       pn_delivery_settle(dlv);  /* dlv is now freed */
-
-       if (app->message_count == 0) {
-         /* receive forever - see if more credit is needed */
-         if (pn_link_credit(link) < BATCH/2) {
-           /* Grant enough credit to bring it up to BATCH: */
-           pn_link_flow(link, BATCH - pn_link_credit(link));
-         }
-       } else if (++app->received >= app->message_count) { /* counts this message */
-         /* done receiving, close the endpoints */
-         printf("%d messages received\n", app->received);
-         pn_session_t *ssn = pn_link_session(link);
-         pn_link_close(link);
-         pn_session_close(ssn);
-         pn_connection_close(pn_session_connection(ssn));
-       }
-     }
-   } break;
-
-   case PN_TRANSPORT_CLOSED: /* report the transport's error condition, if any */
-    check_condition(event, pn_transport_condition(pn_event_transport(event)));
-    break;
-
-   case PN_CONNECTION_REMOTE_CLOSE: /* report the peer's condition, then close our end */
-    check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_SESSION_REMOTE_CLOSE: /* a remotely closed session ends the whole connection */
-    check_condition(event, pn_session_remote_condition(pn_event_session(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_LINK_REMOTE_CLOSE:
-   case PN_LINK_REMOTE_DETACH: /* a remotely closed or detached link ends the whole connection */
-    check_condition(event, pn_link_remote_condition(pn_event_link(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_PROACTOR_INACTIVE: /* tell the caller to stop the event loop */
-    return false;
-    break;
-
-   default: /* every other event type is ignored */
-    break;
-  }
-    return true;
-}
-
-/* Event loop: wait for batches of events from app->proactor and pass every
- * event to handle().  Returns as soon as handle() returns false. */
-void run(app_data_t *app) {
-  for (;;) {
-    pn_event_batch_t *batch = pn_proactor_wait(app->proactor);
-    pn_event_t *ev;
-    while ((ev = pn_event_batch_next(batch)) != NULL) {
-      if (!handle(app, ev)) {
-        /* NOTE(review): returns without pn_proactor_done() for this last
-         * batch, exactly as before - confirm that this is acceptable. */
-        return;
-      }
-    }
-    pn_proactor_done(app->proactor, batch);  /* whole batch handled */
-  }
-}
-
-/*
- * Connect to an AMQP peer, receive messages and print their bodies.
- *
- * argv[0] is used as the container id.  Optional positional arguments:
- *   argv[1]  host           (default "")
- *   argv[2]  port           (default "amqp")
- *   argv[3]  AMQP address   (default "examples")
- *   argv[4]  message count  (default 10; 0 means receive forever)
- *
- * Returns exit_code, which is 1 if check_condition() reported an error and
- * 0 otherwise.
- */
-int main(int argc, char **argv) {
-  struct app_data_t app = {0};
-  int i = 0;
-  app.container_id = argv[i++];   /* Should be unique */
-  /* Compare argc with i (not a fixed 1) for every optional argument, so that
-   * giving a host without a port selects the default port instead of reading
-   * argv[argc], which is a null pointer. */
-  app.host = (argc > i) ? argv[i++] : "";
-  app.port = (argc > i) ? argv[i++] : "amqp";
-  app.amqp_address = (argc > i) ? argv[i++] : "examples";
-  app.message_count = (argc > i) ? atoi(argv[i++]) : 10;
-
-  /* Create the proactor and connect */
-  app.proactor = pn_proactor();
-  char addr[PN_MAX_ADDR];
-  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
-  pn_proactor_connect(app.proactor, pn_connection(), addr);
-  run(&app);
-  pn_proactor_free(app.proactor);
-  return exit_code;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/send.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c
deleted file mode 100644
index 43da8b0..0000000
--- a/examples/c/proactor/send.c
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <proton/connection.h>
-#include <proton/condition.h>
-#include <proton/delivery.h>
-#include <proton/link.h>
-#include <proton/message.h>
-#include <proton/proactor.h>
-#include <proton/session.h>
-#include <proton/transport.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-
-/* Command line configuration plus run-time state of the sending client. */
-typedef struct app_data_t {
-  /* Configuration, filled in by main() */
-  const char *host;             /* host part of the address to connect to */
-  const char *port;             /* port part of the address to connect to */
-  const char *amqp_address;     /* address set on the sending link's target */
-  const char *container_id;     /* container id set on the connection */
-  int message_count;            /* number of messages to send */
-
-  /* Run-time state */
-  pn_proactor_t *proactor;      /* proactor that supplies the event batches */
-  pn_rwbytes_t message_buffer;  /* encode buffer: allocated by encode_message(), freed by main() */
-  int sent;                     /* messages sent so far; also the sequence number and delivery tag */
-  int acknowledged;             /* deliveries the peer has accepted so far */
-} app_data_t;
-
-static int exit_code = 0;
-
-/*
- * Report an error condition, if one is set, and start shutting down.
- *
- * When cond is set: print the event type name, the condition name and the
- * condition description to stderr, close the connection the event belongs
- * to and set exit_code to 1.  When cond is not set this does nothing.
- */
-static void check_condition(pn_event_t *e, pn_condition_t *cond) {
-  if (pn_condition_is_set(cond)) {
-    /* The name and description are optional; passing a null pointer to a
-     * "%s" conversion is undefined behavior, so substitute placeholders. */
-    const char *name = pn_condition_get_name(cond);
-    const char *desc = pn_condition_get_description(cond);
-    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
-            name ? name : "<error name not provided>",
-            desc ? desc : "<no description provided>");
-    pn_connection_close(pn_event_connection(e));
-    exit_code = 1;
-  }
-}
-
-/*
- * Create a message with a map { "sequence" : number }, encode it and return
- * the encoded bytes.
- *
- * The returned bytes point into app->message_buffer.  That buffer is owned by
- * app and reused (and possibly reallocated) by every call, so the result is
- * only valid until the next call.  The buffer is created on first use and
- * doubled until the encoded message fits.
- *
- * Prints a message on stderr and exits the process with status 1 if memory
- * cannot be allocated or the message cannot be encoded.
- */
-static pn_bytes_t encode_message(app_data_t* app) {
-  /* Construct a message with the map { "sequence": app.sent } */
-  pn_message_t* message = pn_message();
-  pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
-  pn_data_t* body = pn_message_body(message);
-  pn_data_put_map(body);
-  pn_data_enter(body);
-  pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
-  pn_data_put_int(body, app->sent); /* The sequence number */
-  pn_data_exit(body);
-
-  /* encode the message, expanding the encode buffer as needed */
-  if (app->message_buffer.start == NULL) {
-    static const size_t initial_size = 128;
-    char *initial = (char*)malloc(initial_size);
-    if (initial == NULL) {
-      fprintf(stderr, "error encoding message: out of memory\n");
-      exit(1);
-    }
-    app->message_buffer = pn_rwbytes(initial_size, initial);
-  }
-  /* app->message_buffer is the total buffer space available. */
-  /* mbuf will point at just the portion used by the encoded message */
-  pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
-  int status = 0;
-  while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
-    size_t grown_size = app->message_buffer.size * 2;
-    /* Keep the old pointer until realloc has succeeded */
-    char *grown = (char*)realloc(app->message_buffer.start, grown_size);
-    if (grown == NULL) {
-      fprintf(stderr, "error encoding message: out of memory\n");
-      exit(1);
-    }
-    app->message_buffer.start = grown;
-    app->message_buffer.size = grown_size;
-    /* realloc may have moved the block: the retry must encode into the new
-     * block, not into the old (now freed) one */
-    mbuf.start = grown;
-    mbuf.size = grown_size;
-  }
-  if (status != 0) {
-    fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
-    exit(1);
-  }
-  pn_message_free(message);
-  return pn_bytes(mbuf.size, mbuf.start);
-}
-
-/* Handle one proactor event for the sending client.
- * app holds the configuration and the sent/acknowledged counters.
- * Returns true to continue, false if finished */
-static bool handle(app_data_t* app, pn_event_t* event) {
-  switch (pn_event_type(event)) {
-
-   case PN_CONNECTION_INIT: { /* open the connection, a session and the sending link */
-     pn_connection_t* c = pn_event_connection(event);
-     pn_connection_set_container(c, app->container_id);
-     pn_connection_open(c);
-     pn_session_t* s = pn_session(pn_event_connection(event));
-     pn_session_open(s);
-     pn_link_t* l = pn_sender(s, "my_sender");
-     pn_terminus_set_address(pn_link_target(l), app->amqp_address);
-     pn_link_open(l);
-     break;
-   }
-
-   case PN_LINK_FLOW: {
-     /* The peer has given us some credit, now we can send messages */
-     pn_link_t *sender = pn_event_link(event);
-     while (pn_link_credit(sender) > 0 && app->sent < app->message_count) { /* stop when out of credit or all sent */
-       ++app->sent;
-       // Use sent counter as unique delivery tag.
-       pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
-       pn_bytes_t msgbuf = encode_message(app);
-       pn_link_send(sender, msgbuf.start, msgbuf.size);
-       pn_link_advance(sender);
-     }
-     break;
-   }
-
-   case PN_DELIVERY: {
-     /* We received acknowledgement from the peer that a message was delivered. */
-     pn_delivery_t* d = pn_event_delivery(event);
-     if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
-       if (++app->acknowledged == app->message_count) { /* every message has been accepted */
-         printf("%d messages sent and acknowledged\n", app->acknowledged);
-         pn_connection_close(pn_event_connection(event));
-         /* Continue handling events till we receive TRANSPORT_CLOSED */
-       }
-     } else { /* any state other than accepted is treated as a failure */
-       fprintf(stderr, "unexpected delivery state %d\n", (int)pn_delivery_remote_state(d));
-       pn_connection_close(pn_event_connection(event));
-       exit_code=1;
-     }
-     break;
-   }
-
-   case PN_TRANSPORT_CLOSED: /* report the transport's error condition, if any */
-    check_condition(event, pn_transport_condition(pn_event_transport(event)));
-    break;
-
-   case PN_CONNECTION_REMOTE_CLOSE: /* report the peer's condition, then close our end */
-    check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_SESSION_REMOTE_CLOSE: /* a remotely closed session ends the whole connection */
-    check_condition(event, pn_session_remote_condition(pn_event_session(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_LINK_REMOTE_CLOSE:
-   case PN_LINK_REMOTE_DETACH: /* a remotely closed or detached link ends the whole connection */
-    check_condition(event, pn_link_remote_condition(pn_event_link(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_PROACTOR_INACTIVE: /* tell the caller to stop the event loop */
-    return false;
-
-   default: break; /* every other event type is ignored */
-  }
-  return true;
-}
-
-/* Event loop: wait for batches of events from app->proactor and pass every
- * event to handle().  Returns as soon as handle() returns false. */
-void run(app_data_t *app) {
-  for (;;) {
-    pn_event_batch_t *batch = pn_proactor_wait(app->proactor);
-    pn_event_t *ev;
-    while ((ev = pn_event_batch_next(batch)) != NULL) {
-      if (!handle(app, ev)) {
-        /* NOTE(review): returns without pn_proactor_done() for this last
-         * batch, exactly as before - confirm that this is acceptable. */
-        return;
-      }
-    }
-    pn_proactor_done(app->proactor, batch);  /* whole batch handled */
-  }
-}
-
-/*
- * Connect to an AMQP peer and send numbered messages to it.
- *
- * argv[0] is used as the container id.  Optional positional arguments:
- *   argv[1]  host           (default "")
- *   argv[2]  port           (default "amqp")
- *   argv[3]  AMQP address   (default "examples")
- *   argv[4]  message count  (default 10)
- *
- * Returns exit_code, which is 1 if an error condition or an unexpected
- * delivery state was reported and 0 otherwise.
- */
-int main(int argc, char **argv) {
-  struct app_data_t app = {0};
-  int i = 0;
-  app.container_id = argv[i++];   /* Should be unique */
-  /* Compare argc with i (not a fixed 1) for every optional argument, so that
-   * giving a host without a port selects the default port instead of reading
-   * argv[argc], which is a null pointer. */
-  app.host = (argc > i) ? argv[i++] : "";
-  app.port = (argc > i) ? argv[i++] : "amqp";
-  app.amqp_address = (argc > i) ? argv[i++] : "examples";
-  app.message_count = (argc > i) ? atoi(argv[i++]) : 10;
-
-  /* Create the proactor and connect */
-  app.proactor = pn_proactor();
-  char addr[PN_MAX_ADDR];
-  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
-  pn_proactor_connect(app.proactor, pn_connection(), addr);
-  run(&app);
-  pn_proactor_free(app.proactor);
-  free(app.message_buffer.start);  /* encode buffer allocated by encode_message() */
-  return exit_code;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/thread.h
----------------------------------------------------------------------
diff --git a/examples/c/proactor/thread.h b/examples/c/proactor/thread.h
deleted file mode 100644
index 3b9f19e..0000000
--- a/examples/c/proactor/thread.h
+++ /dev/null
@@ -1,49 +0,0 @@
-#ifndef _PROTON_EXAMPLES_C_PROACTOR_THREADS_H
-#define _PROTON_EXAMPLES_C_PROACTOR_THREADS_H
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/* EXAMPLE USE ONLY. Simulate the subset of POSIX threads used by examples for windows */
-
-#ifdef _WIN32
-
-/* Map the few pthread names used by the examples onto Win32 primitives. */
-
-#include <windows.h>
-#include <time.h>
-/* NOTE(review): defined after the first <windows.h> above, so it cannot
- * affect that include - confirm whether it should move above it. */
-#define _WIN32_WINNT 0x500 /* WINBASE.H - Enable SignalObjectAndWait */
-#include <process.h>
-#include <windows.h>
-
-/* Return type for the definition of a thread entry point */
-#define pthread_function DWORD WINAPI
-#define pthread_function_return DWORD
-#define pthread_t HANDLE
-/* Evaluates to 0 if the thread was started, 1 if not; attr is ignored */
-#define pthread_create(thhandle,attr,thfunc,tharg) (int)((*(thhandle)=(HANDLE)_beginthreadex(NULL,0,(DWORD WINAPI(*)())(thfunc),(tharg),0,NULL))==NULL)
-/* Evaluates to 0 if the wait and the handle close both succeed, 1 otherwise;
- * result is ignored */
-#define pthread_join(thread, result) ((WaitForSingleObject((thread),INFINITE)!=WAIT_OBJECT_0) || !CloseHandle(thread))
-/* pthread_mutex_T is a misspelling kept for code that already uses it;
- * pthread_mutex_t is the name that matches <pthread.h> */
-#define pthread_mutex_T HANDLE
-#define pthread_mutex_t HANDLE
-/* The mutex macros evaluate to the underlying Win32 call's result, which is
- * not a POSIX error code */
-#define pthread_mutex_init(pobject,pattr) (*(pobject)=CreateMutex(NULL,FALSE,NULL))
-#define pthread_mutex_destroy(pobject) CloseHandle(*(pobject))
-#define pthread_mutex_lock(pobject) WaitForSingleObject(*(pobject),INFINITE)
-#define pthread_mutex_unlock(pobject) ReleaseMutex(*(pobject))
-
-#else
-
-#include <pthread.h>
-
-#endif
-
-#endif

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/reactor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/reactor/CMakeLists.txt b/examples/c/reactor/CMakeLists.txt
deleted file mode 100644
index bd6163f..0000000
--- a/examples/c/reactor/CMakeLists.txt
+++ /dev/null
@@ -1,45 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Locate Proton; Proton_INCLUDE_DIRS and Proton_LIBRARIES are used below.
-find_package(Proton REQUIRED)
-
-# Source files of the reactor examples.
-set(reactor-examples sender.c receiver.c)
-
-# Compile the examples with the warning, language and LTO flags set by the
-# including project.
-set_source_files_properties(${reactor-examples} PROPERTIES
-  COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${COMPILE_LANGUAGE_FLAGS} ${LINK_TIME_OPTIMIZATION}")
-
-# Optionally build the C sources with the C++ compiler.
-if(BUILD_WITH_CXX)
-  set_source_files_properties(${reactor-examples} PROPERTIES LANGUAGE CXX)
-endif()
-
-include_directories(${Proton_INCLUDE_DIRS})
-
-# One executable per example, each linked against Proton.
-foreach(example sender receiver)
-  add_executable(${example} ${example}.c)
-  target_link_libraries(${example} ${Proton_LIBRARIES})
-endforeach()
-

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/reactor/README
----------------------------------------------------------------------
diff --git a/examples/c/reactor/README b/examples/c/reactor/README
deleted file mode 100644
index 8d61893..0000000
--- a/examples/c/reactor/README
+++ /dev/null
@@ -1,30 +0,0 @@
-These example clients require a broker or similar intermediary that
-supports the AMQP 1.0 protocol, allows anonymous connections and
-accepts links to and from a node named 'examples'.
-
-------------------------------------------------------------------
-
-sender.c
-
-A simple message sending client.  This example sends all messages but
-the last as pre-settled (no ack required).  It then waits for an ack
-for the last message sent before exiting.
-
-Use the '-h' command line option for a list of supported parameters.
-
-------------------------------------------------------------------
-
-receiver.c
-
-A simple message consuming client.  This example receives messages
-from a source node (default 'examples').  Received messages are
-acknowledged if they are sent un-settled.  The client will try to
-decode the message payload, assuming it was generated by the
-sender example.
-
-Use the '-h' command line option for a list of supported parameters.
-
-
-
-
-

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/reactor/receiver.c
----------------------------------------------------------------------
diff --git a/examples/c/reactor/receiver.c b/examples/c/reactor/receiver.c
deleted file mode 100644
index e72a6d9..0000000
--- a/examples/c/reactor/receiver.c
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-
-#include "pncompat/misc_funcs.inc"
-
-#include "proton/reactor.h"
-#include "proton/message.h"
-#include "proton/connection.h"
-#include "proton/session.h"
-#include "proton/link.h"
-#include "proton/delivery.h"
-#include "proton/event.h"
-#include "proton/handlers.h"
-#include "proton/transport.h"
-#include "proton/url.h"
-
-static int quiet = 0;
-
-// Credit batch if unlimited receive (-c 0)
-static const int CAPACITY = 100;
-#define MAX_SIZE 512
-
-// Example application data.  An instance lives in the memory of the event
-// handler (see GET_APP_DATA) and is available during event processing.  In
-// this example it holds configuration and state information.
-//
-typedef struct {
-    int count;          // # of messages still to receive before exiting; 0 = receive forever
-    const char *source;     // name of the source node to receive from
-    pn_message_t *message;  // reused to decode each received message; released in delete_handler
-} app_data_t;
-
-// helper to pull pointer to app_data_t instance out of the pn_handler_t
-//
-#define GET_APP_DATA(handler) ((app_data_t *)pn_handler_mem(handler))
-
-// Finalizer passed to pn_handler_new() in main(): drops the reference held
-// on the application's message, if there is one, and clears the pointer.
-//
-static void delete_handler(pn_handler_t *handler)
-{
-    app_data_t *app = GET_APP_DATA(handler);
-
-    if (!app->message) {
-        return;
-    }
-    pn_decref(app->message);
-    app->message = NULL;
-}
-
-
-/* Process each event posted by the reactor.
- *
- * handler: the handler whose memory holds this example's app_data_t
- * event:   the event being dispatched
- * type:    the event's type; selects the action taken below
- */
-static void event_handler(pn_handler_t *handler,
-                          pn_event_t *event,
-                          pn_event_type_t type)
-{
-    app_data_t *data = GET_APP_DATA(handler);
-
-    switch (type) {
-
-    case PN_CONNECTION_INIT: {
-        // Create and open all the endpoints needed to receive messages
-        //
-        pn_connection_t *conn;
-        pn_session_t *ssn;
-        pn_link_t *receiver;
-
-        conn = pn_event_connection(event);
-        pn_connection_open(conn);
-        ssn = pn_session(conn);
-        pn_session_open(ssn);
-        receiver = pn_receiver(ssn, "MyReceiver");
-        pn_terminus_set_address(pn_link_source(receiver), data->source);
-        pn_link_open(receiver);
-        // cannot receive without granting credit:
-        pn_link_flow(receiver, data->count ? data->count : CAPACITY);
-    } break;
-
-    case PN_DELIVERY: {
-        // A message has been received
-        //
-        pn_link_t *link = NULL;
-        pn_delivery_t *dlv = pn_event_delivery(event);
-        if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
-            // A full message has arrived
-            if (!quiet) {
-                ssize_t len;
-                pn_bytes_t bytes;  // only read below when found is true
-                bool found = false;
-
-                // try to decode the message body
-                if (pn_delivery_pending(dlv) < MAX_SIZE) {
-                    static char buffer[MAX_SIZE];
-                    // read in the raw data
-                    len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
-                    if (len > 0) {
-                        // decode it into a proton message
-                        pn_message_clear(data->message);
-                        if (PN_OK == pn_message_decode(data->message, buffer,
-                                                       len)) {
-                            // Assuming the message came from the sender
-                            // example, try to parse out a single string from
-                            // the payload
-                            //
-                            pn_data_scan(pn_message_body(data->message), "?S",
-                                         &found, &bytes);
-                        }
-                    }
-                }
-                if (found) {
-                    fprintf(stdout, "Message: [%.*s]\n", (int)bytes.size,
-                            bytes.start);
-                } else {
-                    fprintf(stdout, "Message received!\n");
-                }
-            }
-
-            link = pn_delivery_link(dlv);
-
-            if (!pn_delivery_settled(dlv)) {
-                // remote has not settled, so it is tracking the delivery.  Ack
-                // it.
-                pn_delivery_update(dlv, PN_ACCEPTED);
-            }
-
-            // done with the delivery, move to the next and free it
-            pn_link_advance(link);
-            pn_delivery_settle(dlv);  // dlv is now freed
-
-            if (data->count == 0) {
-                // receive forever - see if more credit is needed
-                if (pn_link_credit(link) < CAPACITY/2) {
-                    // Grant enough credit to bring it up to CAPACITY:
-                    pn_link_flow(link, CAPACITY - pn_link_credit(link));
-                }
-            } else if (--data->count == 0) {  // counts down the messages still expected
-                // done receiving, close the endpoints
-                pn_session_t *ssn = pn_link_session(link);
-				pn_link_close(link);
-                pn_session_close(ssn);
-                pn_connection_close(pn_session_connection(ssn));
-            }
-        }
-    } break;
-
-    case PN_TRANSPORT_ERROR: {
-        // The connection to the peer failed.
-        //
-        pn_transport_t *tport = pn_event_transport(event);
-        pn_condition_t *cond = pn_transport_condition(tport);
-        fprintf(stderr, "Network transport failed!\n");
-        if (pn_condition_is_set(cond)) {
-            const char *name = pn_condition_get_name(cond);
-            const char *desc = pn_condition_get_description(cond);
-            fprintf(stderr, "    Error: %s  Description: %s\n",
-                    (name) ? name : "<error name not provided>",
-                    (desc) ? desc : "<no description provided>");
-        }
-        // pn_reactor_process() will exit with a false return value, stopping
-        // the main loop.
-    } break;
-
-    default:  // all other event types are ignored here
-        break;
-    }
-}
-
-// Print the command line help on stdout, then exit with status 1.
-//
-static void usage(void)
-{
-    static const char *const help[] = {
-        "Usage: receiver <options>\n",
-        "-a      \tThe host address [localhost:5672]\n",
-        "-c      \t# of messages to receive, 0=receive forever [1]\n",
-        "-s      \tSource address [examples]\n",
-        "-i      \tContainer name [ReceiveExample]\n",
-        "-q      \tQuiet - turn off stdout\n",
-    };
-    size_t line;
-
-    for (line = 0; line < sizeof(help) / sizeof(help[0]); line++) {
-        printf("%s", help[line]);
-    }
-    exit(1);
-}
-
-int main(int argc, char** argv)
-{   /* Entry point: parse the command line, connect to the peer and run the
-     * reactor until pn_reactor_process() returns false.  Returns 0; exits
-     * with status 1 on a usage error or an invalid host address. */
-    const char *address = "localhost";
-    const char *container = "ReceiveExample";
-    int c;
-    pn_reactor_t *reactor = NULL;
-    pn_url_t *url = NULL;
-    pn_connection_t *conn = NULL;
-
-    /* create a handler for the connection's events.
-     * event_handler will be called for each event.  The handler will allocate
-     * an app_data_t instance which can be accessed when the event_handler is
-     * called.
-     */
-    pn_handler_t *handler = pn_handler_new(event_handler,
-                                           sizeof(app_data_t),
-                                           delete_handler);
-
-    /* set up the application data with defaults */
-    app_data_t *app_data = GET_APP_DATA(handler);
-    memset(app_data, 0, sizeof(app_data_t));
-    app_data->count = 1;
-    app_data->source = "examples";
-    app_data->message = pn_message();
-
-    /* Attach the pn_handshaker() handler.  This handler deals with endpoint
-     * events from the peer so we don't have to.
-     */
-    {
-        pn_handler_t *handshaker = pn_handshaker();
-        pn_handler_add(handler, handshaker);
-        pn_decref(handshaker);
-    }
-
-    /* command line options */
-    opterr = 0;
-    while((c = getopt(argc, argv, "i:a:c:s:qh")) != -1) {
-        switch(c) {
-        case 'h': usage(); break;
-        case 'a': address = optarg; break;
-        case 'c':
-            app_data->count = atoi(optarg);
-            if (app_data->count < 0) usage();  /* negative counts are rejected */
-            break;
-        case 's': app_data->source = optarg; break;
-        case 'i': container = optarg; break;
-        case 'q': quiet = 1; break;
-        default:
-            usage();
-            break;
-        }
-    }
-
-    reactor = pn_reactor();
-
-    url = pn_url_parse(address);
-    if (url == NULL) {
-        fprintf(stderr, "Invalid host address %s\n", address);
-        exit(1);
-    }
-    conn = pn_reactor_connection_to_host(reactor,
-                                         pn_url_get_host(url),
-                                         pn_url_get_port(url),
-                                         handler);
-    pn_decref(url);      /* the parsed url is not used after this point */
-    pn_decref(handler);  /* main() does not use the handler after this point */
-
-    // the container name should be unique for each client
-    pn_connection_set_container(conn, container);
-
-    // wait up to 5 seconds for activity before returning from
-    // pn_reactor_process()
-    pn_reactor_set_timeout(reactor, 5000);
-
-    pn_reactor_start(reactor);
-
-    while (pn_reactor_process(reactor)) {
-        /* Returns 'true' until the connection is shut down.
-         * pn_reactor_process() will return true at least once every 5 seconds
-         * (due to the timeout).  If no timeout was configured,
-         * pn_reactor_process() returns as soon as it finishes processing all
-         * pending I/O and events. Once the connection has closed,
-         * pn_reactor_process() will return false.
-         */
-    }
-    pn_decref(reactor);
-
-    return 0;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/reactor/sender.c
----------------------------------------------------------------------
diff --git a/examples/c/reactor/sender.c b/examples/c/reactor/sender.c
deleted file mode 100644
index 6c3cdb3..0000000
--- a/examples/c/reactor/sender.c
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-
-#include "pncompat/misc_funcs.inc"
-
-#include "proton/reactor.h"
-#include "proton/message.h"
-#include "proton/connection.h"
-#include "proton/session.h"
-#include "proton/link.h"
-#include "proton/delivery.h"
-#include "proton/event.h"
-#include "proton/handlers.h"
-#include "proton/transport.h"
-#include "proton/url.h"
-
-
-static int quiet = 0;
-
-// Example application data.  This data will be instantiated in the event
-// handler, and is available during event processing.  In this example it
-// holds configuration and state information.
-//
-typedef struct {
-    int count;           // # messages to send
-    int anon;            // use anonymous link if true
-    const char *target;  // name of destination target
-    char *msg_data;      // pre-encoded outbound message
-    size_t msg_len;      // bytes in msg_data
-} app_data_t;
-
-// helper to pull pointer to app_data_t instance out of the pn_handler_t
-//
-#define GET_APP_DATA(handler) ((app_data_t *)pn_handler_mem(handler))
-
-// Called when reactor exits to clean up app_data
-//
-static void delete_handler(pn_handler_t *handler)
-{
-    app_data_t *d = GET_APP_DATA(handler);
-    if (d->msg_data) {
-        free(d->msg_data);
-        d->msg_data = NULL;
-    }
-}
-
-/* Process each event posted by the reactor.
- */
-static void event_handler(pn_handler_t *handler,
-                          pn_event_t *event,
-                          pn_event_type_t type)
-{
-    app_data_t *data = GET_APP_DATA(handler);
-
-    switch (type) {
-
-    case PN_CONNECTION_INIT: {
-        // Create and open all the endpoints needed to send a message
-        //
-        pn_connection_t *conn;
-        pn_session_t *ssn;
-        pn_link_t *sender;
-
-        conn = pn_event_connection(event);
-        pn_connection_open(conn);
-        ssn = pn_session(conn);
-        pn_session_open(ssn);
-        sender = pn_sender(ssn, "MySender");
-        // we do not wait for ack until the last message
-        pn_link_set_snd_settle_mode(sender, PN_SND_MIXED);
-        if (!data->anon) {
-            pn_terminus_set_address(pn_link_target(sender), data->target);
-        }
-        pn_link_open(sender);
-    } break;
-
-    case PN_LINK_FLOW: {
-        // the remote has given us some credit, now we can send messages
-        //
-        static long tag = 0;  // a simple tag generator
-        pn_delivery_t *delivery;
-        pn_link_t *sender = pn_event_link(event);
-        int credit = pn_link_credit(sender);
-        while (credit > 0 && data->count > 0) {
-            --credit;
-            --data->count;
-            ++tag;
-            delivery = pn_delivery(sender,
-                                   pn_dtag((const char *)&tag, sizeof(tag)));
-            pn_link_send(sender, data->msg_data, data->msg_len);
-            pn_link_advance(sender);
-            if (data->count > 0) {
-                // send pre-settled until the last one, then wait for an ack on
-                // the last sent message. This allows the sender to send
-                // messages as fast as possible and then exit when the consumer
-                // has dealt with the last one.
-                //
-                pn_delivery_settle(delivery);
-            }
-        }
-    } break;
-
-    case PN_DELIVERY: {
-        // Since the example sends all messages but the last pre-settled
-        // (pre-acked), only the last message's delivery will get updated with
-        // the remote state (acked/nacked).
-        //
-        pn_delivery_t *dlv = pn_event_delivery(event);
-        if (pn_delivery_updated(dlv) && pn_delivery_remote_state(dlv)) {
-            uint64_t rs = pn_delivery_remote_state(dlv);
-            int done = 1;
-            switch (rs) {
-            case PN_RECEIVED:
-                // This is not a terminal state - it is informational, and the
-                // peer is still processing the message.
-                done = 0;
-                break;
-            case PN_ACCEPTED:
-                pn_delivery_settle(dlv);
-                if (!quiet) fprintf(stdout, "Send complete!\n");
-                break;
-            case PN_REJECTED:
-            case PN_RELEASED:
-            case PN_MODIFIED:
-                pn_delivery_settle(dlv);
-                fprintf(stderr, "Message not accepted - code:%lu\n", (unsigned long)rs);
-                break;
-            default:
-                // ??? no other terminal states defined, so ignore anything else
-                pn_delivery_settle(dlv);
-                fprintf(stderr, "Unknown delivery failure - code=%lu\n", (unsigned long)rs);
-                break;
-            }
-
-            if (done) {
-                // initiate clean shutdown of the endpoints
-                pn_link_t *link = pn_delivery_link(dlv);
-                pn_session_t *ssn = pn_link_session(link);
-                pn_link_close(link);
-                pn_session_close(ssn);
-                pn_connection_close(pn_session_connection(ssn));
-            }
-        }
-    } break;
-
-    case PN_TRANSPORT_ERROR: {
-        // The connection to the peer failed.
-        //
-        pn_transport_t *tport = pn_event_transport(event);
-        pn_condition_t *cond = pn_transport_condition(tport);
-        fprintf(stderr, "Network transport failed!\n");
-        if (pn_condition_is_set(cond)) {
-            const char *name = pn_condition_get_name(cond);
-            const char *desc = pn_condition_get_description(cond);
-            fprintf(stderr, "    Error: %s  Description: %s\n",
-                    (name) ? name : "<error name not provided>",
-                    (desc) ? desc : "<no description provided>");
-        }
-        // pn_reactor_process() will exit with a false return value, stopping
-        // the main loop.
-    } break;
-
-    default:
-        break;
-    }
-}
-
-static void usage(void)
-{
-  printf("Usage: send <options> <message>\n");
-  printf("-a      \tThe host address [localhost:5672]\n");
-  printf("-c      \t# of messages to send [1]\n");
-  printf("-t      \tTarget address [examples]\n");
-  printf("-n      \tUse an anonymous link [off]\n");
-  printf("-i      \tContainer name [SendExample]\n");
-  printf("-q      \tQuiet - turn off stdout\n");
-  printf("message \tA text string to send.\n");
-  exit(1);
-}
-
-int main(int argc, char** argv)
-{
-    const char *address = "localhost";
-    const char *msgtext = "Hello World!";
-    const char *container = "SendExample";
-    int c;
-    pn_message_t *message = NULL;
-    pn_data_t *body = NULL;
-    pn_reactor_t *reactor = NULL;
-    pn_url_t *url = NULL;
-    pn_connection_t *conn = NULL;
-
-    /* Create a handler for the connection's events.  event_handler() will be
-     * called for each event and delete_handler will be called when the
-     * connection is released.  The handler will allocate an app_data_t
-     * instance which can be accessed when the event_handler is called.
-     */
-    pn_handler_t *handler = pn_handler_new(event_handler,
-                                           sizeof(app_data_t),
-                                           delete_handler);
-
-    /* set up the application data with defaults */
-    app_data_t *app_data = GET_APP_DATA(handler);
-    memset(app_data, 0, sizeof(app_data_t));
-    app_data->count = 1;
-    app_data->target = "examples";
-
-    /* Attach the pn_handshaker() handler.  This handler deals with endpoint
-     * events from the peer so we don't have to.
-     */
-    {
-        pn_handler_t *handshaker = pn_handshaker();
-        pn_handler_add(handler, handshaker);
-        pn_decref(handshaker);
-    }
-
-    /* command line options */
-    opterr = 0;
-    while((c = getopt(argc, argv, "i:a:c:t:nhq")) != -1) {
-        switch(c) {
-        case 'h': usage(); break;
-        case 'a': address = optarg; break;
-        case 'c':
-            app_data->count = atoi(optarg);
-            if (app_data->count < 1) usage();
-            break;
-        case 't': app_data->target = optarg; break;
-        case 'n': app_data->anon = 1; break;
-        case 'i': container = optarg; break;
-        case 'q': quiet = 1; break;
-        default:
-            usage();
-            break;
-        }
-    }
-    if (optind < argc) msgtext = argv[optind];
-
-
-    // create a single message and pre-encode it so we only have to do that
-    // once.  All transmits will use the same pre-encoded message simply for
-    // speed.
-    //
-    message = pn_message();
-    pn_message_set_address(message, app_data->target);
-    body = pn_message_body(message);
-    pn_data_clear(body);
-
-    // This message's body contains a single string
-    if (pn_data_fill(body, "S", msgtext)) {
-        fprintf(stderr, "Error building message!\n");
-        exit(1);
-    }
-    pn_data_rewind(body);
-    {
-        // encode the message, expanding the encode buffer as needed
-        //
-        size_t len = 128;
-        char *buf = (char *)malloc(len);
-        int rc = 0;
-        do {
-            rc = pn_message_encode(message, buf, &len);
-            if (rc == PN_OVERFLOW) {
-                free(buf);
-                len *= 2;
-                buf = (char *)malloc(len);
-            }
-        } while (rc == PN_OVERFLOW);
-        app_data->msg_len = len;
-        app_data->msg_data = buf;
-    }
-    pn_decref(message);   // message no longer needed
-
-    reactor = pn_reactor();
-
-    url = pn_url_parse(address);
-    if (url == NULL) {
-        fprintf(stderr, "Invalid host address %s\n", address);
-        exit(1);
-    }
-    conn = pn_reactor_connection_to_host(reactor,
-                                         pn_url_get_host(url),
-                                         pn_url_get_port(url),
-                                         handler);
-    pn_decref(url);
-    pn_decref(handler);
-
-    // the container name should be unique for each client
-    pn_connection_set_container(conn, container);
-
-    // wait up to 5 seconds for activity before returning from
-    // pn_reactor_process()
-    pn_reactor_set_timeout(reactor, 5000);
-
-    pn_reactor_start(reactor);
-
-    while (pn_reactor_process(reactor)) {
-        /* Returns 'true' until the connection is shut down.
-         * pn_reactor_process() will return true at least once every 5 seconds
-         * (due to the timeout).  If no timeout was configured,
-         * pn_reactor_process() returns as soon as it finishes processing all
-         * pending I/O and events. Once the connection has closed,
-         * pn_reactor_process() will return false.
-         */
-    }
-    pn_decref(reactor);
-
-    return 0;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/receive.c b/examples/c/receive.c
new file mode 100644
index 0000000..6fd74a5
--- /dev/null
+++ b/examples/c/receive.c
@@ -0,0 +1,188 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.h>
+#include <proton/condition.h>
+#include <proton/delivery.h>
+#include <proton/link.h>
+#include <proton/message.h>
+#include <proton/proactor.h>
+#include <proton/session.h>
+#include <proton/transport.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+
+typedef struct app_data_t {
+  const char *host, *port;
+  const char *amqp_address;
+  const char *container_id;
+  int message_count;
+
+  pn_proactor_t *proactor;
+  int received;
+  bool finished;
+} app_data_t;
+
+static const int BATCH = 1000; /* Batch size for unlimited receive */
+
+static int exit_code = 0;
+/* If cond is set: print "<event>: <name>: <description>" to stderr, close the connection, set exit_code to 1. */
+static void check_condition(pn_event_t *e, pn_condition_t *cond) {
+  if (pn_condition_is_set(cond)) {
+    const char *name = pn_condition_get_name(cond), *desc = pn_condition_get_description(cond); /* may be NULL */
+    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)), name ? name : "(no name)", desc ? desc : "(no description)");
+    pn_connection_close(pn_event_connection(e));
+    exit_code = 1;
+  }
+}
+
+#define MAX_SIZE 1024
+
+static void decode_message(pn_delivery_t *dlv) {
+  static char buffer[MAX_SIZE];
+  ssize_t len;
+  // try to decode the message body
+  if (pn_delivery_pending(dlv) < MAX_SIZE) {
+    // read in the raw data
+    len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
+    if (len > 0) {
+      // decode it into a proton message
+      pn_message_t *m = pn_message();
+      if (PN_OK == pn_message_decode(m, buffer, len)) {
+        pn_string_t *s = pn_string(NULL);
+        pn_inspect(pn_message_body(m), s);
+        printf("%s\n", pn_string_get(s));
+        pn_free(s);
+      }
+      pn_message_free(m);
+    }
+  }
+}
+
+/* Return true to continue, false to exit */
+static bool handle(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+   case PN_CONNECTION_INIT: {
+     pn_connection_t* c = pn_event_connection(event);
+     pn_connection_set_container(c, app->container_id);
+     pn_connection_open(c);
+     pn_session_t* s = pn_session(c);
+     pn_session_open(s);
+     pn_link_t* l = pn_receiver(s, "my_receiver");
+     pn_terminus_set_address(pn_link_source(l), app->amqp_address);
+     pn_link_open(l);
+     /* cannot receive without granting credit: */
+     pn_link_flow(l, app->message_count ? app->message_count : BATCH);
+   } break;
+
+   case PN_DELIVERY: {
+     /* A message has been received */
+     pn_link_t *link = NULL;
+     pn_delivery_t *dlv = pn_event_delivery(event);
+     if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
+       link = pn_delivery_link(dlv);
+       decode_message(dlv);
+       /* Accept the delivery */
+       pn_delivery_update(dlv, PN_ACCEPTED);
+       /* done with the delivery, move to the next and free it */
+       pn_link_advance(link);
+       pn_delivery_settle(dlv);  /* dlv is now freed */
+
+       if (app->message_count == 0) {
+         /* receive forever - see if more credit is needed */
+         if (pn_link_credit(link) < BATCH/2) {
+           /* Grant enough credit to bring it up to BATCH: */
+           pn_link_flow(link, BATCH - pn_link_credit(link));
+         }
+       } else if (++app->received >= app->message_count) {
+         /* done receiving, close the endpoints */
+         printf("%d messages received\n", app->received);
+         pn_session_t *ssn = pn_link_session(link);
+         pn_link_close(link);
+         pn_session_close(ssn);
+         pn_connection_close(pn_session_connection(ssn));
+       }
+     }
+   } break;
+
+   case PN_TRANSPORT_CLOSED:
+    check_condition(event, pn_transport_condition(pn_event_transport(event)));
+    break;
+
+   case PN_CONNECTION_REMOTE_CLOSE:
+    check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_SESSION_REMOTE_CLOSE:
+    check_condition(event, pn_session_remote_condition(pn_event_session(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_LINK_REMOTE_CLOSE:
+   case PN_LINK_REMOTE_DETACH:
+    check_condition(event, pn_link_remote_condition(pn_event_link(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_PROACTOR_INACTIVE:
+    return false;
+    break;
+
+   default:
+    break;
+  }
+    return true;
+}
+
+void run(app_data_t *app) {
+  /* Loop and handle events */
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(app->proactor);
+    for (pn_event_t *e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
+      if (!handle(app, e)) {
+        return;
+      }
+    }
+    pn_proactor_done(app->proactor, events);
+  } while(true);
+}
+/* Optional positional arguments: host, port, AMQP address, message count (0 = receive without limit). */
+int main(int argc, char **argv) {
+  struct app_data_t app = {0};
+  int i = 0;
+  app.container_id = argv[i++];   /* Should be unique */
+  /* Compare argc with the running index i, so every omitted argument really gets its default. */
+  app.host = (argc > i) ? argv[i++] : "";
+  app.port = (argc > i) ? argv[i++] : "amqp";
+  app.amqp_address = (argc > i) ? argv[i++] : "examples";
+  app.message_count = (argc > i) ? atoi(argv[i++]) : 10;
+  /* Create the proactor and connect */
+  app.proactor = pn_proactor();
+  char addr[PN_MAX_ADDR];
+  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
+  pn_proactor_connect(app.proactor, pn_connection(), addr);
+  run(&app);
+  pn_proactor_free(app.proactor);
+  return exit_code;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/send.c
----------------------------------------------------------------------
diff --git a/examples/c/send.c b/examples/c/send.c
new file mode 100644
index 0000000..43da8b0
--- /dev/null
+++ b/examples/c/send.c
@@ -0,0 +1,196 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.h>
+#include <proton/condition.h>
+#include <proton/delivery.h>
+#include <proton/link.h>
+#include <proton/message.h>
+#include <proton/proactor.h>
+#include <proton/session.h>
+#include <proton/transport.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+
+typedef struct app_data_t {
+  const char *host, *port;
+  const char *amqp_address;
+  const char *container_id;
+  int message_count;
+
+  pn_proactor_t *proactor;
+  pn_rwbytes_t message_buffer;
+  int sent;
+  int acknowledged;
+} app_data_t;
+
+static int exit_code = 0;
+/* If cond is set: print "<event>: <name>: <description>" to stderr, close the connection, set exit_code to 1. */
+static void check_condition(pn_event_t *e, pn_condition_t *cond) {
+  if (pn_condition_is_set(cond)) {
+    const char *name = pn_condition_get_name(cond), *desc = pn_condition_get_description(cond); /* may be NULL */
+    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)), name ? name : "(no name)", desc ? desc : "(no description)");
+    pn_connection_close(pn_event_connection(e));
+    exit_code = 1;
+  }
+}
+
+/* Build a message with the map { "sequence" : app->sent }, encode it and return the encoded bytes (they alias app->message_buffer). */
+static pn_bytes_t encode_message(app_data_t* app) {
+  /* Construct a message with the map { "sequence": app.sent } */
+  pn_message_t* message = pn_message();
+  pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
+  pn_data_t* body = pn_message_body(message);
+  pn_data_put_map(body);
+  pn_data_enter(body);
+  pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
+  pn_data_put_int(body, app->sent); /* The sequence number */
+  pn_data_exit(body);
+
+  /* encode the message, expanding the encode buffer as needed */
+  if (app->message_buffer.start == NULL) {
+    static const size_t initial_size = 128;
+    app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
+  }
+  /* app->message_buffer is the total buffer space available. */
+  /* mbuf will point at just the portion used by the encoded message */
+  pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
+  int status = 0;
+  while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
+    app->message_buffer.size *= 2;
+    app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size);
+    mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start); /* realloc may move the block: refresh start too */
+  }
+  if (status != 0) {
+    fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
+    exit(1);
+  }
+  pn_message_free(message);
+  return pn_bytes(mbuf.size, mbuf.start);
+}
+
+/* Returns true to continue, false if finished */
+static bool handle(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+   case PN_CONNECTION_INIT: {
+     pn_connection_t* c = pn_event_connection(event);
+     pn_connection_set_container(c, app->container_id);
+     pn_connection_open(c);
+     pn_session_t* s = pn_session(pn_event_connection(event));
+     pn_session_open(s);
+     pn_link_t* l = pn_sender(s, "my_sender");
+     pn_terminus_set_address(pn_link_target(l), app->amqp_address);
+     pn_link_open(l);
+     break;
+   }
+
+   case PN_LINK_FLOW: {
+     /* The peer has given us some credit, now we can send messages */
+     pn_link_t *sender = pn_event_link(event);
+     while (pn_link_credit(sender) > 0 && app->sent < app->message_count) {
+       ++app->sent;
+       // Use sent counter as unique delivery tag.
+       pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
+       pn_bytes_t msgbuf = encode_message(app);
+       pn_link_send(sender, msgbuf.start, msgbuf.size);
+       pn_link_advance(sender);
+     }
+     break;
+   }
+
+   case PN_DELIVERY: {
+     /* We received acknowledgement from the peer that a message was delivered. */
+     pn_delivery_t* d = pn_event_delivery(event);
+     if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
+       if (++app->acknowledged == app->message_count) {
+         printf("%d messages sent and acknowledged\n", app->acknowledged);
+         pn_connection_close(pn_event_connection(event));
+         /* Continue handling events till we receive TRANSPORT_CLOSED */
+       }
+     } else {
+       fprintf(stderr, "unexpected delivery state %d\n", (int)pn_delivery_remote_state(d));
+       pn_connection_close(pn_event_connection(event));
+       exit_code=1;
+     }
+     break;
+   }
+
+   case PN_TRANSPORT_CLOSED:
+    check_condition(event, pn_transport_condition(pn_event_transport(event)));
+    break;
+
+   case PN_CONNECTION_REMOTE_CLOSE:
+    check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_SESSION_REMOTE_CLOSE:
+    check_condition(event, pn_session_remote_condition(pn_event_session(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_LINK_REMOTE_CLOSE:
+   case PN_LINK_REMOTE_DETACH:
+    check_condition(event, pn_link_remote_condition(pn_event_link(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_PROACTOR_INACTIVE:
+    return false;
+
+   default: break;
+  }
+  return true;
+}
+
+void run(app_data_t *app) {
+  /* Loop and handle events */
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(app->proactor);
+    for (pn_event_t *e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
+      if (!handle(app, e)) {
+        return;
+      }
+    }
+    pn_proactor_done(app->proactor, events);
+  } while(true);
+}
+/* Optional positional arguments: host, port, AMQP address, number of messages to send. */
+int main(int argc, char **argv) {
+  struct app_data_t app = {0};
+  int i = 0;
+  app.container_id = argv[i++];   /* Should be unique */
+  /* Compare argc with the running index i, so every omitted argument really gets its default. */
+  app.host = (argc > i) ? argv[i++] : "";
+  app.port = (argc > i) ? argv[i++] : "amqp";
+  app.amqp_address = (argc > i) ? argv[i++] : "examples";
+  app.message_count = (argc > i) ? atoi(argv[i++]) : 10;
+  app.proactor = pn_proactor();
+  char addr[PN_MAX_ADDR];
+  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
+  pn_proactor_connect(app.proactor, pn_connection(), addr);
+  run(&app);
+  pn_proactor_free(app.proactor);
+  free(app.message_buffer.start); /* encode buffer grown by encode_message(); free(NULL) is harmless */
+  return exit_code;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/thread.h
----------------------------------------------------------------------
diff --git a/examples/c/thread.h b/examples/c/thread.h
new file mode 100644
index 0000000..1bd5595
--- /dev/null
+++ b/examples/c/thread.h
@@ -0,0 +1,49 @@
+#ifndef _PROTON_EXAMPLES_C_THREADS_H
+#define _PROTON_EXAMPLES_C_THREADS_H 1
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* EXAMPLE USE ONLY. Simulate the subset of POSIX threads used by examples for windows */
+
+#ifdef _WIN32
+/* Win32 stand-ins for the few pthread names the examples use. */
+#define _WIN32_WINNT 0x500 /* WINBASE.H - Enable SignalObjectAndWait; set before <windows.h> so it takes effect */
+#include <windows.h>
+#include <time.h>
+#include <process.h>
+
+#define pthread_function DWORD WINAPI
+#define pthread_function_return DWORD
+#define pthread_t HANDLE
+#define pthread_create(thhandle,attr,thfunc,tharg) (int)((*thhandle=(HANDLE)_beginthreadex(NULL,0,(DWORD WINAPI(*)())thfunc,tharg,0,NULL))==NULL)
+#define pthread_join(thread, result) ((WaitForSingleObject((thread),INFINITE)!=WAIT_OBJECT_0) || !CloseHandle(thread))
+#define pthread_mutex_t HANDLE
+#define pthread_mutex_T HANDLE /* earlier misspelling, kept so existing uses still compile */
+#define pthread_mutex_init(pobject,pattr) (*pobject=CreateMutex(NULL,FALSE,NULL))
+#define pthread_mutex_destroy(pobject) CloseHandle(*pobject)
+#define pthread_mutex_lock(pobject) WaitForSingleObject(*pobject,INFINITE)
+#define pthread_mutex_unlock(pobject) ReleaseMutex(*pobject)
+
+#else
+
+#include <pthread.h>
+
+#endif
+
+#endif /* thread.h */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/proton-c/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/CMakeLists.txt b/proton-c/src/tests/CMakeLists.txt
index 50ea677..c562025 100644
--- a/proton-c/src/tests/CMakeLists.txt
+++ b/proton-c/src/tests/CMakeLists.txt
@@ -63,9 +63,9 @@ if(HAS_PROACTOR)
   endif(WIN32)
 
   if(WIN32)
-    # set(path "$<TARGET_FILE_DIR:proactor-broker>;$<TARGET_FILE_DIR:qpid-proton>")
+    # set(path "$<TARGET_FILE_DIR:c-broker>;$<TARGET_FILE_DIR:qpid-proton>")
   else(WIN32)
-    set(path "${CMAKE_CURRENT_BINARY_DIR}:${CMAKE_BINARY_DIR}/examples/c/proactor:$ENV{PATH}")
+    set(path "${CMAKE_CURRENT_BINARY_DIR}:${CMAKE_BINARY_DIR}/examples/c:$ENV{PATH}")
   endif(WIN32)
   # Add the tools directory for the 'proctest' module
   set_search_path(pypath "${CMAKE_SOURCE_DIR}/tools/py" "$ENV{PYTHON_PATH}")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-proton git commit: PROTON-1585: Remove old reactor and messenger examples; promote the proactor examples to the top level

Posted by jr...@apache.org.
PROTON-1585: Remove old reactor and messenger examples; promote the proactor examples to the top level


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/564e0ca4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/564e0ca4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/564e0ca4

Branch: refs/heads/master
Commit: 564e0ca4c3367f66824e5ecf417afe70c41fb2d9
Parents: 6888ab5
Author: Justin Ross <jr...@apache.org>
Authored: Mon Sep 11 15:31:15 2017 -0700
Committer: Justin Ross <jr...@apache.org>
Committed: Mon Sep 11 15:44:30 2017 -0700

----------------------------------------------------------------------
 examples/c/CMakeLists.txt           |  35 ++-
 examples/c/README.dox               |  21 ++
 examples/c/broker.c                 | 424 +++++++++++++++++++++++++++++++
 examples/c/direct.c                 | 326 ++++++++++++++++++++++++
 examples/c/example_test.py          |  88 +++++++
 examples/c/messenger/CMakeLists.txt |  52 ----
 examples/c/messenger/recv-async.c   | 193 --------------
 examples/c/messenger/recv.c         | 154 -----------
 examples/c/messenger/send-async.c   | 170 -------------
 examples/c/messenger/send.c         | 111 --------
 examples/c/proactor/CMakeLists.txt  |  44 ----
 examples/c/proactor/README.dox      |  21 --
 examples/c/proactor/broker.c        | 424 -------------------------------
 examples/c/proactor/direct.c        | 326 ------------------------
 examples/c/proactor/example_test.py |  88 -------
 examples/c/proactor/receive.c       | 188 --------------
 examples/c/proactor/send.c          | 196 --------------
 examples/c/proactor/thread.h        |  49 ----
 examples/c/reactor/CMakeLists.txt   |  45 ----
 examples/c/reactor/README           |  30 ---
 examples/c/reactor/receiver.c       | 286 ---------------------
 examples/c/reactor/sender.c         | 329 ------------------------
 examples/c/receive.c                | 188 ++++++++++++++
 examples/c/send.c                   | 196 ++++++++++++++
 examples/c/thread.h                 |  49 ++++
 proton-c/src/tests/CMakeLists.txt   |   4 +-
 26 files changed, 1320 insertions(+), 2717 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt
index 12867be..c300b00 100644
--- a/examples/c/CMakeLists.txt
+++ b/examples/c/CMakeLists.txt
@@ -17,16 +17,33 @@
 # under the License.
 #
 
-find_package(Proton REQUIRED)
+find_package(Proton REQUIRED Core Proactor)
+set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
+find_package(Threads REQUIRED)
+
 include(CheckCCompilerFlag)
 include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include)
+include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${Proton_INCLUDE_DIRS})
+
+add_definitions(${COMPILE_LANGUAGE_FLAGS} ${COMPILE_WARNING_FLAGS} ${WERROR} ${COMPILE_PLATFORM_FLAGS} ${LINK_TIME_OPTIMIZATION})
+
+# Add a test with the correct environment to find test executables and valgrind.
+if(WIN32)
+  set(test_path "$<TARGET_FILE_DIR:broker>;$<TARGET_FILE_DIR:qpid-proton>")
+else()
+  set(test_path "${CMAKE_CURRENT_BINARY_DIR}:$ENV{PATH}")
+endif()
 
-if(Proton_Proactor_FOUND)
-  if(WIN32)
-    message(STATUS "Windows IOCP proactor examples temporarily disabled for build")
-  else(WIN32)
-    add_subdirectory(proactor)
-  endif(WIN32)
+if(WIN32)
+  message(STATUS "Windows IOCP proactor examples temporarily disabled for build")
+else()
+  foreach (name broker send receive direct)
+    add_executable(c-${name} ${name}.c)
+    target_link_libraries(c-${name} ${Proton_Proactor_LIBRARIES} ${Proton_Core_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
+    set_target_properties(c-${name} PROPERTIES OUTPUT_NAME ${name})
+  endforeach()
 endif()
-add_subdirectory(messenger)
-add_subdirectory(reactor)
+
+set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV})
+
+add_test(c-example-tests ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/README.dox
----------------------------------------------------------------------
diff --git a/examples/c/README.dox b/examples/c/README.dox
new file mode 100644
index 0000000..a548d35
--- /dev/null
+++ b/examples/c/README.dox
@@ -0,0 +1,21 @@
+/**
+ * @example send.c
+ *
+ * Send a fixed number of messages to the "examples" node.
+ * Can be used with @ref broker.c, @ref direct.c or an external AMQP broker.
+ *
+ * @example receive.c
+ *
+ * Subscribes to the "examples" node and prints the message bodies received.
+ * Can be used with @ref broker.c, @ref direct.c or an external AMQP broker.
+ *
+ * @example direct.c
+ *
+ * A server that can be used to demonstrate direct (no broker) peer-to-peer communication.
+ * It can accept an incoming connection from either the @ref send.c or @ref receive.c examples
+ * and will act as the directly-connected counterpart (receive or send).
+ *
+ * @example broker.c
+ *
+ * A simple multithreaded broker that works with the @ref send.c and @ref receive.c examples.
+ */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/broker.c b/examples/c/broker.c
new file mode 100644
index 0000000..e0d9672
--- /dev/null
+++ b/examples/c/broker.c
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "thread.h"
+
+#include <proton/engine.h>
+#include <proton/listener.h>
+#include <proton/proactor.h>
+#include <proton/sasl.h>
+#include <proton/transport.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+/* Simple re-sizable vector that acts as a queue */
+#define VEC(T) struct { T* data; size_t len, cap; }
+
+#define VEC_INIT(V)                             \
+  do {                                          \
+    V.len = 0;                                  \
+    V.cap = 16;                                 \
+    void **vp = (void**)&V.data;                \
+    *vp = malloc(V.cap * sizeof(*V.data));      \
+  } while(0)
+
+#define VEC_FINAL(V) free(V.data)
+
+#define VEC_PUSH(V, X)                                  \
+  do {                                                  \
+    if (V.len == V.cap) { /* Full: double the capacity */ \
+      V.cap *= 2;                                       \
+      void **vp = (void**)&V.data;                      \
+      *vp = realloc(V.data, V.cap * sizeof(*V.data));   \
+    }                                                   \
+    V.data[V.len++] = X; /* Append X at the tail */     \
+  } while(0)
+
+#define VEC_POP(V)                                              \
+  do {                                                          \
+    if (V.len > 0)                                              \
+      memmove(V.data, V.data+1, (--V.len)*sizeof(*V.data));     \
+  } while(0)
+
+/* Simple thread-safe queue implementation */
+typedef struct queue_t {
+  pthread_mutex_t lock;
+  char name[256];
+  VEC(pn_rwbytes_t) messages;   /* Messages on the queue_t */
+  VEC(pn_connection_t*) waiting; /* Connections waiting to send messages from this queue */
+  struct queue_t *next;            /* Next queue in chain */
+  size_t sent;                     /* Count of messages sent, used as delivery tag */
+} queue_t;
+
+static void queue_init(queue_t *q, const char* name, queue_t *next) { /* Set up q as an empty queue called name, chained in front of next */
+  pthread_mutex_init(&q->lock, NULL);
+  snprintf(q->name, sizeof(q->name), "%s", name); /* Unlike strncpy, always NUL-terminates (over-long names are truncated) */
+  VEC_INIT(q->messages);
+  VEC_INIT(q->waiting);
+  q->next = next;
+  q->sent = 0;
+}
+
+static void queue_destroy(queue_t *q) { /* Release the resources held by q; q itself is not freed here */
+  pthread_mutex_destroy(&q->lock);
+  /* q->name is an array embedded in queue_t, not a separate allocation, so it must not be free()d */
+  for (size_t i = 0; i < q->messages.len; ++i)
+    free(q->messages.data[i].start);    /* Buffers of messages still on the queue */
+  VEC_FINAL(q->messages);
+  for (size_t i = 0; i < q->waiting.len; ++i)
+    pn_decref(q->waiting.data[i]);
+  VEC_FINAL(q->waiting);
+}
+
+/* Send a message on s, or record the connection of s as waiting if the queue is empty.
+   Called in s dispatch loop, assumes s has credit.
+*/
+static void queue_send(queue_t *q, pn_link_t *s) {
+  pn_rwbytes_t m = { 0 };
+  size_t tag = 0;
+  pthread_mutex_lock(&q->lock);
+  if (q->messages.len == 0) { /* Empty, record connection as waiting */
+    /* Record connection for wake-up if not already on the list. */
+    pn_connection_t *c = pn_session_connection(pn_link_session(s));
+    size_t i = 0;
+    for (; i < q->waiting.len && q->waiting.data[i] != c; ++i)
+      ;
+    if (i == q->waiting.len) {
+      VEC_PUSH(q->waiting, c);
+    }
+  } else { /* Take the message at the head of the queue; the running count becomes its tag */
+    m = q->messages.data[0];
+    VEC_POP(q->messages);
+    tag = ++q->sent;
+  }
+  pthread_mutex_unlock(&q->lock);
+  if (m.start) { /* A message was dequeued: send it now that the lock is released */
+    pn_delivery_t *d = pn_delivery(s, pn_dtag((char*)&tag, sizeof(tag)));
+    pn_link_send(s, m.start, m.size);
+    pn_link_advance(s);
+    pn_delivery_settle(d);  /* Pre-settled: unreliable, there will be no ack */
+    free(m.start);
+  }
+}
+
+/* Data associated with each broker connection */
+typedef struct broker_data_t {
+  bool check_queues;          /* Check senders on the connection for available data in queues. */
+} broker_data_t;
+
+/* Use the context pointer as a boolean flag to indicate we need to check queues */
+void pn_connection_set_check_queues(pn_connection_t *c, bool check) {
+  pn_connection_set_context(c, (void*)check);
+}
+
+bool pn_connection_get_check_queues(pn_connection_t *c) {
+  return (bool)pn_connection_get_context(c);
+}
+
+/* Put a message on the queue, called in receiver dispatch loop.
+   If the queue was previously empty, notify waiting senders.
+*/
+static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) {
+  pthread_mutex_lock(&q->lock);
+  VEC_PUSH(q->messages, m);
+  if (q->messages.len == 1) { /* Was empty, notify waiting connections */
+    for (size_t i = 0; i < q->waiting.len; ++i) {
+      pn_connection_t *c = q->waiting.data[i];
+      pn_connection_set_check_queues(c, true);
+      pn_connection_wake(c); /* Wake the connection */
+    }
+    q->waiting.len = 0;
+  }
+  pthread_mutex_unlock(&q->lock);
+}
+
+/* Thread safe set of queues */
+typedef struct queues_t {
+  pthread_mutex_t lock;
+  queue_t *queues;
+  size_t sent;
+} queues_t;
+
+void queues_init(queues_t *qs) {
+  pthread_mutex_init(&qs->lock, NULL);
+  qs->queues = NULL;
+}
+
+void queues_destroy(queues_t *qs) { /* Destroy and free every queue in the chain, then the set's lock */
+  for (queue_t *q; (q = qs->queues) != NULL; free(q)) {
+    qs->queues = q->next;       /* Unlink first so q->next is never read after free(q) */
+    queue_destroy(q);
+  }
+  pthread_mutex_destroy(&qs->lock);
+}
+
+/** Get or create the named queue. */
+queue_t* queues_get(queues_t *qs, const char* name) {
+  pthread_mutex_lock(&qs->lock);
+  queue_t *q;
+  for (q = qs->queues; q && strcmp(q->name, name) != 0; q = q->next)
+    ;
+  if (!q) {
+    q = (queue_t*)malloc(sizeof(queue_t));
+    queue_init(q, name, qs->queues);
+    qs->queues = q;
+  }
+  pthread_mutex_unlock(&qs->lock);
+  return q;
+}
+
+/* The broker implementation */
+typedef struct broker_t {
+  pn_proactor_t *proactor;
+  size_t threads;
+  const char *container_id;     /* AMQP container-id */
+  queues_t queues;
+  bool finished;
+} broker_t;
+
+void broker_stop(broker_t *b) {
+  /* Interrupt the proactor to stop the working threads. */
+  pn_proactor_interrupt(b->proactor);
+}
+
+/* Try to send if link is sender and has credit */
+static void link_send(broker_t *b, pn_link_t *s) {
+  if (pn_link_is_sender(s) && pn_link_credit(s) > 0) {
+    const char *qname = pn_terminus_get_address(pn_link_source(s));
+    queue_t *q = queues_get(&b->queues, qname);
+    queue_send(q, s);
+  }
+}
+
+static void queue_unsub(queue_t *q, pn_connection_t *c) {
+  pthread_mutex_lock(&q->lock);
+  for (size_t i = 0; i < q->waiting.len; ++i) {
+    if (q->waiting.data[i] == c){
+      q->waiting.data[i] = q->waiting.data[0]; /* save old [0] */
+      VEC_POP(q->waiting);
+      break;
+    }
+  }
+  pthread_mutex_unlock(&q->lock);
+}
+
+/* Unsubscribe from the queue of interest to this link. */
+static void link_unsub(broker_t *b, pn_link_t *s) {
+  if (pn_link_is_sender(s)) {
+    const char *qname = pn_terminus_get_address(pn_link_source(s));
+    if (qname) {
+      queue_t *q = queues_get(&b->queues, qname);
+      queue_unsub(q, pn_session_connection(pn_link_session(s)));
+    }
+  }
+}
+
+/* Called in connection's event loop when a connection is woken for messages.*/
+static void connection_unsub(broker_t *b, pn_connection_t *c) {
+  for (pn_link_t *l = pn_link_head(c, 0); l != NULL; l = pn_link_next(l, 0))
+    link_unsub(b, l);
+}
+
+static void session_unsub(broker_t *b, pn_session_t *ssn) {
+  pn_connection_t *c = pn_session_connection(ssn);
+  for (pn_link_t *l = pn_link_head(c, 0); l != NULL; l = pn_link_next(l, 0)) {
+    if (pn_link_session(l) == ssn)
+      link_unsub(b, l);
+  }
+}
+
+static int exit_code = 0;
+
+static void check_condition(pn_event_t *e, pn_condition_t *cond) {
+  if (pn_condition_is_set(cond)) {
+    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
+            pn_condition_get_name(cond), pn_condition_get_description(cond));
+    exit_code = 1;              /* Remember there was an unexpected error */
+  }
+}
+
+const int WINDOW=10;            /* Incoming credit window */
+
+/* Handle one proactor event for the broker: accept and open incoming endpoints, move
+   messages between links and the named queues, and unsubscribe or stop on close events. */
+static void handle(broker_t* b, pn_event_t* e) {
+  pn_connection_t *c = pn_event_connection(e);
+
+  switch (pn_event_type(e)) {
+
+   case PN_LISTENER_OPEN:
+    printf("listening\n");
+    fflush(stdout);
+    break;
+
+   case PN_LISTENER_ACCEPT:
+    pn_listener_accept(pn_event_listener(e), pn_connection());
+    break;
+
+   case PN_CONNECTION_INIT:
+     pn_connection_set_container(c, b->container_id);
+     break;
+
+   case PN_CONNECTION_BOUND: {
+     /* Turn off security */
+     pn_transport_t *t = pn_connection_transport(c);
+     pn_transport_require_auth(t, false);
+     pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
+     break;
+   }
+   case PN_CONNECTION_REMOTE_OPEN: {
+     pn_connection_open(pn_event_connection(e)); /* Complete the open */
+     break;
+   }
+   case PN_CONNECTION_WAKE: {
+     if (pn_connection_get_check_queues(c)) {
+       pn_connection_set_check_queues(c, false);
+       int flags = PN_LOCAL_ACTIVE|PN_REMOTE_ACTIVE; /* OR the bits: AND-ing them gives 0, which matches every link */
+       for (pn_link_t *l = pn_link_head(c, flags); l != NULL; l = pn_link_next(l, flags))
+         link_send(b, l);
+     }
+     break;
+   }
+   case PN_SESSION_REMOTE_OPEN: {
+     pn_session_open(pn_event_session(e));
+     break;
+   }
+   case PN_LINK_REMOTE_OPEN: {
+     pn_link_t *l = pn_event_link(e);
+     if (pn_link_is_sender(l)) {
+       const char *source = pn_terminus_get_address(pn_link_remote_source(l));
+       pn_terminus_set_address(pn_link_source(l), source);
+     } else {
+       const char* target = pn_terminus_get_address(pn_link_remote_target(l));
+       pn_terminus_set_address(pn_link_target(l), target);
+       pn_link_flow(l, WINDOW);
+     }
+     pn_link_open(l);
+     break;
+   }
+   case PN_LINK_FLOW: {
+     link_send(b, pn_event_link(e));
+     break;
+   }
+   case PN_DELIVERY: {
+     pn_delivery_t *d = pn_event_delivery(e);
+     pn_link_t *r = pn_delivery_link(d);
+     if (pn_link_is_receiver(r) &&
+         pn_delivery_readable(d) && !pn_delivery_partial(d))
+     {
+       size_t size = pn_delivery_pending(d);
+       /* The broker does not decode the message, just forwards it. */
+       pn_rwbytes_t m = { size, (char*)malloc(size) };
+       pn_link_recv(r, m.start, m.size);
+       const char *qname = pn_terminus_get_address(pn_link_target(r));
+       queue_receive(b->proactor, queues_get(&b->queues, qname), m);
+       pn_delivery_update(d, PN_ACCEPTED);
+       pn_delivery_settle(d);
+       pn_link_flow(r, WINDOW - pn_link_credit(r)); /* Top the credit back up to WINDOW */
+     }
+     break;
+   }
+
+   case PN_TRANSPORT_CLOSED:
+    check_condition(e, pn_transport_condition(pn_event_transport(e)));
+    connection_unsub(b, pn_event_connection(e));
+    break;
+
+   case PN_CONNECTION_REMOTE_CLOSE:
+    check_condition(e, pn_connection_remote_condition(pn_event_connection(e)));
+    pn_connection_close(pn_event_connection(e));
+    break;
+
+   case PN_SESSION_REMOTE_CLOSE:
+    check_condition(e, pn_session_remote_condition(pn_event_session(e)));
+    session_unsub(b, pn_event_session(e));
+    pn_session_close(pn_event_session(e));
+    pn_session_free(pn_event_session(e));
+    break;
+
+   case PN_LINK_REMOTE_CLOSE:
+    check_condition(e, pn_link_remote_condition(pn_event_link(e)));
+    link_unsub(b, pn_event_link(e));
+    pn_link_close(pn_event_link(e));
+    pn_link_free(pn_event_link(e));
+    break;
+
+   case PN_LISTENER_CLOSE:
+    check_condition(e, pn_listener_condition(pn_event_listener(e)));
+    broker_stop(b);
+    break;
+
+   case PN_PROACTOR_INACTIVE:   /* listener and all connections closed */
+    broker_stop(b);
+    break;
+
+   case PN_PROACTOR_INTERRUPT:
+    b->finished = true;
+    pn_proactor_interrupt(b->proactor); /* Pass along the interrupt to the other threads */
+    break;
+
+   default:
+    break;
+  }
+}
+
+static void* broker_thread(void *void_broker) {
+  broker_t *b = (broker_t*)void_broker;
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(b->proactor);
+    pn_event_t *e;
+    while ((e = pn_event_batch_next(events))) {
+      handle(b, e);
+    }
+    pn_proactor_done(b->proactor, events);
+  } while(!b->finished);
+  return NULL;
+}
+
+int main(int argc, char **argv) {
+  broker_t b = {0};
+  b.proactor = pn_proactor();
+  queues_init(&b.queues);
+  b.container_id = argv[0];
+  b.threads = 4;
+  int i = 1;
+  const char *host = (argc > i) ? argv[i++] : "";
+  const char *port = (argc > i) ? argv[i++] : "amqp";
+
+  /* Listen on addr */
+  char addr[PN_MAX_ADDR];
+  pn_proactor_addr(addr, sizeof(addr), host, port);
+  pn_proactor_listen(b.proactor, pn_listener(), addr, 16);
+
+  /* Start n-1 threads */
+  pthread_t* threads = (pthread_t*)calloc(sizeof(pthread_t), b.threads);
+  for (size_t i = 0; i < b.threads-1; ++i) {
+    pthread_create(&threads[i], NULL, broker_thread, &b);
+  }
+  broker_thread(&b);            /* Use the main thread too. */
+  /* Join the other threads */
+  for (size_t i = 0; i < b.threads-1; ++i) {
+    pthread_join(threads[i], NULL);
+  }
+  pn_proactor_free(b.proactor);
+  free(threads);
+  return exit_code;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/direct.c b/examples/c/direct.c
new file mode 100644
index 0000000..15550e6
--- /dev/null
+++ b/examples/c/direct.c
@@ -0,0 +1,326 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/condition.h>
+#include <proton/connection.h>
+#include <proton/delivery.h>
+#include <proton/link.h>
+#include <proton/listener.h>
+#include <proton/message.h>
+#include <proton/proactor.h>
+#include <proton/sasl.h>
+#include <proton/session.h>
+#include <proton/transport.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+
+typedef struct app_data_t {
+  const char *host, *port;
+  const char *amqp_address;
+  const char *container_id;
+  int message_count;
+
+  pn_proactor_t *proactor;
+  pn_listener_t *listener;
+  pn_rwbytes_t message_buffer;
+
+  /* Sender values */
+  int sent;
+  int acknowledged;
+  pn_link_t *sender;
+
+  /* Receiver values */
+  int received;
+} app_data_t;
+
+static const int BATCH = 1000; /* Batch size for unlimited receive */
+
+static int exit_code = 0;
+
+static void check_condition(pn_event_t *e, pn_condition_t *cond) {
+  if (pn_condition_is_set(cond)) {
+    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
+            pn_condition_get_name(cond), pn_condition_get_description(cond));
+    pn_connection_close(pn_event_connection(e));
+    exit_code = 1;
+  }
+}
+
+/* Create a message with a map { "sequence" : number }, encode it into app->message_buffer and return the encoded bytes. */
+static pn_bytes_t encode_message(app_data_t* app) {
+  /* Construct a message with the map { "sequence": app.sent } */
+  pn_message_t* message = pn_message();
+  pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
+  pn_data_t* body = pn_message_body(message);
+  pn_data_put_map(body);
+  pn_data_enter(body);
+  pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
+  pn_data_put_int(body, app->sent); /* The sequence number */
+  pn_data_exit(body);
+
+  /* encode the message, expanding the encode buffer as needed */
+  if (app->message_buffer.start == NULL) {
+    static const size_t initial_size = 128;
+    app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
+  }
+  /* app->message_buffer is the total buffer space available. */
+  /* mbuf will point at just the portion used by the encoded message */
+  pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
+  int status = 0;
+  while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
+    app->message_buffer.size *= 2;
+    app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size);
+    mbuf = app->message_buffer; /* Refresh start as well as size: realloc may have moved the buffer */
+  }
+  if (status != 0) {
+    fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
+    exit(1);
+  }
+  pn_message_free(message);
+  return pn_bytes(mbuf.size, mbuf.start);
+}
+
+#define MAX_SIZE 1024
+
+static void decode_message(pn_delivery_t *dlv) {
+  static char buffer[MAX_SIZE];
+  ssize_t len;
+  // try to decode the message body
+  if (pn_delivery_pending(dlv) < MAX_SIZE) {
+    // read in the raw data
+    len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
+    if (len > 0) {
+      // decode it into a proton message
+      pn_message_t *m = pn_message();
+      if (PN_OK == pn_message_decode(m, buffer, len)) {
+        pn_string_t *s = pn_string(NULL);
+        pn_inspect(pn_message_body(m), s);
+        printf("%s\n", pn_string_get(s));
+        pn_free(s);
+      }
+      pn_message_free(m);
+    }
+  }
+}
+
+/* This function handles events when we are acting as the receiver */
+static void handle_receive(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+   case PN_LINK_REMOTE_OPEN: {
+     pn_link_t *l = pn_event_link(event);
+     pn_link_open(l);
+     pn_link_flow(l, app->message_count ? app->message_count : BATCH);
+   } break;
+
+   case PN_DELIVERY: {
+     /* A message has been received */
+     pn_link_t *link = NULL;
+     pn_delivery_t *dlv = pn_event_delivery(event);
+     if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
+       link = pn_delivery_link(dlv);
+       decode_message(dlv);
+       /* Accept the delivery */
+       pn_delivery_update(dlv, PN_ACCEPTED);
+       /* done with the delivery, move to the next and free it */
+       pn_link_advance(link);
+       pn_delivery_settle(dlv);  /* dlv is now freed */
+
+       if (app->message_count == 0) {
+         /* receive forever - see if more credit is needed */
+         if (pn_link_credit(link) < BATCH/2) {
+           /* Grant enough credit to bring it up to BATCH: */
+           pn_link_flow(link, BATCH - pn_link_credit(link));
+         }
+       } else if (++app->received >= app->message_count) {
+         /* done receiving, close the endpoints */
+         printf("%d messages received\n", app->received);
+         pn_session_t *ssn = pn_link_session(link);
+         pn_link_close(link);
+         pn_session_close(ssn);
+         pn_connection_close(pn_session_connection(ssn));
+       }
+     }
+   } break;
+
+   default:
+    break;
+  }
+}
+
+/* This function handles events when we are acting as the sender */
+static void handle_send(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+   case PN_LINK_REMOTE_OPEN: {
+     pn_link_t* l = pn_event_link(event);
+     pn_terminus_set_address(pn_link_target(l), app->amqp_address);
+     pn_link_open(l);
+   } break;
+
+   case PN_LINK_FLOW: {
+     /* The peer has given us some credit, now we can send messages */
+     pn_link_t *sender = pn_event_link(event);
+     while (pn_link_credit(sender) > 0 && app->sent < app->message_count) {
+       ++app->sent;
+       // Use sent counter as unique delivery tag.
+       pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
+       pn_bytes_t msgbuf = encode_message(app);
+       pn_link_send(sender, msgbuf.start, msgbuf.size);
+       pn_link_advance(sender);
+     }
+     break;
+   }
+
+   case PN_DELIVERY: {
+     /* We received acknowledgement from the peer that a message was delivered. */
+     pn_delivery_t* d = pn_event_delivery(event);
+     if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
+       if (++app->acknowledged == app->message_count) {
+         printf("%d messages sent and acknowledged\n", app->acknowledged);
+         pn_connection_close(pn_event_connection(event));
+         /* Continue handling events till we receive TRANSPORT_CLOSED */
+       }
+     }
+   } break;
+
+   default:
+    break;
+  }
+}
+
+/* Handle all events, delegate to handle_send or handle_receive depending on link mode.
+   Return true to continue, false to exit
+*/
+static bool handle(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+   case PN_LISTENER_OPEN:
+    printf("listening\n");
+    fflush(stdout);
+    break;
+
+   case PN_LISTENER_ACCEPT:
+    pn_listener_accept(pn_event_listener(event), pn_connection());
+    break;
+
+   case PN_CONNECTION_INIT:
+    pn_connection_set_container(pn_event_connection(event), app->container_id);
+    break;
+
+   case PN_CONNECTION_BOUND: {
+     /* Turn off security */
+     pn_transport_t *t = pn_event_transport(event);
+     pn_transport_require_auth(t, false);
+     pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
+   } break;                     /* Do not fall through: the open is completed on REMOTE_OPEN */
+   case PN_CONNECTION_REMOTE_OPEN: {
+     pn_connection_open(pn_event_connection(event)); /* Complete the open */
+     break;
+   }
+
+   case PN_SESSION_REMOTE_OPEN: {
+     pn_session_open(pn_event_session(event));
+     break;
+   }
+
+   case PN_TRANSPORT_CLOSED:
+    check_condition(event, pn_transport_condition(pn_event_transport(event)));
+    pn_listener_close(app->listener); /* Finished */
+    break;
+
+   case PN_CONNECTION_REMOTE_CLOSE:
+    check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_SESSION_REMOTE_CLOSE:
+    check_condition(event, pn_session_remote_condition(pn_event_session(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_LINK_REMOTE_CLOSE:
+   case PN_LINK_REMOTE_DETACH:
+    check_condition(event, pn_link_remote_condition(pn_event_link(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_PROACTOR_TIMEOUT:
+    /* Wake the sender's connection */
+    pn_connection_wake(pn_session_connection(pn_link_session(app->sender)));
+    break;
+
+   case PN_LISTENER_CLOSE:
+    check_condition(event, pn_listener_condition(pn_event_listener(event)));
+    break;
+
+   case PN_PROACTOR_INACTIVE:
+    return false;
+    break;
+
+   default: {
+     pn_link_t *l = pn_event_link(event);
+     if (l) {                      /* Only delegate link-related events */
+       if (pn_link_is_sender(l)) {
+         handle_send(app, event);
+       } else {
+         handle_receive(app, event);
+       }
+     }
+   }
+  }
+  return true;
+}
+
+void run(app_data_t *app) {
+  /* Loop and handle events */
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(app->proactor);
+    for (pn_event_t *e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
+      if (!handle(app, e)) {
+        return;
+      }
+    }
+    pn_proactor_done(app->proactor, events);
+  } while(true);
+}
+
+int main(int argc, char **argv) {
+  struct app_data_t app = {0};
+  int i = 0;
+  app.container_id = argv[i++];   /* Should be unique */
+  app.host = (argc > i) ? argv[i++] : "";     /* Each optional argument is taken only if present, */
+  app.port = (argc > i) ? argv[i++] : "amqp"; /* so giving a host alone no longer reads past argv */
+  app.amqp_address = (argc > i) ? argv[i++] : "examples";
+  app.message_count = (argc > i) ? atoi(argv[i++]) : 10;
+
+  /* Create the proactor and listen */
+  app.proactor = pn_proactor();
+  app.listener = pn_listener();
+  char addr[PN_MAX_ADDR];
+  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
+  pn_proactor_listen(app.proactor, app.listener, addr, 16);
+  run(&app);
+  pn_proactor_free(app.proactor);
+  free(app.message_buffer.start);
+  return exit_code;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/example_test.py
----------------------------------------------------------------------
diff --git a/examples/c/example_test.py b/examples/c/example_test.py
new file mode 100644
index 0000000..02bb1fd
--- /dev/null
+++ b/examples/c/example_test.py
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+
+# This is a test script to run the examples and verify that they behave as expected.
+
+import unittest, sys, time
+from proctest import *
+
+def python_cmd(name):
+    dir = os.path.dirname(__file__)
+    return [sys.executable, os.path.join(dir, "..", "..", "python", name)]
+
+def receive_expect(n):
+    return ''.join('{"sequence"=%s}\n'%i for i in xrange(1, n+1)) + "%s messages received\n"%n
+
+class Broker(object):
+    def __init__(self, test):
+        self.test = test
+
+    def __enter__(self):
+        with TestPort() as tp:
+            self.port = tp.port
+            self.host = tp.host
+            self.addr = tp.addr
+            self.proc = self.test.proc(["broker", "", self.port])
+            self.proc.wait_re("listening")
+            return self
+
+    def __exit__(self, *args):
+        b = getattr(self, "proc")
+        if b:
+            if b.poll() !=  None: # Broker crashed
+                raise ProcError(b, "broker crash")
+            b.kill()
+
+class CExampleTest(ProcTestCase):
+
+    def test_send_receive(self):
+        """Send first then receive"""
+        with Broker(self) as b:
+            s = self.proc(["send", "", b.port])
+            self.assertEqual("10 messages sent and acknowledged\n", s.wait_exit())
+            r = self.proc(["receive", "", b.port])
+            self.assertEqual(receive_expect(10), r.wait_exit())
+
+    def test_receive_send(self):
+        """Start receiving  first, then send."""
+        with Broker(self) as b:
+            r = self.proc(["receive", "", b.port]);
+            s = self.proc(["send", "", b.port]);
+            self.assertEqual("10 messages sent and acknowledged\n", s.wait_exit())
+            self.assertEqual(receive_expect(10), r.wait_exit())
+
+    def test_send_direct(self):
+        """Send to direct server"""
+        with TestPort() as tp:
+            d = self.proc(["direct", "", tp.port])
+            d.wait_re("listening")
+            self.assertEqual("10 messages sent and acknowledged\n", self.proc(["send", "", tp.port]).wait_exit())
+            self.assertIn(receive_expect(10), d.wait_exit())
+
+    def test_receive_direct(self):
+        """Receive from direct server"""
+        with TestPort() as tp:
+            d = self.proc(["direct", "", tp.port])
+            d.wait_re("listening")
+            self.assertEqual(receive_expect(10), self.proc(["receive", "", tp.port]).wait_exit())
+            self.assertIn("10 messages sent and acknowledged\n", d.wait_exit())
+
+
+if __name__ == "__main__":
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/messenger/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/messenger/CMakeLists.txt b/examples/c/messenger/CMakeLists.txt
deleted file mode 100644
index d4fec71..0000000
--- a/examples/c/messenger/CMakeLists.txt
+++ /dev/null
@@ -1,52 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-find_package(Proton REQUIRED)
-
-set (messenger-examples
-  recv.c
-  send.c
-  recv-async.c
-  send-async.c
-  )
-
-set_source_files_properties (
-  ${messenger-examples}
-  PROPERTIES
-  COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${COMPILE_LANGUAGE_FLAGS} ${LINK_TIME_OPTIMIZATION}"
-  )
-
-if (BUILD_WITH_CXX)
-  set_source_files_properties (
-    ${messenger-examples}
-    PROPERTIES LANGUAGE CXX
-    )
-endif (BUILD_WITH_CXX)
-
-add_executable(recv recv.c)
-add_executable(send send.c)
-add_executable(recv-async recv-async.c)
-add_executable(send-async send-async.c)
-
-include_directories(${Proton_INCLUDE_DIRS})
-
-target_link_libraries(recv ${Proton_LIBRARIES})
-target_link_libraries(send ${Proton_LIBRARIES})
-target_link_libraries(recv-async ${Proton_LIBRARIES})
-target_link_libraries(send-async ${Proton_LIBRARIES})

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/messenger/recv-async.c
----------------------------------------------------------------------
diff --git a/examples/c/messenger/recv-async.c b/examples/c/messenger/recv-async.c
deleted file mode 100644
index 1f49166..0000000
--- a/examples/c/messenger/recv-async.c
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-// This is a re-implementation of recv.c using non-blocking/asynchronous calls.
-
-#include "proton/message.h"
-#include "proton/messenger.h"
-
-#include "pncompat/misc_funcs.inc"
-#include <stdio.h>
-#include <stdlib.h>
-#include <ctype.h>
-
-#if EMSCRIPTEN
-#include <emscripten.h>
-#endif
-
-pn_message_t * message;
-pn_messenger_t * messenger;
-
-#define check(messenger)                                                     \
-  {                                                                          \
-    if(pn_messenger_errno(messenger))                                        \
-    {                                                                        \
-      die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
-    }                                                                        \
-  }                                                                          \
-
-void die(const char *file, int line, const char *message)
-{
-    fprintf(stderr, "%s:%i: %s\n", file, line, message);
-    exit(1);
-}
-
-void usage(void)
-{
-    printf("Usage: recv [options] <addr>\n");
-    printf("-c    \tPath to the certificate file.\n");
-    printf("-k    \tPath to the private key file.\n");
-    printf("-p    \tPassword for the private key.\n");
-    printf("<addr>\tAn address.\n");
-    exit(0);
-}
-
-/* Drain every message currently queued on the messenger: print its address,
-   subject and body, then accept it. */
-void process(void) {
-    while(pn_messenger_incoming(messenger))
-    {
-        pn_messenger_get(messenger, message);
-        check(messenger);
-
-        {
-        pn_tracker_t tracker = pn_messenger_incoming_tracker(messenger);
-        /* The result of pn_data_format() is not checked, so start from an
-           empty string: the "%s" below stays well defined if formatting fails. */
-        char buffer[1024] = "";
-        size_t buffsize = sizeof(buffer);
-        const char* subject = pn_message_get_subject(message);
-        pn_data_t* body = pn_message_body(message);
-        pn_data_format(body, buffer, &buffsize);
-
-        printf("Address: %s\n", pn_message_get_address(message));
-        printf("Subject: %s\n", subject ? subject : "(no subject)");
-        printf("Content: %s\n", buffer);
-
-        pn_messenger_accept(messenger, tracker, 0);
-        }
-    }
-}
-
-#if EMSCRIPTEN // For emscripten C/C++ to JavaScript compiler.
-void pump(int fd, void* userData) {
-    while (pn_messenger_work(messenger, 0) >= 0) {
-        process();
-    }
-}
-
-void onclose(int fd, void* userData) {
-    process();
-}
-
-void onerror(int fd, int errno, const char* msg, void* userData) {
-    printf("error callback fd = %d, errno = %d, msg = %s\n", fd, errno, msg);
-}
-#endif
-
-int main(int argc, char** argv)
-{
-    char* certificate = NULL;
-    char* privatekey = NULL;
-    char* password = NULL;
-    char* address = (char *) "amqp://~0.0.0.0";
-    int c;
-
-    message = pn_message();
-    messenger = pn_messenger(NULL);
-    pn_messenger_set_blocking(messenger, false); // Needs to be set non-blocking to behave asynchronously.
-
-    opterr = 0;
-
-    while((c = getopt(argc, argv, "hc:k:p:")) != -1)
-    {
-        switch(c)
-        {
-            case 'h':
-                usage();
-                break;
-
-            case 'c': certificate = optarg; break;
-            case 'k': privatekey = optarg; break;
-            case 'p': password = optarg; break;
-
-            case '?':
-                if (optopt == 'c' ||
-                    optopt == 'k' ||
-                    optopt == 'p')
-                {
-                    fprintf(stderr, "Option -%c requires an argument.\n", optopt);
-                }
-                else if(isprint(optopt))
-                {
-                    fprintf(stderr, "Unknown option `-%c'.\n", optopt);
-                }
-                else
-                {
-                    fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
-                }
-                return 1;
-            default:
-                abort();
-        }
-    }
-
-    if (optind < argc)
-    {
-        address = argv[optind];
-    }
-
-    /* load the various command line options if they're set */
-    if(certificate)
-    {
-        pn_messenger_set_certificate(messenger, certificate);
-    }
-
-    if(privatekey)
-    {
-        pn_messenger_set_private_key(messenger, privatekey);
-    }
-
-    if(password)
-    {
-        pn_messenger_set_password(messenger, password);
-    }
-
-    pn_messenger_start(messenger);
-    check(messenger);
-
-    pn_messenger_subscribe(messenger, address);
-    check(messenger);
-
-    pn_messenger_recv(messenger, -1); // Set to receive as many messages as messenger can buffer.
-
-#if EMSCRIPTEN // For emscripten C/C++ to JavaScript compiler.
-    emscripten_set_socket_error_callback(NULL, onerror);
-
-    emscripten_set_socket_open_callback(NULL, pump);
-    emscripten_set_socket_connection_callback(NULL, pump);
-    emscripten_set_socket_message_callback(NULL, pump);
-    emscripten_set_socket_close_callback(NULL, onclose);
-#else // For native compiler.
-    while (1) {
-        pn_messenger_work(messenger, -1); // Block indefinitely until there has been socket activity.
-        process();
-    }
-#endif
-
-    return 0;
-}
-

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/messenger/recv.c
----------------------------------------------------------------------
diff --git a/examples/c/messenger/recv.c b/examples/c/messenger/recv.c
deleted file mode 100644
index 16e8321..0000000
--- a/examples/c/messenger/recv.c
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "proton/message.h"
-#include "proton/messenger.h"
-
-#include "pncompat/misc_funcs.inc"
-#include <stdio.h>
-#include <stdlib.h>
-#include <ctype.h>
-
-#define check(messenger)                                                     \
-  {                                                                          \
-    if(pn_messenger_errno(messenger))                                        \
-    {                                                                        \
-      die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
-    }                                                                        \
-  }                                                                          \
-
-void die(const char *file, int line, const char *message)
-{
-  fprintf(stderr, "%s:%i: %s\n", file, line, message);
-  exit(1);
-}
-
-void usage(void)
-{
-  printf("Usage: recv [options] <addr>\n");
-  printf("-c    \tPath to the certificate file.\n");
-  printf("-k    \tPath to the private key file.\n");
-  printf("-p    \tPassword for the private key.\n");
-  printf("<addr>\tAn address.\n");
-  exit(0);
-}
-
-/* Blocking receiver: parse the TLS options and the address, subscribe, then
-   print every message received. The receive loop has no exit condition. */
-int main(int argc, char** argv)
-{
-  char* certificate = NULL;
-  char* privatekey = NULL;
-  char* password = NULL;
-  char* address = (char *) "amqp://~0.0.0.0";
-  int c;
-
-  pn_message_t * message;
-  pn_messenger_t * messenger;
-
-  message = pn_message();
-  messenger = pn_messenger(NULL);
-
-  opterr = 0;   /* Unknown options are reported by the '?' case below */
-
-  while((c = getopt(argc, argv, "hc:k:p:")) != -1)
-  {
-    switch(c)
-    {
-    case 'h':
-      usage();
-      break;
-
-    case 'c': certificate = optarg; break;
-    case 'k': privatekey = optarg; break;
-    case 'p': password = optarg; break;
-
-    case '?':
-      if(optopt == 'c' ||
-         optopt == 'k' ||
-         optopt == 'p')
-      {
-        fprintf(stderr, "Option -%c requires an argument.\n", optopt);
-      }
-      else if(isprint(optopt))
-      {
-        fprintf(stderr, "Unknown option `-%c'.\n", optopt);
-      }
-      else
-      {
-        fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
-      }
-      return 1;
-    default:
-      abort();
-    }
-  }
-
-  /* The first non-option argument, if any, replaces the default address */
-  if (optind < argc)
-  {
-    address = argv[optind];
-  }
-
-  /* load the various command line options if they're set */
-  if(certificate)
-  {
-    pn_messenger_set_certificate(messenger, certificate);
-  }
-
-  if(privatekey)
-  {
-    pn_messenger_set_private_key(messenger, privatekey);
-  }
-
-  if(password)
-  {
-    pn_messenger_set_password(messenger, password);
-  }
-
-  pn_messenger_start(messenger);
-  check(messenger);
-
-  pn_messenger_subscribe(messenger, address);
-  check(messenger);
-
-  for(;;)
-  {
-    pn_messenger_recv(messenger, 1024);
-    check(messenger);
-
-    while(pn_messenger_incoming(messenger))
-    {
-      pn_messenger_get(messenger, message);
-      check(messenger);
-
-      {
-      /* The result of pn_data_format() is not checked, so start from an
-         empty string: the "%s" below stays well defined if formatting fails. */
-      char buffer[1024] = "";
-      size_t buffsize = sizeof(buffer);
-      const char* subject = pn_message_get_subject(message);
-      pn_data_t *body = pn_message_body(message);
-      pn_data_format(body, buffer, &buffsize);
-
-      printf("Address: %s\n", pn_message_get_address(message));
-      printf("Subject: %s\n", subject ? subject : "(no subject)");
-      printf("Content: %s\n", buffer);
-      }
-    }
-  }
-
-  return 0;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/messenger/send-async.c
----------------------------------------------------------------------
diff --git a/examples/c/messenger/send-async.c b/examples/c/messenger/send-async.c
deleted file mode 100644
index de9b023..0000000
--- a/examples/c/messenger/send-async.c
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-// This is a re-implementation of send.c using non-blocking/asynchronous calls.
-
-#include "proton/message.h"
-#include "proton/messenger.h"
-
-#include "pncompat/misc_funcs.inc"
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <ctype.h>
-
-#if EMSCRIPTEN
-#include <emscripten.h>
-#endif
-
-pn_message_t * message;
-pn_messenger_t * messenger;
-pn_tracker_t tracker;
-int running = 1;
-
-#define check(messenger)                                                     \
-  {                                                                          \
-    if(pn_messenger_errno(messenger))                                        \
-    {                                                                        \
-      die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
-    }                                                                        \
-  }                                                                          \
-
-void die(const char *file, int line, const char *message)
-{
-    fprintf(stderr, "%s:%i: %s\n", file, line, message);
-    exit(1);
-}
-
-void usage(void)
-{
-    printf("Usage: send [-a addr] [message]\n");
-    printf("-a     \tThe target address [amqp[s]://domain[/name]]\n");
-    printf("message\tA text string to send.\n");
-    exit(0);
-}
-
-void process(void) {
-    pn_status_t status = pn_messenger_status(messenger, tracker);
-    if (status != PN_STATUS_PENDING) {
-        if (running) {
-            pn_messenger_stop(messenger);
-            running = 0;
-        } 
-    }
-
-    if (pn_messenger_stopped(messenger)) {
-        pn_message_free(message);
-        pn_messenger_free(messenger);
-        message = NULL;
-        messenger = NULL;
-    }
-}
-
-#if EMSCRIPTEN // For emscripten C/C++ to JavaScript compiler.
-void pump(int fd, void* userData) {
-    while (pn_messenger_work(messenger, 0) >= 0) {
-        process();
-    }
-}
-
-void onclose(int fd, void* userData) {
-    process();
-}
-
-void onerror(int fd, int errno, const char* msg, void* userData) {
-    printf("error callback fd = %d, errno = %d, msg = %s\n", fd, errno, msg);
-}
-#endif
-
-/* Non-blocking sender: queue one text message for the target address and
-   drive the messenger until the delivery leaves the pending state and the
-   messenger has stopped. */
-int main(int argc, char** argv)
-{
-    int c;
-    char * address = (char *) "amqp://0.0.0.0";
-    char * msgtext = (char *) "Hello World!";
-    pn_data_t* body;
-
-    opterr = 0;   /* Unknown options are reported by the '?' case below */
-
-    /* Only -h and -a are handled by the switch. Listing any other letter
-       here would route it to the default case, which aborts. */
-    while((c = getopt(argc, argv, "ha:")) != -1)
-    {
-        switch(c)
-        {
-            case 'a': address = optarg; break;
-            case 'h': usage(); break;
-
-            case '?':
-                if(optopt == 'a')
-                {
-                    fprintf(stderr, "Option -%c requires an argument.\n", optopt);
-                }
-                else if(isprint(optopt))
-                {
-                    fprintf(stderr, "Unknown option `-%c'.\n", optopt);
-                }
-                else
-                {
-                    fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
-                }
-                return 1;
-            default:
-                abort();
-        }
-    }
-
-    /* The first non-option argument, if any, replaces the default text */
-    if (optind < argc) msgtext = argv[optind];
-
-    message = pn_message();
-    messenger = pn_messenger(NULL);
-    pn_messenger_set_blocking(messenger, false); // Needs to be set non-blocking to behave asynchronously.
-    pn_messenger_set_outgoing_window(messenger, 1024);
-
-    pn_messenger_start(messenger);
-
-    pn_message_set_address(message, address);
-    body = pn_message_body(message);
-    pn_data_put_string(body, pn_bytes(strlen(msgtext), msgtext));
-
-    pn_messenger_put(messenger, message);
-    check(messenger);
-
-    /* process() polls this tracker to learn when the delivery is no longer pending */
-    tracker = pn_messenger_outgoing_tracker(messenger);
-
-#if EMSCRIPTEN // For emscripten C/C++ to JavaScript compiler.
-    emscripten_set_socket_error_callback(NULL, onerror);
-
-    emscripten_set_socket_open_callback(NULL, pump);
-    emscripten_set_socket_connection_callback(NULL, pump);
-    emscripten_set_socket_message_callback(NULL, pump);
-    emscripten_set_socket_close_callback(NULL, onclose);
-#else // For native compiler.
-    while (running) {
-        pn_messenger_work(messenger, -1); // Block indefinitely until there has been socket activity.
-        process();
-    }
-
-    /* process() frees the messenger and sets it to NULL once it has stopped */
-    while (messenger && !pn_messenger_stopped(messenger)) {
-        pn_messenger_work(messenger, 0);
-        process();
-    }
-#endif
-
-    return 0;
-}
-

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/messenger/send.c
----------------------------------------------------------------------
diff --git a/examples/c/messenger/send.c b/examples/c/messenger/send.c
deleted file mode 100644
index 11b47ff..0000000
--- a/examples/c/messenger/send.c
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "proton/message.h"
-#include "proton/messenger.h"
-
-#include "pncompat/misc_funcs.inc"
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <ctype.h>
-
-#define check(messenger)                                                     \
-  {                                                                          \
-    if(pn_messenger_errno(messenger))                                        \
-    {                                                                        \
-      die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
-    }                                                                        \
-  }                                                                          \
-
-void die(const char *file, int line, const char *message)
-{
-  fprintf(stderr, "%s:%i: %s\n", file, line, message);
-  exit(1);
-}
-
-void usage(void)
-{
-  printf("Usage: send [-a addr] [message]\n");
-  printf("-a     \tThe target address [amqp[s]://domain[/name]]\n");
-  printf("message\tA text string to send.\n");
-  exit(0);
-}
-
-/* Blocking sender: send one text message to the target address and wait for
-   the messenger to finish sending before cleaning up. */
-int main(int argc, char** argv)
-{
-  int c;
-  char * address = (char *) "amqp://0.0.0.0";
-  char * msgtext = (char *) "Hello World!";
-  opterr = 0;   /* Unknown options are reported by the '?' case below */
-
-  /* Only -h and -a are handled by the switch. Listing any other letter
-     here would route it to the default case, which aborts. */
-  while((c = getopt(argc, argv, "ha:")) != -1)
-  {
-    switch(c)
-    {
-    case 'a': address = optarg; break;
-    case 'h': usage(); break;
-
-    case '?':
-      if(optopt == 'a')
-      {
-        fprintf(stderr, "Option -%c requires an argument.\n", optopt);
-      }
-      else if(isprint(optopt))
-      {
-        fprintf(stderr, "Unknown option `-%c'.\n", optopt);
-      }
-      else
-      {
-        fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
-      }
-      return 1;
-    default:
-      abort();
-    }
-  }
-
-  /* The first non-option argument, if any, replaces the default text */
-  if (optind < argc) msgtext = argv[optind];
-
-  {
-  pn_message_t * message;
-  pn_messenger_t * messenger;
-  pn_data_t * body;
-
-  message = pn_message();
-  messenger = pn_messenger(NULL);
-
-  pn_messenger_start(messenger);
-
-  pn_message_set_address(message, address);
-  body = pn_message_body(message);
-  pn_data_put_string(body, pn_bytes(strlen(msgtext), msgtext));
-  pn_messenger_put(messenger, message);
-  check(messenger);
-  pn_messenger_send(messenger, -1);
-  check(messenger);
-
-  pn_messenger_stop(messenger);
-  pn_messenger_free(messenger);
-  pn_message_free(message);
-  }
-
-  return 0;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/proactor/CMakeLists.txt b/examples/c/proactor/CMakeLists.txt
deleted file mode 100644
index 6ea8aaf..0000000
--- a/examples/c/proactor/CMakeLists.txt
+++ /dev/null
@@ -1,44 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-find_package(Proton REQUIRED Core Proactor)
-set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
-find_package(Threads REQUIRED)
-
-include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include)
-include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${Proton_INCLUDE_DIRS})
-
-add_definitions(${COMPILE_LANGUAGE_FLAGS} ${COMPILE_WARNING_FLAGS} ${WERROR} ${COMPILE_PLATFORM_FLAGS} ${LINK_TIME_OPTIMIZATION})
-
-# Add a test with the correct environment to find test executables and valgrind.
-if(WIN32)
-  set(test_path "$<TARGET_FILE_DIR:broker>;$<TARGET_FILE_DIR:qpid-proton>")
-else(WIN32)
-  set(test_path "${CMAKE_CURRENT_BINARY_DIR}:$ENV{PATH}")
-endif(WIN32)
-
-foreach(name broker send receive direct)
-  add_executable(proactor-${name} ${name}.c)
-  target_link_libraries(proactor-${name} ${Proton_Proactor_LIBRARIES} ${Proton_Core_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
-  set_target_properties(proactor-${name} PROPERTIES OUTPUT_NAME ${name})
-endforeach()
-
-set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV})
-
-add_test(c-example-proactor ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/README.dox
----------------------------------------------------------------------
diff --git a/examples/c/proactor/README.dox b/examples/c/proactor/README.dox
deleted file mode 100644
index a548d35..0000000
--- a/examples/c/proactor/README.dox
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * @example send.c
- *
- * Send a fixed number of messages to the "examples" node.
- * Can be used with @ref broker.c, @ref direct.c or an external AMQP broker.
- *
- * @example receive.c
- *
- * Subscribes to the "examples" node and prints the message bodies received.
- * Can be used with @ref broker.c, @ref direct.c or an external AMQP broker.
- *
- * @example direct.c
- *
- * A server that can be used to demonstrate direct (no broker) peer-to-peer communication.
- * It can accept an incoming connection from either the @ref send.c or @ref receive.c examples
- * and will act as the directly-connected counterpart (receive or send).
- *
- * @example broker.c
- *
- * A simple multithreaded broker that works with the @ref send.c and @ref receive.c examples.
- */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
deleted file mode 100644
index e0d9672..0000000
--- a/examples/c/proactor/broker.c
+++ /dev/null
@@ -1,424 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "thread.h"
-
-#include <proton/engine.h>
-#include <proton/listener.h>
-#include <proton/proactor.h>
-#include <proton/sasl.h>
-#include <proton/transport.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-/* Simple re-sizable vector that acts as a queue.
-   VEC(T) is an anonymous struct holding a heap array `data`, the number of
-   elements in use `len` and the allocated capacity `cap`. */
-#define VEC(T) struct { T* data; size_t len, cap; }
-
-/* Make V empty with room for 16 elements. The malloc() result is not checked. */
-#define VEC_INIT(V)                             \
-  do {                                          \
-    V.len = 0;                                  \
-    V.cap = 16;                                 \
-    void **vp = (void**)&V.data;                \
-    *vp = malloc(V.cap * sizeof(*V.data));      \
-  } while(0)
-
-/* Free the element array of V. The elements themselves are not released. */
-#define VEC_FINAL(V) free(V.data)
-
-/* Append X to V, doubling the capacity first when V is full.
-   The realloc() result is not checked. */
-#define VEC_PUSH(V, X)                                  \
-  do {                                                  \
-    if (V.len == V.cap) {                               \
-      V.cap *= 2;                                       \
-      void **vp = (void**)&V.data;                      \
-      *vp = realloc(V.data, V.cap * sizeof(*V.data));   \
-    }                                                   \
-    V.data[V.len++] = X;                                \
-  } while(0)                                            \
-
-/* Remove the first element of V, if any, by shifting the rest down one slot. */
-#define VEC_POP(V)                                              \
-  do {                                                          \
-    if (V.len > 0)                                              \
-      memmove(V.data, V.data+1, (--V.len)*sizeof(*V.data));     \
-  } while(0)
-
-/* Simple thread-safe queue implementation.
-   One node of a singly linked list of named queues; `messages`, `waiting`
-   and `sent` are accessed with `lock` held. */
-typedef struct queue_t {
-  pthread_mutex_t lock;
-  char name[256];               /* Queue name, matched with strcmp() in queues_get() */
-  VEC(pn_rwbytes_t) messages;   /* Messages on the queue_t */
-  VEC(pn_connection_t*) waiting; /* Connections waiting to send messages from this queue */
-  struct queue_t *next;            /* Next queue in chain */
-  size_t sent;                     /* Count of messages sent, used as delivery tag */
-} queue_t;
-
-/* Initialize q as an empty queue called `name` whose successor in the chain
-   is `next`. A name longer than the name buffer is truncated. */
-static void queue_init(queue_t *q, const char* name, queue_t *next) {
-  pthread_mutex_init(&q->lock, NULL);
-  /* snprintf always NUL-terminates; strncpy does not when the source fills
-     the whole buffer, which would break the strcmp() in queues_get(). */
-  snprintf(q->name, sizeof(q->name), "%s", name);
-  VEC_INIT(q->messages);
-  VEC_INIT(q->waiting);
-  q->next = next;
-  q->sent = 0;
-}
-
-/* Release everything owned by q: its mutex, the buffers of the messages still
-   queued, and both vectors. q itself is not freed here. */
-static void queue_destroy(queue_t *q) {
-  pthread_mutex_destroy(&q->lock);
-  /* q->name is an array embedded in queue_t, not a heap block, so it must
-     not be passed to free(). */
-  for (size_t i = 0; i < q->messages.len; ++i)
-    free(q->messages.data[i].start);
-  VEC_FINAL(q->messages);
-  for (size_t i = 0; i < q->waiting.len; ++i)
-    pn_decref(q->waiting.data[i]);
-  VEC_FINAL(q->waiting);
-}
-
-/* Send a message on s, or record the connection of s as waiting if the queue
-   is empty. Called in the dispatch loop of s, assumes s has credit.
-*/
-static void queue_send(queue_t *q, pn_link_t *s) {
-  pn_rwbytes_t m = { 0 };
-  size_t tag = 0;
-  pthread_mutex_lock(&q->lock);
-  if (q->messages.len == 0) { /* Empty, record connection as waiting */
-    /* Record connection for wake-up if not already on the list. */
-    pn_connection_t *c = pn_session_connection(pn_link_session(s));
-    size_t i = 0;
-    for (; i < q->waiting.len && q->waiting.data[i] != c; ++i)
-      ;
-    if (i == q->waiting.len) {
-      VEC_PUSH(q->waiting, c);
-    }
-  } else {
-    /* Take the oldest message and a fresh tag while the lock is held */
-    m = q->messages.data[0];
-    VEC_POP(q->messages);
-    tag = ++q->sent;
-  }
-  pthread_mutex_unlock(&q->lock);
-  /* The link is only touched after the queue lock has been released */
-  if (m.start) {
-    pn_delivery_t *d = pn_delivery(s, pn_dtag((char*)&tag, sizeof(tag)));
-    pn_link_send(s, m.start, m.size);
-    pn_link_advance(s);
-    pn_delivery_settle(d);  /* Pre-settled: unreliable, there will be no ack */
-    free(m.start);
-  }
-}
-
-/* Data associated with each broker connection.
-   NOTE(review): no use of this type is visible in this part of the file; the
-   check-queues flag is stored in the connection context pointer instead. */
-typedef struct broker_data_t {
-  bool check_queues;          /* Check senders on the connection for available data in queues. */
-} broker_data_t;
-
-/* The connection's context pointer doubles as a boolean flag: a non-null
-   value means the connection needs to check its queues. */
-void pn_connection_set_check_queues(pn_connection_t *c, bool check) {
-  void *flag = (void*)check;
-  pn_connection_set_context(c, flag);
-}
-
-/* Report whether the connection's context pointer is set (non-null). */
-bool pn_connection_get_check_queues(pn_connection_t *c) {
-  void *flag = pn_connection_get_context(c);
-  return flag != NULL;
-}
-
-/* Put a message on the queue, called in receiver dispatch loop.
-   If the queue was previously empty, notify waiting senders.
-   The queue stores m by value; queue_send() later frees m.start.
-   The proactor argument d is not used in this function.
-*/
-static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) {
-  pthread_mutex_lock(&q->lock);
-  VEC_PUSH(q->messages, m);
-  if (q->messages.len == 1) { /* Was empty, notify waiting connections */
-    for (size_t i = 0; i < q->waiting.len; ++i) {
-      pn_connection_t *c = q->waiting.data[i];
-      /* Set the flag before waking so the woken connection sees it */
-      pn_connection_set_check_queues(c, true);
-      pn_connection_wake(c); /* Wake the connection */
-    }
-    q->waiting.len = 0;         /* Everyone has been notified: clear the list */
-  }
-  pthread_mutex_unlock(&q->lock);
-}
-
-/* Thread safe set of queues: a singly linked list of queue_t with its own lock */
-typedef struct queues_t {
-  pthread_mutex_t lock;         /* Held by queues_get() while searching or extending the list */
-  queue_t *queues;              /* Head of the list, NULL when empty */
-  size_t sent;                  /* NOTE(review): not read anywhere in the visible code */
-} queues_t;
-
-/* Initialize qs as an empty set of queues. */
-void queues_init(queues_t *qs) {
-  pthread_mutex_init(&qs->lock, NULL);
-  qs->queues = NULL;
-  qs->sent = 0;                 /* Leave no member with an indeterminate value */
-}
-
-/* Destroy and free every queue in the set, then destroy the set's lock. */
-void queues_destroy(queues_t *qs) {
-  queue_t *q = qs->queues;
-  while (q) {
-    queue_t *next = q->next;    /* Read the link before q is freed */
-    queue_destroy(q);
-    free(q);
-    q = next;
-  }
-  qs->queues = NULL;            /* Do not leave a dangling list head behind */
-  pthread_mutex_destroy(&qs->lock);
-}
-
-/** Get or create the named queue.
- *
- * Searches the list for a queue whose name equals `name`; if none is found a
- * new queue is allocated and linked in at the head of the list. The search
- * and the insertion both happen with the set's lock held.
- * `name` is passed to strcmp() and must not be NULL.
- * The malloc() result is not checked.
- */
-queue_t* queues_get(queues_t *qs, const char* name) {
-  pthread_mutex_lock(&qs->lock);
-  queue_t *q;
-  /* Empty loop body: advance q until a name matches or the list ends */
-  for (q = qs->queues; q && strcmp(q->name, name) != 0; q = q->next)
-    ;
-  if (!q) {
-    q = (queue_t*)malloc(sizeof(queue_t));
-    queue_init(q, name, qs->queues);
-    qs->queues = q;
-  }
-  pthread_mutex_unlock(&qs->lock);
-  return q;
-}
-
-/* The broker implementation: the state shared by all broker code */
-typedef struct broker_t {
-  pn_proactor_t *proactor;      /* Interrupted by broker_stop() */
-  size_t threads;               /* presumably the number of worker threads - TODO confirm */
-  const char *container_id;     /* AMQP container-id */
-  queues_t queues;              /* Queues looked up by link source address */
-  bool finished;                /* NOTE(review): not used in the visible code - confirm meaning */
-} broker_t;
-
-void broker_stop(broker_t *b) {
-  /* Interrupt the proactor to stop the working threads. */
-  pn_proactor_interrupt(b->proactor);
-}
-
-/* Try to send if link is sender and has credit */
-static void link_send(broker_t *b, pn_link_t *s) {
-  if (pn_link_is_sender(s) && pn_link_credit(s) > 0) {
-    const char *qname = pn_terminus_get_address(pn_link_source(s));
-    queue_t *q = queues_get(&b->queues, qname);
-    queue_send(q, s);
-  }
-}
-
-static void queue_unsub(queue_t *q, pn_connection_t *c) {
-  pthread_mutex_lock(&q->lock);
-  for (size_t i = 0; i < q->waiting.len; ++i) {
-    if (q->waiting.data[i] == c){
-      q->waiting.data[i] = q->waiting.data[0]; /* save old [0] */
-      VEC_POP(q->waiting);
-      break;
-    }
-  }
-  pthread_mutex_unlock(&q->lock);
-}
-
-/* Unsubscribe from the queue of interest to this link. */
-static void link_unsub(broker_t *b, pn_link_t *s) {
-  if (pn_link_is_sender(s)) {
-    const char *qname = pn_terminus_get_address(pn_link_source(s));
-    if (qname) {
-      queue_t *q = queues_get(&b->queues, qname);
-      queue_unsub(q, pn_session_connection(pn_link_session(s)));
-    }
-  }
-}
-
-/* Unsubscribe every link on the connection from its queue; called when the transport closes. */
-static void connection_unsub(broker_t *b, pn_connection_t *c) {
-  for (pn_link_t *l = pn_link_head(c, 0); l != NULL; l = pn_link_next(l, 0))
-    link_unsub(b, l);
-}
-
-static void session_unsub(broker_t *b, pn_session_t *ssn) {
-  pn_connection_t *c = pn_session_connection(ssn);
-  for (pn_link_t *l = pn_link_head(c, 0); l != NULL; l = pn_link_next(l, 0)) {
-    if (pn_link_session(l) == ssn)
-      link_unsub(b, l);
-  }
-}
-
-static int exit_code = 0;
-
-static void check_condition(pn_event_t *e, pn_condition_t *cond) {
-  if (pn_condition_is_set(cond)) {
-    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
-            pn_condition_get_name(cond), pn_condition_get_description(cond));
-    exit_code = 1;              /* Remember there was an unexpected error */
-  }
-}
-
-const int WINDOW=10;            /* Incoming credit window */
-
-static void handle(broker_t* b, pn_event_t* e) {
-  pn_connection_t *c = pn_event_connection(e);
-
-  switch (pn_event_type(e)) {
-
-   case PN_LISTENER_OPEN:
-    printf("listening\n");
-    fflush(stdout);
-    break;
-
-   case PN_LISTENER_ACCEPT:
-    pn_listener_accept(pn_event_listener(e), pn_connection());
-    break;
-
-   case PN_CONNECTION_INIT:
-     pn_connection_set_container(c, b->container_id);
-     break;
-
-   case PN_CONNECTION_BOUND: {
-     /* Turn off security */
-     pn_transport_t *t = pn_connection_transport(c);
-     pn_transport_require_auth(t, false);
-     pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
-     break;
-   }
-   case PN_CONNECTION_REMOTE_OPEN: {
-     pn_connection_open(pn_event_connection(e)); /* Complete the open */
-     break;
-   }
-   case PN_CONNECTION_WAKE: {
-     if (pn_connection_get_check_queues(c)) {
-       pn_connection_set_check_queues(c, false);
-       int flags = PN_LOCAL_ACTIVE&PN_REMOTE_ACTIVE;
-       for (pn_link_t *l = pn_link_head(c, flags); l != NULL; l = pn_link_next(l, flags))
-         link_send(b, l);
-     }
-     break;
-   }
-   case PN_SESSION_REMOTE_OPEN: {
-     pn_session_open(pn_event_session(e));
-     break;
-   }
-   case PN_LINK_REMOTE_OPEN: {
-     pn_link_t *l = pn_event_link(e);
-     if (pn_link_is_sender(l)) {
-       const char *source = pn_terminus_get_address(pn_link_remote_source(l));
-       pn_terminus_set_address(pn_link_source(l), source);
-     } else {
-       const char* target = pn_terminus_get_address(pn_link_remote_target(l));
-       pn_terminus_set_address(pn_link_target(l), target);
-       pn_link_flow(l, WINDOW);
-     }
-     pn_link_open(l);
-     break;
-   }
-   case PN_LINK_FLOW: {
-     link_send(b, pn_event_link(e));
-     break;
-   }
-   case PN_DELIVERY: {
-     pn_delivery_t *d = pn_event_delivery(e);
-     pn_link_t *r = pn_delivery_link(d);
-     if (pn_link_is_receiver(r) &&
-         pn_delivery_readable(d) && !pn_delivery_partial(d))
-     {
-       size_t size = pn_delivery_pending(d);
-       /* The broker does not decode the message, just forwards it. */
-       pn_rwbytes_t m = { size, (char*)malloc(size) };
-       pn_link_recv(r, m.start, m.size);
-       const char *qname = pn_terminus_get_address(pn_link_target(r));
-       queue_receive(b->proactor, queues_get(&b->queues, qname), m);
-       pn_delivery_update(d, PN_ACCEPTED);
-       pn_delivery_settle(d);
-       pn_link_flow(r, WINDOW - pn_link_credit(r));
-     }
-     break;
-   }
-
-   case PN_TRANSPORT_CLOSED:
-    check_condition(e, pn_transport_condition(pn_event_transport(e)));
-    connection_unsub(b, pn_event_connection(e));
-    break;
-
-   case PN_CONNECTION_REMOTE_CLOSE:
-    check_condition(e, pn_connection_remote_condition(pn_event_connection(e)));
-    pn_connection_close(pn_event_connection(e));
-    break;
-
-   case PN_SESSION_REMOTE_CLOSE:
-    check_condition(e, pn_session_remote_condition(pn_event_session(e)));
-    session_unsub(b, pn_event_session(e));
-    pn_session_close(pn_event_session(e));
-    pn_session_free(pn_event_session(e));
-    break;
-
-   case PN_LINK_REMOTE_CLOSE:
-    check_condition(e, pn_link_remote_condition(pn_event_link(e)));
-    link_unsub(b, pn_event_link(e));
-    pn_link_close(pn_event_link(e));
-    pn_link_free(pn_event_link(e));
-    break;
-
-   case PN_LISTENER_CLOSE:
-    check_condition(e, pn_listener_condition(pn_event_listener(e)));
-    broker_stop(b);
-    break;
-
- break;
-
-   case PN_PROACTOR_INACTIVE:   /* listener and all connections closed */
-    broker_stop(b);
-    break;
-
-   case PN_PROACTOR_INTERRUPT:
-    b->finished = true;
-    pn_proactor_interrupt(b->proactor); /* Pass along the interrupt to the other threads */
-    break;
-
-   default:
-    break;
-  }
-}
-
-static void* broker_thread(void *void_broker) {
-  broker_t *b = (broker_t*)void_broker;
-  do {
-    pn_event_batch_t *events = pn_proactor_wait(b->proactor);
-    pn_event_t *e;
-    while ((e = pn_event_batch_next(events))) {
-      handle(b, e);
-    }
-    pn_proactor_done(b->proactor, events);
-  } while(!b->finished);
-  return NULL;
-}
-
-int main(int argc, char **argv) {
-  broker_t b = {0};
-  b.proactor = pn_proactor();
-  queues_init(&b.queues);
-  b.container_id = argv[0];
-  b.threads = 4;
-  int i = 1;
-  const char *host = (argc > i) ? argv[i++] : "";
-  const char *port = (argc > i) ? argv[i++] : "amqp";
-
-  /* Listen on addr */
-  char addr[PN_MAX_ADDR];
-  pn_proactor_addr(addr, sizeof(addr), host, port);
-  pn_proactor_listen(b.proactor, pn_listener(), addr, 16);
-
-  /* Start n-1 threads */
-  pthread_t* threads = (pthread_t*)calloc(sizeof(pthread_t), b.threads);
-  for (size_t i = 0; i < b.threads-1; ++i) {
-    pthread_create(&threads[i], NULL, broker_thread, &b);
-  }
-  broker_thread(&b);            /* Use the main thread too. */
-  /* Join the other threads */
-  for (size_t i = 0; i < b.threads-1; ++i) {
-    pthread_join(threads[i], NULL);
-  }
-  pn_proactor_free(b.proactor);
-  free(threads);
-  return exit_code;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/direct.c b/examples/c/proactor/direct.c
deleted file mode 100644
index 15550e6..0000000
--- a/examples/c/proactor/direct.c
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <proton/condition.h>
-#include <proton/connection.h>
-#include <proton/delivery.h>
-#include <proton/link.h>
-#include <proton/listener.h>
-#include <proton/message.h>
-#include <proton/proactor.h>
-#include <proton/sasl.h>
-#include <proton/session.h>
-#include <proton/transport.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-
-typedef struct app_data_t {
-  const char *host, *port;
-  const char *amqp_address;
-  const char *container_id;
-  int message_count;
-
-  pn_proactor_t *proactor;
-  pn_listener_t *listener;
-  pn_rwbytes_t message_buffer;
-
-  /* Sender values */
-  int sent;
-  int acknowledged;
-  pn_link_t *sender;
-
-  /* Receiver values */
-  int received;
-} app_data_t;
-
-static const int BATCH = 1000; /* Batch size for unlimited receive */
-
-static int exit_code = 0;
-
-static void check_condition(pn_event_t *e, pn_condition_t *cond) {
-  if (pn_condition_is_set(cond)) {
-    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
-            pn_condition_get_name(cond), pn_condition_get_description(cond));
-    pn_connection_close(pn_event_connection(e));
-    exit_code = 1;
-  }
-}
-
-/* Create a message with a map { "sequence" : number } encode it and return the encoded buffer. */
-static pn_bytes_t encode_message(app_data_t* app) {
-  /* Construct a message with the map { "sequence": app.sent } */
-  pn_message_t* message = pn_message();
-  pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
-  pn_data_t* body = pn_message_body(message);
-  pn_data_put_map(body);
-  pn_data_enter(body);
-  pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
-  pn_data_put_int(body, app->sent); /* The sequence number */
-  pn_data_exit(body);
-
-  /* encode the message, expanding the encode buffer as needed */
-  if (app->message_buffer.start == NULL) {
-    static const size_t initial_size = 128;
-    app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
-  }
-  /* app->message_buffer is the total buffer space available. */
-  /* mbuf will point at just the portion used by the encoded message */
-  pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
-  int status = 0;
-  while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
-    app->message_buffer.size *= 2;
-    app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size);
-    mbuf.size = app->message_buffer.size;
-  }
-  if (status != 0) {
-    fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
-    exit(1);
-  }
-  pn_message_free(message);
-  return pn_bytes(mbuf.size, mbuf.start);
-}
-
-#define MAX_SIZE 1024
-
-static void decode_message(pn_delivery_t *dlv) {
-  static char buffer[MAX_SIZE];
-  ssize_t len;
-  // try to decode the message body
-  if (pn_delivery_pending(dlv) < MAX_SIZE) {
-    // read in the raw data
-    len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
-    if (len > 0) {
-      // decode it into a proton message
-      pn_message_t *m = pn_message();
-      if (PN_OK == pn_message_decode(m, buffer, len)) {
-        pn_string_t *s = pn_string(NULL);
-        pn_inspect(pn_message_body(m), s);
-        printf("%s\n", pn_string_get(s));
-        pn_free(s);
-      }
-      pn_message_free(m);
-    }
-  }
-}
-
-/* This function handles events when we are acting as the receiver */
-static void handle_receive(app_data_t* app, pn_event_t* event) {
-  switch (pn_event_type(event)) {
-
-   case PN_LINK_REMOTE_OPEN: {
-     pn_link_t *l = pn_event_link(event);
-     pn_link_open(l);
-     pn_link_flow(l, app->message_count ? app->message_count : BATCH);
-   } break;
-
-   case PN_DELIVERY: {
-     /* A message has been received */
-     pn_link_t *link = NULL;
-     pn_delivery_t *dlv = pn_event_delivery(event);
-     if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
-       link = pn_delivery_link(dlv);
-       decode_message(dlv);
-       /* Accept the delivery */
-       pn_delivery_update(dlv, PN_ACCEPTED);
-       /* done with the delivery, move to the next and free it */
-       pn_link_advance(link);
-       pn_delivery_settle(dlv);  /* dlv is now freed */
-
-       if (app->message_count == 0) {
-         /* receive forever - see if more credit is needed */
-         if (pn_link_credit(link) < BATCH/2) {
-           /* Grant enough credit to bring it up to BATCH: */
-           pn_link_flow(link, BATCH - pn_link_credit(link));
-         }
-       } else if (++app->received >= app->message_count) {
-         /* done receiving, close the endpoints */
-         printf("%d messages received\n", app->received);
-         pn_session_t *ssn = pn_link_session(link);
-         pn_link_close(link);
-         pn_session_close(ssn);
-         pn_connection_close(pn_session_connection(ssn));
-       }
-     }
-   } break;
-
-   default:
-    break;
-  }
-}
-
-/* This function handles events when we are acting as the sender */
-static void handle_send(app_data_t* app, pn_event_t* event) {
-  switch (pn_event_type(event)) {
-
-   case PN_LINK_REMOTE_OPEN: {
-     pn_link_t* l = pn_event_link(event);
-     pn_terminus_set_address(pn_link_target(l), app->amqp_address);
-     pn_link_open(l);
-   } break;
-
-   case PN_LINK_FLOW: {
-     /* The peer has given us some credit, now we can send messages */
-     pn_link_t *sender = pn_event_link(event);
-     while (pn_link_credit(sender) > 0 && app->sent < app->message_count) {
-       ++app->sent;
-       // Use sent counter as unique delivery tag.
-       pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
-       pn_bytes_t msgbuf = encode_message(app);
-       pn_link_send(sender, msgbuf.start, msgbuf.size);
-       pn_link_advance(sender);
-     }
-     break;
-   }
-
-   case PN_DELIVERY: {
-     /* We received acknowledgement from the peer that a message was delivered. */
-     pn_delivery_t* d = pn_event_delivery(event);
-     if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
-       if (++app->acknowledged == app->message_count) {
-         printf("%d messages sent and acknowledged\n", app->acknowledged);
-         pn_connection_close(pn_event_connection(event));
-         /* Continue handling events till we receive TRANSPORT_CLOSED */
-       }
-     }
-   } break;
-
-   default:
-    break;
-  }
-}
-
-/* Handle all events, delegate to handle_send or handle_receive depending on link mode.
-   Return true to continue, false to exit
-*/
-static bool handle(app_data_t* app, pn_event_t* event) {
-  switch (pn_event_type(event)) {
-
-   case PN_LISTENER_OPEN:
-    printf("listening\n");
-    fflush(stdout);
-    break;
-
-   case PN_LISTENER_ACCEPT:
-    pn_listener_accept(pn_event_listener(event), pn_connection());
-    break;
-
-   case PN_CONNECTION_INIT:
-    pn_connection_set_container(pn_event_connection(event), app->container_id);
-    break;
-
-   case PN_CONNECTION_BOUND: {
-     /* Turn off security */
-     pn_transport_t *t = pn_event_transport(event);
-     pn_transport_require_auth(t, false);
-     pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
-   }
-   case PN_CONNECTION_REMOTE_OPEN: {
-     pn_connection_open(pn_event_connection(event)); /* Complete the open */
-     break;
-   }
-
-   case PN_SESSION_REMOTE_OPEN: {
-     pn_session_open(pn_event_session(event));
-     break;
-   }
-
-   case PN_TRANSPORT_CLOSED:
-    check_condition(event, pn_transport_condition(pn_event_transport(event)));
-    pn_listener_close(app->listener); /* Finished */
-    break;
-
-   case PN_CONNECTION_REMOTE_CLOSE:
-    check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_SESSION_REMOTE_CLOSE:
-    check_condition(event, pn_session_remote_condition(pn_event_session(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_LINK_REMOTE_CLOSE:
-   case PN_LINK_REMOTE_DETACH:
-    check_condition(event, pn_link_remote_condition(pn_event_link(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_PROACTOR_TIMEOUT:
-    /* Wake the sender's connection */
-    pn_connection_wake(pn_session_connection(pn_link_session(app->sender)));
-    break;
-
-   case PN_LISTENER_CLOSE:
-    check_condition(event, pn_listener_condition(pn_event_listener(event)));
-    break;
-
-   case PN_PROACTOR_INACTIVE:
-    return false;
-    break;
-
-   default: {
-     pn_link_t *l = pn_event_link(event);
-     if (l) {                      /* Only delegate link-related events */
-       if (pn_link_is_sender(l)) {
-         handle_send(app, event);
-       } else {
-         handle_receive(app, event);
-       }
-     }
-   }
-  }
-  return true;
-}
-
-void run(app_data_t *app) {
-  /* Loop and handle events */
-  do {
-    pn_event_batch_t *events = pn_proactor_wait(app->proactor);
-    for (pn_event_t *e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
-      if (!handle(app, e)) {
-        return;
-      }
-    }
-    pn_proactor_done(app->proactor, events);
-  } while(true);
-}
-
-int main(int argc, char **argv) {
-  struct app_data_t app = {0};
-  int i = 0;
-  app.container_id = argv[i++];   /* Should be unique */
-  app.host = (argc > 1) ? argv[i++] : "";
-  app.port = (argc > 1) ? argv[i++] : "amqp";
-  app.amqp_address = (argc > i) ? argv[i++] : "examples";
-  app.message_count = (argc > i) ? atoi(argv[i++]) : 10;
-
-  /* Create the proactor and listen for incoming connections */
-  app.proactor = pn_proactor();
-  app.listener = pn_listener();
-  char addr[PN_MAX_ADDR];
-  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
-  pn_proactor_listen(app.proactor, app.listener, addr, 16);
-  run(&app);
-  pn_proactor_free(app.proactor);
-  free(app.message_buffer.start);
-  return exit_code;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/example_test.py
----------------------------------------------------------------------
diff --git a/examples/c/proactor/example_test.py b/examples/c/proactor/example_test.py
deleted file mode 100644
index 02bb1fd..0000000
--- a/examples/c/proactor/example_test.py
+++ /dev/null
@@ -1,88 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License
-#
-
-# This is a test script to run the examples and verify that they behave as expected.
-
-import unittest, sys, time
-from proctest import *
-
-def python_cmd(name):
-    dir = os.path.dirname(__file__)
-    return [sys.executable, os.path.join(dir, "..", "..", "python", name)]
-
-def receive_expect(n):
-    return ''.join('{"sequence"=%s}\n'%i for i in xrange(1, n+1)) + "%s messages received\n"%n
-
-class Broker(object):
-    def __init__(self, test):
-        self.test = test
-
-    def __enter__(self):
-        with TestPort() as tp:
-            self.port = tp.port
-            self.host = tp.host
-            self.addr = tp.addr
-            self.proc = self.test.proc(["broker", "", self.port])
-            self.proc.wait_re("listening")
-            return self
-
-    def __exit__(self, *args):
-        b = getattr(self, "proc")
-        if b:
-            if b.poll() !=  None: # Broker crashed
-                raise ProcError(b, "broker crash")
-            b.kill()
-
-class CExampleTest(ProcTestCase):
-
-    def test_send_receive(self):
-        """Send first then receive"""
-        with Broker(self) as b:
-            s = self.proc(["send", "", b.port])
-            self.assertEqual("10 messages sent and acknowledged\n", s.wait_exit())
-            r = self.proc(["receive", "", b.port])
-            self.assertEqual(receive_expect(10), r.wait_exit())
-
-    def test_receive_send(self):
-        """Start receiving  first, then send."""
-        with Broker(self) as b:
-            r = self.proc(["receive", "", b.port]);
-            s = self.proc(["send", "", b.port]);
-            self.assertEqual("10 messages sent and acknowledged\n", s.wait_exit())
-            self.assertEqual(receive_expect(10), r.wait_exit())
-
-    def test_send_direct(self):
-        """Send to direct server"""
-        with TestPort() as tp:
-            d = self.proc(["direct", "", tp.port])
-            d.wait_re("listening")
-            self.assertEqual("10 messages sent and acknowledged\n", self.proc(["send", "", tp.port]).wait_exit())
-            self.assertIn(receive_expect(10), d.wait_exit())
-
-    def test_receive_direct(self):
-        """Receive from direct server"""
-        with TestPort() as tp:
-            d = self.proc(["direct", "", tp.port])
-            d.wait_re("listening")
-            self.assertEqual(receive_expect(10), self.proc(["receive", "", tp.port]).wait_exit())
-            self.assertIn("10 messages sent and acknowledged\n", d.wait_exit())
-
-
-if __name__ == "__main__":
-    unittest.main()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org