You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/11/25 21:01:26 UTC

[30/48] qpid-proton git commit: PROTON-1344: C proactor for multi-threaded proton applications

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/examples/engine/c/psend.c
----------------------------------------------------------------------
diff --git a/examples/engine/c/psend.c b/examples/engine/c/psend.c
deleted file mode 100644
index 2ad0edb..0000000
--- a/examples/engine/c/psend.c
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-/*################################################################
-  This program is half of a pair.  Precv and Psend are meant to 
-  be simple-as-possible examples of how to use the proton-c
-  engine interface to send and receive messages over a single 
-  connection and a single session.
-
-  In addition to being examples, these programs or their 
-  descendants will be used in performance regression testing
-  for both throughput and latency, and long-term soak testing.
-
-  This program, psend, is highly similar to its peer precv.
-  I put all the good comments in precv.
-*################################################################*/
-
-
-#include <unistd.h>
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-#include <ctype.h>
-#include <time.h>
-#include <sys/time.h>
-#define __STDC_FORMAT_MACROS
-#include <inttypes.h>
-
-
-#include <proton/connection.h>
-#include <proton/delivery.h>
-#include <proton/driver.h>
-#include <proton/event.h>
-#include <proton/terminus.h>
-#include <proton/link.h>
-#include <proton/message.h>
-#include <proton/session.h>
-
-
-#define MY_BUF_SIZE 1000
-
-
-
-void
-print_timestamp ( FILE * fp, char const * label )
-{
-  struct timeval tv;
-  struct tm      * timeinfo;
-
-  gettimeofday ( & tv, 0 );
-  timeinfo = localtime ( & tv.tv_sec );
-
-  int seconds_today = 3600 * timeinfo->tm_hour +
-                        60 * timeinfo->tm_min  +
-                             timeinfo->tm_sec;
-
-  fprintf ( fp, "time : %d.%.6ld : %s\n", seconds_today, tv.tv_usec, label );
-}
-
-
-
-
-
-static 
-double
-get_time ( )
-{
-  struct timeval tv;
-  struct tm      * timeinfo;
-
-  gettimeofday ( & tv, 0 );
-  timeinfo = localtime ( & tv.tv_sec );
-
-  double time_now = 3600 * timeinfo->tm_hour +
-                      60 * timeinfo->tm_min  +
-                           timeinfo->tm_sec;
-
-  time_now += ((double)(tv.tv_usec) / 1000000.0);
-  return time_now;
-}
-
-
-
-
-
-int 
-main ( int argc, char ** argv )
-{
-  char addr [ 1000 ];
-  char host [ 1000 ];
-  char port [ 1000 ];
-  char output_file_name[1000];
-
-
-  uint64_t messages       = 2000000,
-           delivery_count = 0;
-
-  int message_length = 100;
-
-  bool done           = false;
-  int  sent_count     = 0;
-  int  n_links        = 5;
-  int const max_links = 100;
-
-  strcpy ( addr, "queue" );
-  strcpy ( host, "0.0.0.0" );
-  strcpy ( port, "5672" );
-
-  FILE * output_fp;
-
-
-  for ( int i = 1; i < argc; ++ i )
-  {
-    if ( ! strcmp ( "--host", argv[i] ) )
-    {
-      strcpy ( host, argv[i+1] );
-      ++ i;
-    }
-    else
-    if ( ! strcmp ( "--port", argv[i] ) )
-    {
-      strcpy ( port, argv[i+1] );
-      ++ i;
-    }
-    else
-    if ( ! strcmp ( "--messages", argv[i] ) )
-    {
-      sscanf ( argv [ i+1 ], "%" SCNd64 , & messages );
-      ++ i;
-    }
-    else
-    if ( ! strcmp ( "--message_length", argv[i] ) )
-    {
-      sscanf ( argv [ i+1 ], "%d", & message_length );
-      ++ i;
-    }
-    else
-    if ( ! strcmp ( "--n_links", argv[i] ) )
-    {
-      sscanf ( argv [ i+1 ], "%d", & n_links );
-      ++ i;
-    }
-    if ( ! strcmp ( "--output", argv[i] ) )
-    {
-      if ( ! strcmp ( "stderr", argv[i+1] ) )
-      {
-        output_fp = stderr;
-        strcpy ( output_file_name, "stderr");
-      }
-      else
-      if ( ! strcmp ( "stdout", argv[i+1] ) )
-      {
-        output_fp = stdout;
-        strcpy ( output_file_name, "stdout");
-      }
-      else
-      {
-        output_fp = fopen ( argv[i+1], "w" );
-        strcpy ( output_file_name, argv[i+1] );
-        if ( ! output_fp )
-        {
-          fprintf ( stderr, "Can't open |%s| for writing.\n", argv[i+1] );
-          exit ( 1 );
-        }
-      }
-      ++ i;
-    }
-    else
-    {
-      fprintf ( output_fp, "unknown arg %s", argv[i] );
-    }
-  }
-
-
-  fprintf ( output_fp, "host            %s\n",          host );
-  fprintf ( output_fp, "port            %s\n",          port );
-  fprintf ( output_fp, "messages        %" PRId64 "\n", messages );
-  fprintf ( output_fp, "message_length  %d\n",          message_length );
-  fprintf ( output_fp, "n_links         %d\n",          n_links );
-  fprintf ( output_fp, "output          %s\n",          output_file_name );
-
-
-  if ( n_links > max_links )
-  {
-    fprintf ( output_fp, "You can't have more than %d links.\n", max_links );
-    exit ( 1 );
-  }
-
-  pn_driver_t     * driver;
-  pn_connector_t  * connector;
-  pn_connector_t  * driver_connector;
-  pn_connection_t * connection;
-  pn_collector_t  * collector;
-  pn_link_t       * links [ max_links ];
-  pn_session_t    * session;
-  pn_event_t      * event;
-  pn_delivery_t   * delivery;
-
-
-  char * message = (char *) malloc(message_length);
-  memset ( message, 13, message_length );
-
-  /*----------------------------------------------------
-    Get everything set up.
-    We will have a single connector, a single 
-    connection, a single session, and a single link.
-  ----------------------------------------------------*/
-  driver = pn_driver ( );
-  connector = pn_connector ( driver, host, port, 0 );
-
-  connection = pn_connection();
-  collector  = pn_collector  ( );
-  pn_connection_collect ( connection, collector );
-  pn_connector_set_connection ( connector, connection );
-
-  session = pn_session ( connection );
-  pn_connection_open ( connection );
-  pn_session_open ( session );
-
-  for ( int i = 0; i < n_links; ++ i )
-  {
-    char name[100];
-    sprintf ( name, "tvc_15_%d", i );
-    links[i] = pn_sender ( session, name );
-    pn_terminus_set_address ( pn_link_target(links[i]), addr );
-    pn_link_open ( links[i] );
-  }
-
-  /*-----------------------------------------------------------
-    For my speed tests, I do not want to count setup time.
-    Start timing here.  The receiver will print out a similar
-    timestamp when he receives the final message.
-  -----------------------------------------------------------*/
-  fprintf ( output_fp, "psend: sending %llu messages.\n", messages );
-
-  // Just before we start sending, print the start timestamp.
-  fprintf ( output_fp, "psend_start %.3lf\n", get_time() );
-
-  while ( 1 )
-  {
-    pn_driver_wait ( driver, -1 );
-
-    int event_count = 1;
-    while ( event_count > 0 )
-    {
-      event_count = 0;
-      pn_connector_process ( connector );
-
-      event = pn_collector_peek(collector);
-      while ( event )
-      {
-        ++ event_count;
-        pn_event_type_t event_type = pn_event_type ( event );
-        //fprintf ( output_fp, "event: %s\n", pn_event_type_name ( event_type ) );
-
-        switch ( event_type )
-        {
-          case PN_LINK_FLOW:
-          {
-            if ( delivery_count < messages )
-            {
-              /*---------------------------------------------------
-                We may have opened multiple links.
-                The event will tell us which one this flow-event
-                happened on.  If the flow event gave us some 
-                credit, we will greedily send messages until it
-                is all used up.
-              ---------------------------------------------------*/
-              pn_link_t * link = pn_event_link ( event );
-              int credit = pn_link_credit ( link );
-
-              while ( credit > 0 )
-              {
-                // Every delivery we create needs a unique tag.
-                char str [ 100 ];
-                sprintf ( str, "%x", delivery_count ++ );
-                delivery = pn_delivery ( link, pn_dtag(str, strlen(str)) );
-
-                // If you settle the delivery before sending it,
-                // you will spend some time wondering why your 
-                // messages don't have any content when they arrive 
-                // at the receiver.
-                pn_link_send ( link, message, message_length );
-                pn_delivery_settle ( delivery );
-                pn_link_advance ( link );
-                credit = pn_link_credit ( link );
-              }
-              
-              if ( delivery_count >= messages )
-              {
-                fprintf ( output_fp, 
-                          "I have sent all %d messages.\n" ,
-                          delivery_count
-                        );
-                /*
-                  I'm still kind of hazy on how to shut down the
-                  psend / precv system properly ....
-                  I can't go to all_done here, or precv will never
-                  get all its messages and terminate.
-                  So I let precv terminate properly ... which means
-                  that this program, psend, dies with an error.
-                  Hmm.
-                */
-                // goto all_done;
-              }
-            }
-          }
-          break;
-
-
-          case PN_TRANSPORT:
-            // I don't know what this means here, either.
-          break;
-
-
-          case PN_TRANSPORT_TAIL_CLOSED:
-            goto all_done;
-          break;
-
-
-          default:
-            /*
-            fprintf ( output_fp,
-                      "precv unhandled event: %s\n",
-                      pn_event_type_name(event_type)
-                    );
-            */
-          break;
-
-        }
-
-        pn_collector_pop ( collector );
-        event = pn_collector_peek(collector);
-      }
-    }
-  }
-
-  all_done:
-
-  for ( int i = 0; i < n_links; ++ i )
-  {
-    pn_link_close ( links[i] );
-  }
-
-  pn_session_close ( session );
-  pn_connection_close ( connection );
-
-  return 0;
-}
-
-
-
-
-

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/examples/exampletest.py
----------------------------------------------------------------------
diff --git a/examples/exampletest.py b/examples/exampletest.py
new file mode 100644
index 0000000..d40b9cb
--- /dev/null
+++ b/examples/exampletest.py
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+
+# A test library to make it easy to run unittest tests that start,
+# monitor, and report output from sub-processes. In particular
+# it helps with starting processes that listen on random ports.
+
+import unittest
+import os, sys, socket, time, re, inspect, errno, threading
+from  random import randrange
+from subprocess import Popen, PIPE, STDOUT
+from copy import copy
+import platform
+from os.path import dirname as dirname
+
+def pick_port():
+    """Pick a random port."""
+    p =  randrange(10000, 20000)
+    return p
+
+class ProcError(Exception):
+    """An exception that captures failed process output"""
+    def __init__(self, proc, what="bad exit status"):
+        out = proc.out.strip()
+        if out:
+            out = "\nvvvvvvvvvvvvvvvv\n%s\n^^^^^^^^^^^^^^^^\n" % out
+        else:
+            out = ", no output)"
+        super(Exception, self, ).__init__(
+            "%s %s, code=%s%s" % (proc.args, what, proc.returncode, out))
+
+class NotFoundError(ProcError):
+    pass
+
+class Proc(Popen):
+    """A example process that stores its output, optionally run with valgrind."""
+
+    if "VALGRIND" in os.environ and os.environ["VALGRIND"]:
+        env_args = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet", "--leak-check=full"]
+    else:
+        env_args = []
+
+    @property
+    def out(self):
+        self._out.seek(0)
+        return self._out.read()
+
+    def __init__(self, args, **kwargs):
+        """Start an example process"""
+        args = list(args)
+        self.args = args
+        self._out = os.tmpfile()
+        try:
+            Popen.__init__(self, self.env_args + self.args, stdout=self._out, stderr=STDOUT, **kwargs)
+        except OSError, e:
+            if e.errno == errno.ENOENT:
+                raise NotFoundError(self, str(e))
+            raise ProcError(self, str(e))
+        except Exception, e:
+            raise ProcError(self, str(e))
+
+    def kill(self):
+        try:
+            if self.poll() is None:
+                Popen.kill(self)
+        except:
+            pass                # Already exited.
+        return self.out
+
+    def wait_out(self, timeout=10, expect=0):
+        """Wait for process to exit, return output. Raise ProcError  on failure."""
+        t = threading.Thread(target=self.wait)
+        t.start()
+        t.join(timeout)
+        if self.poll() is None:      # Still running
+            self.kill()
+            raise ProcError(self, "timeout")
+        if expect is not None and self.poll() != expect:
+            raise ProcError(self)
+        return self.out
+
+# Work-around older python unittest that lacks setUpClass.
+if hasattr(unittest.TestCase, 'setUpClass') and  hasattr(unittest.TestCase, 'tearDownClass'):
+    TestCase = unittest.TestCase
+else:
+    class TestCase(unittest.TestCase):
+        """
+        Roughly provides setUpClass and tearDownClass functionality for older python
+        versions in our test scenarios. If subclasses override setUp or tearDown
+        they *must* call the superclass.
+        """
+        def setUp(self):
+            if not hasattr(type(self), '_setup_class_count'):
+                type(self)._setup_class_count = len(
+                    inspect.getmembers(
+                        type(self),
+                        predicate=lambda(m): inspect.ismethod(m) and m.__name__.startswith('test_')))
+                type(self).setUpClass()
+
+        def tearDown(self):
+            self.assertTrue(self._setup_class_count > 0)
+            self._setup_class_count -=  1
+            if self._setup_class_count == 0:
+                type(self).tearDownClass()
+
+class ExampleTestCase(TestCase):
+    """TestCase that manages started processes"""
+    def setUp(self):
+        super(ExampleTestCase, self).setUp()
+        self.procs = []
+
+    def tearDown(self):
+        for p in self.procs:
+            p.kill()
+        super(ExampleTestCase, self).tearDown()
+
+    def proc(self, *args, **kwargs):
+        p = Proc(*args, **kwargs)
+        self.procs.append(p)
+        return p
+
+def wait_port(port, timeout=10):
+    """Wait up to timeout for port to be connectable."""
+    if timeout:
+        deadline = time.time() + timeout
+    while (timeout is None or time.time() < deadline):
+        try:
+            s = socket.create_connection((None, port), timeout) # Works for IPv6 and v4
+            s.close()
+            return
+        except socket.error, e:
+            if e.errno != errno.ECONNREFUSED: # Only retry on connection refused error.
+                raise
+    raise socket.timeout()
+
+
+class BrokerTestCase(ExampleTestCase):
+    """
+    ExampleTest that starts a broker in setUpClass and kills it in tearDownClass.
+    Subclass must set `broker_exe` class variable with the name of the broker executable.
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        cls.port = pick_port()
+        cls.addr = "127.0.0.1:%s/examples" % (cls.port)
+        cls.broker = None       # In case Proc throws, create the attribute.
+        cls.broker = Proc(cls.broker_exe + ["-a", cls.addr])
+        try:
+            wait_port(cls.port)
+        except Exception, e:
+            cls.broker.kill()
+            raise ProcError(cls.broker, "timed out waiting for port")
+
+    @classmethod
+    def tearDownClass(cls):
+        if cls.broker: cls.broker.kill()
+
+    def tearDown(self):
+        b = type(self).broker
+        if b and b.poll() !=  None: # Broker crashed
+            type(self).setUpClass() # Start another for the next test.
+            raise ProcError(b, "broker crash")
+        super(BrokerTestCase, self).tearDown()
+
+if __name__ == "__main__":
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
index 70e6754..d9825c2 100644
--- a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
+++ b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
@@ -121,8 +121,8 @@ PN_CPP_CLASS_EXTERN connection_engine {
 
     /// Configure a connection by applying exactly the options in opts (including proton::messaging_handler)
     /// Does not apply any default options, to apply container defaults use connect() or accept()
-    /// instead.
-    void configure(const connection_options& opts=connection_options());
+    /// instead. If server==true, configure a server connection.
+    void configure(const connection_options& opts=connection_options(), bool server=false);
 
     /// Call configure() with client options and call connection::open()
     /// Options applied: container::id(), container::client_connection_options(), opts.
@@ -200,12 +200,13 @@ PN_CPP_CLASS_EXTERN connection_engine {
     PN_CPP_EXTERN proton::container* container() const;
 
  private:
+    void init();
     connection_engine(const connection_engine&);
     connection_engine& operator=(const connection_engine&);
 
     messaging_handler* handler_;
     proton::container* container_;
-    pn_connection_engine_t c_engine_;
+    pn_connection_engine_t engine_;
 };
 
 } // io

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/bindings/cpp/include/proton/sender_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/sender_options.hpp b/proton-c/bindings/cpp/include/proton/sender_options.hpp
index 64fea2f..0a618ff 100644
--- a/proton-c/bindings/cpp/include/proton/sender_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/sender_options.hpp
@@ -90,10 +90,10 @@ class sender_options {
     PN_CPP_EXTERN sender_options& auto_settle(bool);
 
     /// Options for the source node of the sender.
-    PN_CPP_EXTERN sender_options& source(source_options &);
+    PN_CPP_EXTERN sender_options& source(const source_options &);
 
     /// Options for the receiver node of the receiver.
-    PN_CPP_EXTERN sender_options& target(target_options &);
+    PN_CPP_EXTERN sender_options& target(const target_options &);
 
     /// @cond INTERNAL
   private:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/bindings/cpp/src/io/connection_engine.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/connection_engine.cpp b/proton-c/bindings/cpp/src/io/connection_engine.cpp
index 4712b3e..5e6483f 100644
--- a/proton-c/bindings/cpp/src/io/connection_engine.cpp
+++ b/proton-c/bindings/cpp/src/io/connection_engine.cpp
@@ -40,35 +40,34 @@
 namespace proton {
 namespace io {
 
-connection_engine::connection_engine() :
-    container_(0)
-{
-    int err;
-    if ((err = pn_connection_engine_init(&c_engine_)))
-        throw proton::error(std::string("connection_engine init failed: ")+pn_code(err));
-}
-
-connection_engine::connection_engine(class container& cont, event_loop* loop) :
-    container_(&cont)
-{
-    int err;
-    if ((err = pn_connection_engine_init(&c_engine_)))
-        throw proton::error(std::string("connection_engine init failed: ")+pn_code(err));
+void connection_engine::init() {
+    if (pn_connection_engine_init(&engine_, pn_connection(), pn_transport()) != 0) {
+        this->~connection_engine(); // Dtor won't be called on throw from ctor.
+        throw proton::error(std::string("connection_engine allocation failed"));
+    }
+}
+
+connection_engine::connection_engine() : handler_(0), container_(0) { init(); }
+
+connection_engine::connection_engine(class container& cont, event_loop* loop) : handler_(0), container_(&cont) {
+    init();
     connection_context& ctx = connection_context::get(connection());
     ctx.container = container_;
     ctx.event_loop.reset(loop);
 }
 
-void connection_engine::configure(const connection_options& opts) {
-    proton::connection c = connection();
+connection_engine::~connection_engine() {
+    pn_connection_engine_destroy(&engine_);
+}
+
+void connection_engine::configure(const connection_options& opts, bool server) {
+    proton::connection c(connection());
     opts.apply_unbound(c);
+    if (server) pn_transport_set_server(engine_.transport);
+    pn_connection_engine_bind(&engine_);
     opts.apply_bound(c);
     handler_ =  opts.handler();
-    connection_context::get(connection()).collector = c_engine_.collector;
-}
-
-connection_engine::~connection_engine() {
-    pn_connection_engine_final(&c_engine_);
+    connection_context::get(connection()).collector = engine_.collector;
 }
 
 void connection_engine::connect(const connection_options& opts) {
@@ -78,7 +77,7 @@ void connection_engine::connect(const connection_options& opts) {
         all.update(container_->client_connection_options());
     }
     all.update(opts);
-    configure(all);
+    configure(all, false);
     connection().open();
 }
 
@@ -89,12 +88,12 @@ void connection_engine::accept(const connection_options& opts) {
         all.update(container_->server_connection_options());
     }
     all.update(opts);
-    configure(all);
+    configure(all, true);
 }
 
 bool connection_engine::dispatch() {
     pn_event_t* c_event;
-    while ((c_event = pn_connection_engine_dispatch(&c_engine_)) != NULL) {
+    while ((c_event = pn_connection_engine_event(&engine_)) != NULL) {
         proton_event cpp_event(c_event, container_);
         try {
             if (handler_ != 0) {
@@ -102,51 +101,56 @@ bool connection_engine::dispatch() {
                 cpp_event.dispatch(adapter);
             }
         } catch (const std::exception& e) {
-            disconnected(error_condition("exception", e.what()));
+            pn_condition_t *cond = pn_transport_condition(engine_.transport);
+            if (!pn_condition_is_set(cond)) {
+                pn_condition_format(cond, "exception", "%s", e.what());
+            }
         }
+        pn_connection_engine_pop_event(&engine_);
     }
-    return !pn_connection_engine_finished(&c_engine_);
+    return !pn_connection_engine_finished(&engine_);
 }
 
 mutable_buffer connection_engine::read_buffer() {
-    pn_rwbytes_t buffer = pn_connection_engine_read_buffer(&c_engine_);
+    pn_rwbytes_t buffer = pn_connection_engine_read_buffer(&engine_);
     return mutable_buffer(buffer.start, buffer.size);
 }
 
 void connection_engine::read_done(size_t n) {
-    return pn_connection_engine_read_done(&c_engine_, n);
+    return pn_connection_engine_read_done(&engine_, n);
 }
 
 void connection_engine::read_close() {
-    pn_connection_engine_read_close(&c_engine_);
+    pn_connection_engine_read_close(&engine_);
 }
 
 const_buffer connection_engine::write_buffer() {
-    pn_bytes_t buffer = pn_connection_engine_write_buffer(&c_engine_);
+    pn_bytes_t buffer = pn_connection_engine_write_buffer(&engine_);
     return const_buffer(buffer.start, buffer.size);
 }
 
 void connection_engine::write_done(size_t n) {
-    return pn_connection_engine_write_done(&c_engine_, n);
+    return pn_connection_engine_write_done(&engine_, n);
 }
 
 void connection_engine::write_close() {
-    pn_connection_engine_write_close(&c_engine_);
+    pn_connection_engine_write_close(&engine_);
 }
 
 void connection_engine::disconnected(const proton::error_condition& err) {
-    pn_condition_t* condition = pn_connection_engine_condition(&c_engine_);
-    if (!pn_condition_is_set(condition))     // Don't overwrite existing condition
+    pn_condition_t* condition = pn_transport_condition(engine_.transport);
+    if (!pn_condition_is_set(condition))  {
         set_error_condition(err, condition);
-    pn_connection_engine_disconnected(&c_engine_);
+    }
+    pn_connection_engine_close(&engine_);
 }
 
 proton::connection connection_engine::connection() const {
-    return make_wrapper(c_engine_.connection);
+    return make_wrapper(engine_.connection);
 }
 
 proton::transport connection_engine::transport() const {
-    return make_wrapper(c_engine_.transport);
+    return make_wrapper(engine_.transport);
 }
 
 proton::container* connection_engine::container() const {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/bindings/cpp/src/sender_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/sender_options.cpp b/proton-c/bindings/cpp/src/sender_options.cpp
index f5d5525..bed4a69 100644
--- a/proton-c/bindings/cpp/src/sender_options.cpp
+++ b/proton-c/bindings/cpp/src/sender_options.cpp
@@ -108,8 +108,8 @@ void sender_options::update(const sender_options& x) { impl_->update(*x.impl_);
 sender_options& sender_options::handler(class messaging_handler &h) { impl_->handler = &h; return *this; }
 sender_options& sender_options::delivery_mode(proton::delivery_mode m) {impl_->delivery_mode = m; return *this; }
 sender_options& sender_options::auto_settle(bool b) {impl_->auto_settle = b; return *this; }
-sender_options& sender_options::source(source_options &s) {impl_->source = s; return *this; }
-sender_options& sender_options::target(target_options &s) {impl_->target = s; return *this; }
+sender_options& sender_options::source(const source_options &s) {impl_->source = s; return *this; }
+sender_options& sender_options::target(const target_options &s) {impl_->target = s; return *this; }
 
 void sender_options::apply(sender& s) const { impl_->apply(s); }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/docs/api/index.md
----------------------------------------------------------------------
diff --git a/proton-c/docs/api/index.md b/proton-c/docs/api/index.md
index 10aea84..ccd679d 100644
--- a/proton-c/docs/api/index.md
+++ b/proton-c/docs/api/index.md
@@ -1,5 +1,39 @@
 Proton Documentation            {#index}
 ====================
 
-The proton library contains two APIs: The [Engine API](@ref engine),
-and the [Messenger API](@ref messenger).
+## The Protocol Engine
+
+The [Engine API](@ref engine) is a "pure AMQP" toolkit, it decodes AMQP bytes
+into proton [events](@ref event) and generates AMQP bytes from application
+calls.
+
+The [connection engine](@ref connection_engine) provides a simple bytes in/bytes
+out, event-driven interface so you can read AMQP data from any source, process
+the resulting [events](@ref event) and write AMQP output to any destination.
+
+There is no IO or threading code in this part of the library, so it can be
+embedded in many different environments. The proton project provides language
+bindings (Python, Ruby, Go etc.) that embed it into the standard IO and
+threading facilities of the bound language.
+
+## Integrating with IO
+
+The [Proactor API](@ref proactor) is a pro-active, asynchronous framewokr to
+build single or multi-threaded Proton C applications.
+
+For advanced use-cases it is possible to write your own implementation of the
+proactor API for an unusual IO or threading framework. Any proton application
+written to the proactor API will be able to use your implementation.
+
+## Messenger and Reactor APIs
+
+The [Messenger](@ref messenger) [Reactor](@ref reactor) APIs were intended
+to be simple APIs that included IO support directly out of the box.
+
+They both had good points but were both based on the assumption of a single-threaded
+environment using a POSIX-like poll() call. This was a problem for performance on some
+platforms and did not support multi-threaded applications.
+
+Note however that application code which interacts with the AMQP @ref engine and
+processes AMQP @ref "events" event is the same for the proactor and reactor APIs,
+so is quite easy to convert. The main difference is in how connections are set up.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/include/proton/cid.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/cid.h b/proton-c/include/proton/cid.h
index 1e4715f..cd68de4 100644
--- a/proton-c/include/proton/cid.h
+++ b/proton-c/include/proton/cid.h
@@ -56,7 +56,10 @@ typedef enum {
   CID_pn_selector,
   CID_pn_selectable,
 
-  CID_pn_url
+  CID_pn_url,
+
+  CID_pn_listener,
+  CID_pn_proactor
 } pn_cid_t;
 
 #endif /* cid.h */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/include/proton/condition.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/condition.h b/proton-c/include/proton/condition.h
index cf2f8aa..ae2beff 100644
--- a/proton-c/include/proton/condition.h
+++ b/proton-c/include/proton/condition.h
@@ -165,6 +165,10 @@ PN_EXTERN const char *pn_condition_redirect_host(pn_condition_t *condition);
  */
 PN_EXTERN int pn_condition_redirect_port(pn_condition_t *condition);
 
+PN_EXTERN int pn_condition_copy(pn_condition_t *dest, pn_condition_t *src);
+PN_EXTERN pn_condition_t *pn_condition(void);
+PN_EXTERN void pn_condition_free(pn_condition_t *);
+
 /** @}
  */
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/include/proton/connection.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/connection.h b/proton-c/include/proton/connection.h
index c5b5490..0ed23b0 100644
--- a/proton-c/include/proton/connection.h
+++ b/proton-c/include/proton/connection.h
@@ -82,6 +82,7 @@ extern "C" {
  */
 #define PN_REMOTE_MASK (PN_REMOTE_UNINIT | PN_REMOTE_ACTIVE | PN_REMOTE_CLOSED)
 
+PN_EXTERN pn_connection_t *pn_connection(void);
 /**
  * Factory to construct a new Connection.
  *
@@ -148,6 +149,13 @@ PN_EXTERN pn_error_t *pn_connection_error(pn_connection_t *connection);
 PN_EXTERN void pn_connection_collect(pn_connection_t *connection, pn_collector_t *collector);
 
 /**
+ * Get the collector set with pn_connection_collect()
+ * @return NULL if pn_connection_collect() has not been called.
+*/
+PN_EXTERN pn_collector_t* pn_connection_collector(pn_connection_t *connection);
+
+
+/**
  * @deprecated
  * Get the application context that is associated with a connection
  * object.
@@ -477,6 +485,16 @@ PN_EXTERN pn_data_t *pn_connection_remote_properties(pn_connection_t *connection
  */
 PN_EXTERN pn_transport_t *pn_connection_transport(pn_connection_t *connection);
 
+/**
+ * Create a connection with `size` bytes of extra aligned storage in the same heap block.
+ */
+PN_EXTERN pn_connection_t* pn_connection_with_extra(size_t size);
+
+/**
+ * Get the start and size of extra storage allocated by pn_connection_extra()
+ */
+PN_EXTERN pn_rwbytes_t pn_connection_get_extra(pn_connection_t *connection);
+
 /** @}
  */
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/include/proton/connection_engine.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/connection_engine.h b/proton-c/include/proton/connection_engine.h
index d9df77b..b7022a9 100644
--- a/proton-c/include/proton/connection_engine.h
+++ b/proton-c/include/proton/connection_engine.h
@@ -20,160 +20,289 @@
  * under the License.
  */
 
-///@file
-///
-/// **Experimental** The Connection Engine API wraps up the proton engine
-/// objects associated with a single connection: pn_connection_t, pn_transport_t
-/// and pn_collector_t. It provides a simple bytes-in/bytes-out interface for IO
-/// and generates pn_event_t events to be handled by the application.
-///
-/// The connection engine can be fed with raw AMQP bytes from any source, and it
-/// generates AMQP byte output to be written to any destination. You can use the
-/// engine to integrate proton AMQP with any IO library, or native IO on any
-/// platform.
-///
-/// The engine is not thread safe but each engine is independent. Separate
-/// engines can be used concurrently. For example a multi-threaded application
-/// can process connections in multiple threads, but serialize work for each
-/// connection to the corresponding engine.
-///
-/// The engine is designed to be thread and IO neutral so it can be integrated with
-/// single or multi-threaded code in reactive or proactive IO frameworks.
-///
-/// Summary of use:
-///
-/// - while !pn_connection_engine_finished()
-///   - Call pn_connection_engine_dispatch() to dispatch events until it returns NULL.
-///   - Read data from your source into pn_connection_engine_read_buffer()
-///   - Call pn_connection_engine_read_done() when complete.
-///   - Write data from pn_connection_engine_write_buffer() to your destination.
-///   - Call pn_connection_engine_write_done() to indicate how much was written.
-///
-/// Note on blocking: the _read/write_buffer and _read/write_done functions can
-/// all generate events that may cause the engine to finish. Before you wait for
-/// IO, always drain pn_connection_engine_dispatch() till it returns NULL and
-/// check pn_connection_engine_finished() in case there is nothing more to do..
-///
-/// Note on error handling: the pn_connection_engine_*() functions do not return
-/// an error code. If an error occurs it will be reported as a
-/// PN_TRANSPORT_ERROR event and pn_connection_engine_finished() will return
-/// true once all final events have been processed.
-///
-/// @defgroup connection_engine The Connection Engine
-/// @{
-///
-
-#include <proton/condition.h>
-#include <proton/event.h>
+/**
+ * @file
+ *
+ * **Experimental** The connection IO API is a set of functions to simplify
+ * integrating proton with different IO and concurrency platforms. The portable
+ * parts of a Proton application should use the @ref engine types.  We will
+ * use "application" to mean the portable part of the application and
+ * "integration" to mean code that integrates with a particular IO platform.
+ *
+ * The connection_engine functions take a @ref pn_connection_t\*, and perform common
+ * tasks involving the @ref pn_connection_t and it's @ref pn_transport_t and
+ * @ref pn_collector_t so you can treat them as a unit. You can also work with
+ * these types directly for features not available via @ref connection_engine API.
+ *
+ * @defgroup connection_engine Connection Engine
+ *
+ * **Experimental**: Toolkit for integrating proton with arbitrary network or IO
+ * transports. Provides a single point of control for an AMQP connection and
+ * a simple bytes-in/bytes-out interface that lets you:
+ *
+ * - process AMQP-encoded bytes from some input byte stream
+ * - generate @ref pn_event_t events for your application to handle
+ * - encode resulting AMQP output bytes for some output byte stream
+ *
+ * The engine contains a @ref pn_connection_t, @ref pn_transport_t and @ref
+ * pn_collector_t and provides functions to operate on all three as a unit for
+ * IO integration. You can also use them directly for anything not covered by
+ * this API
+ *
+ * For example a simple blocking IO integration with the imaginary "my_io" library:
+ *
+ *     pn_connection_engine_t ce;
+ *     pn_connection_engine_init(&ce);
+ *     while (!pn_connection_engine_finished(&ce) {
+ *         // Dispatch events to be handled by the application.
+ *         pn_event_t *e;
+ *         while ((e = pn_connection_engine_event(&ce))!= NULL) {
+ *             my_app_handle(e); // Pass to the application handler
+ *             switch (pn_event_type(e)) {
+ *                 case PN_CONNECTION_INIT: pn_connection_engine_bind(&ce);
+ *                 // Only for full-duplex IO where read/write can shutdown separately.
+ *                 case PN_TRANSPORT_CLOSE_READ: my_io_shutdown_read(...); break;
+ *                 case PN_TRANSPORT_CLOSE_WRITE: my_io_shutdown_write(...); break;
+ *                 default: break;
+ *             };
+ *             e = pn_connection_engine_pop_event(&ce);
+ *         }
+ *         // Read from my_io into the connection buffer
+ *         pn_rwbytes_t readbuf = pn_connection_engine_read_buffer(&ce);
+ *         if (readbuf.size) {
+ *             size_t n = my_io_read(readbuf.start, readbuf.size, ...);
+ *             if (n > 0) {
+ *                 pn_connection_engine_read_done(&ce, n);
+ *             } else if (n < 0) {
+ *                 pn_connection_engine_errorf(&ce, "read-err", "something-bad (%d): %s", n, ...);
+ *                 pn_connection_engine_read_close(&ce);
+ *             }
+ *         }
+ *         // Write from connection buffer to my_io
+ *         pn_bytes_t writebuf = pn_connection_engine_write_buffer(&ce);
+ *         if (writebuf.size) {
+ *             size_t n = my_io_write_data(writebuf.start, writebuf.size, ...);
+ *             if (n < 0) {
+ *                 pn_connection_engine_errorf(&ce, "write-err", "something-bad (%d): %s", d, ...);
+ *                 pn_connection_engine_write_close(&ce);
+ *             } else {
+ *                 pn_connection_engine_write_done(&ce, n);
+ *             }
+ *         }
+ *     }
+ *     // If my_io doesn't have separate read/write shutdown, then we should close it now.
+ *     my_io_close(...);
+ *
+ * AMQP is a full-duplex, asynchronous protocol. The "read" and "write" sides of
+ * an AMQP connection can close separately, the example shows how to handle this
+ * for full-duplex IO or IO with a simple close.
+ *
+ * The engine buffers events, you must keep processing till
+ * pn_connection_engine_finished() is true, to ensure all reading, writing and event
+ * handling (including ERROR and FINAL events) is completely finished.
+ *
+ * ## Error handling
+ *
+ * The pn_connection_engine_*() functions do not return an error code. IO errors set
+ * the transport condition and are returned as a PN_TRANSPORT_ERROR. The integration
+ * code can set errors using pn_connection_engine_errorf()
+ *
+ * ## Other IO patterns
+ *
+ * This API supports asynchronous, proactive, non-blocking and reactive IO. An
+ * integration does not have to follow the dispatch-read-write sequence above,
+ * but note that you should handle all available events before calling
+ * pn_connection_engine_read_buffer() and check that `size` is non-zero before
+ * starting a blocking or asynchronous read call. A `read` started while there
+ * are unprocessed CLOSE events in the buffer may never complete.
+ *
+ * ## Thread safety
+ *
+ * The @ref engine types are not thread safe, but each connection and its
+ * associated types forms an independent unit. Different connections can be
+ * processed concurrently by different threads.
+ *
+ * @defgroup connection_engine Connection IO
+ * @{
+ */
+
 #include <proton/import_export.h>
+#include <proton/event.h>
 #include <proton/types.h>
 
+#include <stdarg.h>
+
 #ifdef __cplusplus
 extern "C" {
 #endif
 
-/// A connection engine is a trio of pn_connection_t, pn_transport_t and pn_collector_t.
-/// Use the pn_connection_engine_*() functions to operate on it.
-/// It is a plain struct, not a proton object. Use pn_connection_engine_init to set up
-/// the initial objects and pn_connection_engine_final to release them.
-///
+/**
+ * Struct containing a connection, transport and collector. See
+ * pn_connection_engine_init(), pn_connection_engine_destroy() and pn_connection_engine()
+ */
 typedef struct pn_connection_engine_t {
-    pn_connection_t* connection;
-    pn_transport_t* transport;
-    pn_collector_t* collector;
-    pn_event_t* event;
+  pn_connection_t *connection;
+  pn_transport_t *transport;
+  pn_collector_t *collector;
 } pn_connection_engine_t;
 
-/// Initialize a pn_connection_engine_t struct with a new connection and
-/// transport.
-///
-/// Call pn_connection_engine_final to free resources when you are done.
-///
-///@return 0 on success, a proton error code on failure (@see error.h)
-///
-PN_EXTERN int pn_connection_engine_init(pn_connection_engine_t* engine);
-
-/// Free resources used by the engine, set the connection and transport pointers
-/// to NULL.
-PN_EXTERN void pn_connection_engine_final(pn_connection_engine_t* engine);
-
-/// Get the engine's read buffer. Read data from your IO source to buf.start, up
-/// to a max of buf.size. Then call pn_connection_engine_read_done().
-///
-/// buf.size==0 means the engine cannot read presently, calling
-/// pn_connection_engine_dispatch() may create more buffer space.
-///
-PN_EXTERN pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t*);
-
-/// Consume the first n bytes of data in pn_connection_engine_read_buffer() and
-/// update the buffer.
-PN_EXTERN void pn_connection_engine_read_done(pn_connection_engine_t*, size_t n);
-
-/// Close the read side of the transport when no more data is available.
-/// Note there may still be events for pn_connection_engine_dispatch() or data
-/// in pn_connection_engine_write_buffer()
-PN_EXTERN void pn_connection_engine_read_close(pn_connection_engine_t*);
-
-/// Get the engine's write buffer. Write data from buf.start to your IO destination,
-/// up to a max of buf.size. Then call pn_connection_engine_write_done().
-///
-/// buf.size==0 means the engine has nothing to write presently.  Calling
-/// pn_connection_engine_dispatch() may generate more data.
-PN_EXTERN pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t*);
-
-/// Call when the first n bytes of pn_connection_engine_write_buffer() have been
-/// written to IO and can be re-used for new data.  Updates the buffer.
-PN_EXTERN void pn_connection_engine_write_done(pn_connection_engine_t*, size_t n);
-
-/// Call when the write side of IO has closed and no more data can be written.
-/// Note that there may still be events for pn_connection_engine_dispatch() or
-/// data to read into pn_connection_engine_read_buffer().
-PN_EXTERN void pn_connection_engine_write_close(pn_connection_engine_t*);
-
-/// Close both sides of the transport, equivalent to
-///     pn_connection_engine_read_close(); pn_connection_engine_write_close()
-///
-/// You must still call pn_connection_engine_dispatch() to process final
-/// events.
-///
-/// To provide transport error information to the handler, set it on
-///     pn_connection_engine_condition()
-/// *before* calling pn_connection_engine_disconnected(). This sets
-/// the error on the pn_transport_t object.
-///
-/// Note this does *not* modify the pn_connection_t, so you can distinguish
-/// between a connection close error sent by the remote peer (which will set the
-/// connection condition) and a transport error (which sets the transport
-/// condition.)
-///
-PN_EXTERN void pn_connection_engine_disconnected(pn_connection_engine_t*);
-
-/// Get the next available event.
-/// Call in a loop until it returns NULL to dispatch all available events.
-/// Note this call may modify the read and write buffers.
-///
-/// @return Pointer to the next event, or NULL if there are none available.
-///
-PN_EXTERN pn_event_t* pn_connection_engine_dispatch(pn_connection_engine_t*);
-
-/// Return true if the engine is finished - all data has been written, all
-/// events have been handled and the transport is closed.
-PN_EXTERN bool pn_connection_engine_finished(pn_connection_engine_t*);
-
-/// Get the AMQP connection, owned by the pn_connection_engine_t.
-PN_EXTERN pn_connection_t* pn_connection_engine_connection(pn_connection_engine_t*);
-
-/// Get the proton transport, owned by the pn_connection_engine_t.
-PN_EXTERN pn_transport_t* pn_connection_engine_transport(pn_connection_engine_t*);
-
-/// Get the condition object for the engine's transport.
-///
-/// Note that IO errors should be set on this, the transport condition, not on
-/// the pn_connection_t condition. The connection's condition is for errors
-/// received via the AMQP protocol, the transport condition is for errors in the
-/// the IO layer such as a socket read or disconnect errors.
-///
-PN_EXTERN pn_condition_t* pn_connection_engine_condition(pn_connection_engine_t*);
+/**
+ * Set #connection and #transport to the provided values, or create a new
+ * @ref pn_connection_t or @ref pn_transport_t if either is NULL.
+ * The provided values belong to the connection engine and will be freed by
+ * pn_connection_engine_destroy()
+ *
+ * Create a new @ref pn_collector_t and set as #collector.
+ *
+ * The transport and connection are *not* bound at this point. You should
+ * configure them as needed and let the application handle the
+ * PN_CONNECTION_INIT from pn_connection_engine_event() before calling
+ * pn_connection_engine_bind().
+ *
+ * @return if any allocation fails call pn_connection_engine_destroy() and return PN_OUT_OF_MEMORY
+ */
+PN_EXTERN int pn_connection_engine_init(pn_connection_engine_t*, pn_connection_t*, pn_transport_t*);
+
+/**
+ * Bind the connection to the transport when the external IO is ready.
+ *
+ * The following functions (if called at all) must be called *before* bind:
+ * pn_connection_set_username(), pn_connection_set_password(),  pn_transport_set_server()
+ *
+ * If there is an external IO error during setup, set a transport error, close
+ * the transport and then bind. The error events are reported to the application
+ * via pn_connection_engine_event().
+ *
+ * @return an error code if the bind fails.
+ */
+PN_EXTERN int pn_connection_engine_bind(pn_connection_engine_t *);
+
+/**
+ * Unbind, release and free #connection, #transpot and #collector. Set all pointers to NULL.
+ * Does not free the @ref pn_connection_engine_t struct itself.
+ */
+PN_EXTERN void pn_connection_engine_destroy(pn_connection_engine_t *);
+
+/**
+ * Get the read buffer.
+ *
+ * Copy data from your input byte source to buf.start, up to buf.size.
+ * Call pn_connection_engine_read_done() when reading is complete.
+ *
+ * buf.size==0 means reading is not possible: no buffer space or the read side is closed.
+ */
+PN_EXTERN pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *);
+
+/**
+ * Process the first n bytes of data in pn_connection_engine_read_buffer() and
+ * reclaim the buffer space.
+ */
+PN_EXTERN void pn_connection_engine_read_done(pn_connection_engine_t *, size_t n);
+
+/**
+ * Close the read side. Call when the IO can no longer be read.
+ */
+PN_EXTERN void pn_connection_engine_read_close(pn_connection_engine_t *);
+
+/**
+ * True if read side is closed.
+ */
+PN_EXTERN bool pn_connection_engine_read_closed(pn_connection_engine_t *);
+
+/**
+ * Get the write buffer.
+ *
+ * Write data from buf.start to your IO destination, up to a max of buf.size.
+ * Call pn_connection_engine_write_done() when writing is complete.
+ *
+ * buf.size==0 means there is nothing to write.
+ */
+ PN_EXTERN pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *);
+
+/**
+ * Call when the first n bytes of pn_connection_engine_write_buffer() have been
+ * written to IO. Reclaims the buffer space and reset the write buffer.
+ */
+PN_EXTERN void pn_connection_engine_write_done(pn_connection_engine_t *, size_t n);
+
+/**
+ * Close the write side. Call when IO can no longer be written to.
+ */
+PN_EXTERN void pn_connection_engine_write_close(pn_connection_engine_t *);
+
+/**
+ * True if write side is closed.
+ */
+PN_EXTERN bool pn_connection_engine_write_closed(pn_connection_engine_t *);
+
+/**
+ * Close both sides side.
+ */
+PN_EXTERN void pn_connection_engine_close(pn_connection_engine_t * c);
+
+/**
+ * Get the current event. Call pn_connection_engine_done() when done handling it.
+ * Note that if PN_TRACE_EVT is enabled this will log the event, so you should
+ * avoid calling it more than once per event. Use pn_connection_engine_has_event()
+ * to silently test if any events are available.
+ *
+ * @return NULL if there are no more events ready. Reading/writing data may produce more.
+ */
+PN_EXTERN pn_event_t* pn_connection_engine_event(pn_connection_engine_t *);
+
+/**
+ * True if  pn_connection_engine_event() will return a non-NULL event.
+ */
+PN_EXTERN bool pn_connection_engine_has_event(pn_connection_engine_t *);
+
+/**
+ * Drop the current event, advance pn_connection_engine_event() to the next event.
+ */
+PN_EXTERN void pn_connection_engine_pop_event(pn_connection_engine_t *);
+
+/**
+ * Return true if the the engine is closed for reading and writing and there are
+ * no more events.
+ *
+ * Call pn_connection_engine_free() to free all related memory.
+ */
+PN_EXTERN bool pn_connection_engine_finished(pn_connection_engine_t *);
+
+/**
+ * Set IO error information.
+ *
+ * The name and formatted description are set on the transport condition, and
+ * returned as a PN_TRANSPORT_ERROR event from pn_connection_engine_event().
+ *
+ * You must call this *before* pn_connection_engine_read_close() or
+ * pn_connection_engine_write_close() to ensure the error is processed.
+ *
+ * If there is already a transport condition set, this call does nothing.  For
+ * more complex cases, you can work with the transport condition directly using:
+ *
+ *     pn_condition_t *cond = pn_transport_condition(pn_connection_transport(conn));
+ */
+PN_EXTERN void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, const char *fmt, ...);
+
+/**
+ * Set IO error information via a va_list, see pn_connection_engine_errorf()
+ */
+PN_EXTERN void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list);
+
+/**
+ * Log a string message using the connection's transport log.
+ */
+PN_EXTERN void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg);
+
+/**
+ * Log a printf formatted message using the connection's transport log.
+ */
+PN_EXTERN void pn_connection_engine_logf(pn_connection_engine_t *ce, char *fmt, ...);
+
+/**
+ * Log a printf formatted message using the connection's transport log.
+ */
+PN_EXTERN void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap);
 
 ///@}
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/include/proton/event.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h
index d10927b..4dca2d5 100644
--- a/proton-c/include/proton/event.h
+++ b/proton-c/include/proton/event.h
@@ -277,13 +277,19 @@ typedef enum {
   PN_TRANSPORT_ERROR,
 
   /**
-   * Indicates that the head of the transport has been closed. This
+   * Indicates that the "head" or writing end of the transport has been closed. This
    * means the transport will never produce more bytes for output to
    * the network. Events of this type point to the relevant transport.
    */
   PN_TRANSPORT_HEAD_CLOSED,
 
   /**
+   * The write side of the transport is closed, it will no longer produce bytes
+   * to write to external IO. Synonynm for PN_TRANSPORT_HEAD_CLOSED
+   */
+  PN_TRANSPORT_WRITE_CLOSED = PN_TRANSPORT_HEAD_CLOSED,
+
+  /**
    * Indicates that the tail of the transport has been closed. This
    * means the transport will never be able to process more bytes from
    * the network. Events of this type point to the relevant transport.
@@ -291,6 +297,12 @@ typedef enum {
   PN_TRANSPORT_TAIL_CLOSED,
 
   /**
+   * The read side of the transport is closed, it will no longer read bytes
+   * from external IO. Synonynm for PN_TRANSPORT_TAIL_CLOSED
+   */
+  PN_TRANSPORT_READ_CLOSED = PN_TRANSPORT_TAIL_CLOSED,
+
+  /**
    * Indicates that the both the head and tail of the transport are
    * closed. Events of this type point to the relevant transport.
    */
@@ -302,7 +314,39 @@ typedef enum {
   PN_SELECTABLE_WRITABLE,
   PN_SELECTABLE_ERROR,
   PN_SELECTABLE_EXPIRED,
-  PN_SELECTABLE_FINAL
+  PN_SELECTABLE_FINAL,
+
+  /**
+   * pn_connection_wake() was called.
+   * Events of this type point to the @ref pn_connection_t.
+   */
+  PN_CONNECTION_WAKE,
+
+  /**
+   * pn_listener_close() was called or an error occurred, see pn_listener_condition()
+   * Events of this type point to the @ref pn_listener_t.
+   */
+  PN_LISTENER_CLOSE,
+
+  /**
+   * pn_proactor_interrupt() was called to interrupt a proactor thread
+   * Events of this type point to the @ref pn_proactor_t.
+   */
+  PN_PROACTOR_INTERRUPT,
+
+  /**
+   * pn_proactor_set_timeout() time limit expired.
+   * Events of this type point to the @ref pn_proactor_t.
+   */
+  PN_PROACTOR_TIMEOUT,
+
+  /**
+   * The proactor becaome inactive: all listeners and connections are closed and
+   * their events processed, the timeout is expired.
+   *
+   * Events of this type point to the @ref pn_proactor_t.
+   */
+  PN_PROACTOR_INACTIVE
 
 } pn_event_type_t;
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/include/proton/extra.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/extra.h b/proton-c/include/proton/extra.h
new file mode 100644
index 0000000..ea2e1ef
--- /dev/null
+++ b/proton-c/include/proton/extra.h
@@ -0,0 +1,69 @@
+#ifndef EXTRA_H
+#define EXTRA_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/type_compat.h>
+#include <proton/types.h>
+#include <stddef.h>
+#include <stdlib.h>
+
+/**
+ * @cond INTERNAL
+ * Support for allocating extra aligned memory after a type.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * extra_t contains a size and is maximally aligned so the memory immediately
+ * after it can store any type of value.
+ */
+typedef union pn_extra_t {
+  size_t size;
+#if __STDC_VERSION__ >= 201112
+  max_align_t max;
+#else
+/* Not standard but fairly safe */
+  uint64_t i;
+  long double d;
+  void *v;
+  void (*fp)(void);
+#endif
+} pn_extra_t;
+
+static inline pn_rwbytes_t pn_extra_rwbytes(pn_extra_t *x) {
+    return pn_rwbytes(x->size, (char*)(x+1));
+}
+
+/* Declare private helper struct for T */
+#define PN_EXTRA_DECLARE(T) typedef struct T##__extra { T base; pn_extra_t extra; } T##__extra
+#define PN_EXTRA_SIZEOF(T, N) (sizeof(T##__extra)+(N))
+#define PN_EXTRA_GET(T, P) pn_extra_rwbytes(&((T##__extra*)(P))->extra)
+
+#ifdef __cplusplus
+}
+#endif
+
+/** @endcond */
+
+#endif // EXTRA_H

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/include/proton/listener.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/listener.h b/proton-c/include/proton/listener.h
new file mode 100644
index 0000000..f55479b
--- /dev/null
+++ b/proton-c/include/proton/listener.h
@@ -0,0 +1,76 @@
+#ifndef PROTON_LISTENER_H
+#define PROTON_LISTENER_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/types.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * @file
+ *
+ * Listener API for the proton @proactor
+ *
+ * @defgroup listener Listener
+ * @ingroup proactor
+ * @{
+ */
+
+typedef struct pn_proactor_t pn_proactor_t;
+typedef struct pn_condition_t pn_condition_t;
+
+/**
+ * Listener accepts connections, see pn_proactor_listen()
+ */
+typedef struct pn_listener_t pn_listener_t;
+
+/**
+ * The proactor that created the listener.
+ */
+pn_proactor_t *pn_listener_proactor(pn_listener_t *c);
+
+/**
+ * Get the error condition for a listener.
+ */
+pn_condition_t *pn_listener_condition(pn_listener_t *l);
+
+/**
+ * Get the user-provided value associated with the listener in pn_proactor_listen()
+ * The start address is aligned so you can cast it to any type.
+ */
+pn_rwbytes_t pn_listener_get_extra(pn_listener_t*);
+
+/**
+ * Close the listener (thread safe).
+ */
+void pn_listener_close(pn_listener_t *l);
+
+/**
+ *@}
+ */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // PROTON_LISTENER_H

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/include/proton/object.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/object.h b/proton-c/include/proton/object.h
index bafdcf4..0433b58 100644
--- a/proton-c/include/proton/object.h
+++ b/proton-c/include/proton/object.h
@@ -159,6 +159,29 @@ PREFIX ## _t *PREFIX ## _new(void) {                                      \
     PREFIX ## _inspect                          \
 }
 
+/* Class to identify a plain C struct in a pn_event_t. No refcounting or memory management. */
+#define PN_STRUCT_CLASSDEF(PREFIX, CID)                                 \
+  const pn_class_t *PREFIX ## __class(void);                            \
+  static const pn_class_t *PREFIX ## _reify(void *p) { return PREFIX ## __class(); } \
+  const pn_class_t *PREFIX  ##  __class(void) {                         \
+  static const pn_class_t clazz = {                                     \
+    #PREFIX,                                                            \
+    CID,                                                                \
+    NULL, /*_new*/                                                      \
+    NULL, /*_initialize*/                                               \
+    pn_void_incref,                                                     \
+    pn_void_decref,                                                     \
+    pn_void_refcount,                                                   \
+    NULL, /* _finalize */                                               \
+    NULL, /* _free */                                                   \
+    PREFIX ## _reify,                                                   \
+    pn_void_hashcode,                                                   \
+    pn_void_compare,                                                    \
+    pn_void_inspect                                                     \
+    };                                                                  \
+  return &clazz;                                                        \
+  }
+
 PN_EXTERN pn_cid_t pn_class_id(const pn_class_t *clazz);
 PN_EXTERN const char *pn_class_name(const pn_class_t *clazz);
 PN_EXTERN void *pn_class_new(const pn_class_t *clazz, size_t size);
@@ -181,6 +204,10 @@ PN_EXTERN intptr_t pn_class_compare(const pn_class_t *clazz, void *a, void *b);
 PN_EXTERN bool pn_class_equals(const pn_class_t *clazz, void *a, void *b);
 PN_EXTERN int pn_class_inspect(const pn_class_t *clazz, void *object, pn_string_t *dst);
 
+PN_EXTERN void *pn_void_new(const pn_class_t *clazz, size_t size);
+PN_EXTERN void pn_void_incref(void *object);
+PN_EXTERN void pn_void_decref(void *object);
+PN_EXTERN int pn_void_refcount(void *object);
 PN_EXTERN uintptr_t pn_void_hashcode(void *object);
 PN_EXTERN intptr_t pn_void_compare(void *a, void *b);
 PN_EXTERN int pn_void_inspect(void *object, pn_string_t *dst);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
new file mode 100644
index 0000000..49d7b6a
--- /dev/null
+++ b/proton-c/include/proton/proactor.h
@@ -0,0 +1,174 @@
+#ifndef PROTON_PROACTOR_H
+#define PROTON_PROACTOR_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/types.h>
+#include <proton/import_export.h>
+#include <proton/listener.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct pn_condition_t pn_condition_t;
+
+/**
+ * @file
+ *
+ * **Experimental**: Proactor API for the the proton @ref engine.
+ *
+ * @defgroup proactor Proactor
+ *
+ * **Experimental**: Proactor API for portable, multi-threaded, asynchronous applications.
+ *
+ * The proactor establishes and listens for connections. It creates the @ref
+ * "transport" transport that sends and receives data over the network and
+ * delivers @ref "events" event to application threads for processing.
+ *
+ * ## Multi-threading
+ *
+ * The @ref proactor is thread-safe, but the @ref "protocol engine" is not.  The
+ * proactor ensures that each @ref "connection" connection and its associated
+ * values (@ref session, @ref link etc.) is processed sequentially, even if there
+ * are multiple application threads. See pn_proactor_wait()
+ *
+ * @{
+ */
+
+/**
+ * The proactor creates and manage @ref "transports" transport and delivers @ref
+ * "event" events to the application.
+ *
+ */
+typedef struct pn_proactor_t pn_proactor_t;
+
+/**
+ * Create a proactor. Must be freed with pn_proactor_free()
+ */
+pn_proactor_t *pn_proactor(void);
+
+/**
+ * Free the proactor.
+ */
+void pn_proactor_free(pn_proactor_t*);
+
+/* FIXME aconway 2016-11-12: connect and listen need options to enable
+   things like websockets, alternate encryption or other features.
+   The "extra" parameter will be replaced by an "options" parameter
+   that will include providing extra data and other manipulators
+   to affect how the connection is processed.
+*/
+
+/**
+ * Asynchronous connect: a connection and transport will be created, the
+ * relevant events will be returned by pn_proactor_wait()
+ *
+ * Errors are indicated by PN_TRANSPORT_ERROR/PN_TRANSPORT_CLOSE events.
+ *
+ * @param extra bytes to copy to pn_connection_get_extra() on the new connection, @ref
+ * pn_rwbytes_null for nothing.
+ *
+ * @return error if the connect cannot be initiated e.g. an allocation failure.
+ * IO errors will be returned as transport events via pn_proactor_wait()
+ */
+int pn_proactor_connect(pn_proactor_t*, const char *host, const char *port, pn_bytes_t extra);
+
+/**
+ * Asynchronous listen: start listening, connections will be returned by pn_proactor_wait()
+ * An error are indicated by PN_LISTENER_ERROR event.
+ *
+ * @param extra bytes to copy to pn_connection_get_extra() on the new connection, @ref
+ * pn_rwbytes_null for nothing.
+ *
+ * @return error if the connect cannot be initiated e.g. an allocation failure.
+ * IO errors will be returned as transport events via pn_proactor_wait()
+ */
+pn_listener_t *pn_proactor_listen(pn_proactor_t *, const char *host, const char *port, int backlog, pn_bytes_t extra);
+
+/**
+ * Wait for an event. Can be called in multiple threads concurrently.
+ * You must call pn_event_done() when the event has been handled.
+ *
+ * The proactor ensures that events that cannot be handled concurrently
+ * (e.g. events for for the same connection) are never returned concurrently.
+ */
+pn_event_t *pn_proactor_wait(pn_proactor_t* d);
+
+/**
+ * Cause PN_PROACTOR_INTERRUPT to be returned to exactly one thread calling wait()
+ * for each call to pn_proactor_interrupt(). Thread safe.
+ */
+void pn_proactor_interrupt(pn_proactor_t* d);
+
+/**
+ * Cause PN_PROACTOR_TIMEOUT to be returned to a thread calling wait() after
+ * timeout milliseconds. Thread safe.
+ *
+ * Note calling pn_proactor_set_timeout() again before the PN_PROACTOR_TIMEOUT is
+ * delivered will cancel the previous timeout and deliver an event only after
+ * the new timeout.
+ */
+void pn_proactor_set_timeout(pn_proactor_t* d, pn_millis_t timeout);
+
+/**
+ * Cause a PN_CONNECTION_WAKE event to be returned by the proactor, even if
+ * there are no IO events pending for the connection.
+ *
+ * Thread safe: this is the only pn_connection_ function that can be
+ * called concurrently.
+ *
+ * Wakes can be "coalesced" - if several pn_connection_wake() calls happen
+ * concurrently, there may be only one PN_CONNECTION_WAKE event.
+ */
+void pn_connection_wake(pn_connection_t *c);
+
+/**
+ * The proactor that created the connection.
+ */
+pn_proactor_t *pn_connection_proactor(pn_connection_t *c);
+
+/**
+ * Call when a proactor event has been handled. Does nothing if not a proactor event.
+ *
+ * Thread safe: May be called from any thread but must be called exactly once
+ * for each event returned by pn_proactor_wait()
+ */
+void pn_event_done(pn_event_t *);
+
+/**
+ * Get the proactor that created the event or NULL.
+ */
+pn_proactor_t *pn_event_proactor(pn_event_t *);
+
+/**
+ * Get the listener for the event or NULL.
+ */
+pn_listener_t *pn_event_listener(pn_event_t *);
+
+/**
+ * @}
+ */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // PROTON_PROACTOR_H

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/include/proton/types.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/types.h b/proton-c/include/proton/types.h
index 176af47..378719c 100644
--- a/proton-c/include/proton/types.h
+++ b/proton-c/include/proton/types.h
@@ -67,6 +67,7 @@ typedef struct pn_bytes_t {
 } pn_bytes_t;
 
 PN_EXTERN pn_bytes_t pn_bytes(size_t size, const char *start);
+static const pn_bytes_t pn_bytes_null = { 0, NULL };
 
 /** A non-const byte buffer. */
 typedef struct pn_rwbytes_t {
@@ -75,6 +76,7 @@ typedef struct pn_rwbytes_t {
 } pn_rwbytes_t;
 
 PN_EXTERN pn_rwbytes_t pn_rwbytes(size_t size, char *start);
+static const pn_bytes_t pn_rwbytes_null = { 0, NULL };
 
 /** @}
  */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/src/core/connection_driver.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/connection_driver.c b/proton-c/src/core/connection_driver.c
new file mode 100644
index 0000000..f31ddb0
--- /dev/null
+++ b/proton-c/src/core/connection_driver.c
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "engine-internal.h"
+#include <proton/condition.h>
+#include <proton/connection.h>
+#include <proton/connection_engine.h>
+#include <proton/transport.h>
+#include <string.h>
+
+int pn_connection_engine_init(pn_connection_engine_t* ce, pn_connection_t *c, pn_transport_t *t) {
+  ce->connection = c ? c : pn_connection();
+  ce->transport = t ? t : pn_transport();
+  ce->collector = pn_collector();
+  if (!ce->connection || !ce->transport || !ce->collector) {
+    pn_connection_engine_destroy(ce);
+    return PN_OUT_OF_MEMORY;
+  }
+  pn_connection_collect(ce->connection, ce->collector);
+  return 0;
+}
+
+int pn_connection_engine_bind(pn_connection_engine_t *ce) {
+  return pn_transport_bind(ce->transport, ce->connection);
+}
+
+void pn_connection_engine_destroy(pn_connection_engine_t *ce) {
+  if (ce->transport) {
+    pn_transport_unbind(ce->transport);
+    pn_transport_free(ce->transport);
+  }
+  if (ce->collector) pn_collector_free(ce->collector);
+  if (ce->connection) pn_connection_free(ce->connection);
+  memset(ce, 0, sizeof(*ce));
+}
+
+pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *ce) {
+  ssize_t cap = pn_transport_capacity(ce->transport);
+  return (cap > 0) ?  pn_rwbytes(cap, pn_transport_tail(ce->transport)) : pn_rwbytes(0, 0);
+}
+
+void pn_connection_engine_read_done(pn_connection_engine_t *ce, size_t n) {
+  if (n > 0) pn_transport_process(ce->transport, n);
+}
+
+bool pn_connection_engine_read_closed(pn_connection_engine_t *ce) {
+  return pn_transport_capacity(ce->transport) < 0;
+}
+
+void pn_connection_engine_read_close(pn_connection_engine_t *ce) {
+  if (!pn_connection_engine_read_closed(ce)) {
+    pn_transport_close_tail(ce->transport);
+  }
+}
+
+pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *ce) {
+  ssize_t pending = pn_transport_pending(ce->transport);
+  return (pending > 0) ?
+    pn_bytes(pending, pn_transport_head(ce->transport)) : pn_bytes_null;
+}
+
+void pn_connection_engine_write_done(pn_connection_engine_t *ce, size_t n) {
+  if (n > 0)
+    pn_transport_pop(ce->transport, n);
+}
+
+bool pn_connection_engine_write_closed(pn_connection_engine_t *ce) {
+  return pn_transport_pending(ce->transport) < 0;
+}
+
+void pn_connection_engine_write_close(pn_connection_engine_t *ce) {
+  if (!pn_connection_engine_write_closed(ce)) {
+    pn_transport_close_head(ce->transport);
+  }
+}
+
+void pn_connection_engine_close(pn_connection_engine_t *ce) {
+  pn_connection_engine_read_close(ce);
+  pn_connection_engine_write_close(ce);
+}
+
+pn_event_t* pn_connection_engine_event(pn_connection_engine_t *ce) {
+  pn_event_t *e = ce->collector ? pn_collector_peek(ce->collector) : NULL;
+  if (e) {
+    pn_transport_t *t = ce->transport;
+    if (t && t->trace & PN_TRACE_EVT) {
+      /* This can log the same event twice if pn_connection_engine_event is called
+       * twice but for debugging it is much better to log before handling than after.
+       */
+      pn_string_clear(t->scratch);
+      pn_inspect(e, t->scratch);
+      pn_transport_log(t, pn_string_get(t->scratch));
+    }
+  }
+  return e;
+}
+
+bool pn_connection_engine_has_event(pn_connection_engine_t *ce) {
+  return ce->collector && pn_collector_peek(ce->collector);
+}
+
+void pn_connection_engine_pop_event(pn_connection_engine_t *ce) {
+  if (ce->collector) {
+    pn_event_t *e = pn_collector_peek(ce->collector);
+    if (pn_event_type(e) == PN_TRANSPORT_CLOSED) { /* The last event ever */
+      /* Events can accumulate behind the TRANSPORT_CLOSED before the
+       * PN_TRANSPORT_CLOSED event is handled. They can never be processed
+       * so release them.
+       */
+      pn_collector_release(ce->collector);
+    } else {
+      pn_collector_pop(ce->collector);
+    }
+
+  }
+}
+
+bool pn_connection_engine_finished(pn_connection_engine_t *ce) {
+  return pn_transport_closed(ce->transport) && !pn_connection_engine_has_event(ce);
+}
+
+void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list ap) {
+  pn_transport_t *t = ce->transport;
+  pn_condition_t *cond = pn_transport_condition(t);
+  pn_string_vformat(t->scratch, fmt, ap);
+  pn_condition_set_name(cond, name);
+  pn_condition_set_description(cond, pn_string_get(t->scratch));
+}
+
+void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, const char *fmt, ...) {
+  va_list ap;
+  va_start(ap, fmt);
+  pn_connection_engine_verrorf(ce, name, fmt, ap);
+  va_end(ap);
+}
+
+void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg) {
+  pn_transport_log(ce->transport, msg);
+}
+
+void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap) {
+  pn_transport_vlogf(ce->transport, fmt, ap);
+}
+
+void pn_connection_engine_vlog(pn_connection_engine_t *ce, const char *msg) {
+  pn_transport_log(ce->transport, msg);
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/src/core/connection_engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/connection_engine.c b/proton-c/src/core/connection_engine.c
index 5d184a1..f31ddb0 100644
--- a/proton-c/src/core/connection_engine.c
+++ b/proton-c/src/core/connection_engine.c
@@ -16,109 +16,148 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include "engine-internal.h"
 
+#include "engine-internal.h"
+#include <proton/condition.h>
 #include <proton/connection.h>
 #include <proton/connection_engine.h>
 #include <proton/transport.h>
 #include <string.h>
 
-int pn_connection_engine_init(pn_connection_engine_t* e) {
-    memset(e, 0, sizeof(*e));
-    e->connection = pn_connection();
-    e->transport = pn_transport();
-    e->collector = pn_collector();
-    if (!e->connection || !e->transport || !e->collector) {
-        pn_connection_engine_final(e);
-        return PN_OUT_OF_MEMORY;
-    }
-    pn_connection_collect(e->connection, e->collector);
-    return PN_OK;
+int pn_connection_engine_init(pn_connection_engine_t* ce, pn_connection_t *c, pn_transport_t *t) {
+  ce->connection = c ? c : pn_connection();
+  ce->transport = t ? t : pn_transport();
+  ce->collector = pn_collector();
+  if (!ce->connection || !ce->transport || !ce->collector) {
+    pn_connection_engine_destroy(ce);
+    return PN_OUT_OF_MEMORY;
+  }
+  pn_connection_collect(ce->connection, ce->collector);
+  return 0;
 }
 
-void pn_connection_engine_final(pn_connection_engine_t* e) {
-    if (e->transport && e->connection) {
-        pn_transport_unbind(e->transport);
-        pn_decref(e->transport);
-    }
-    if (e->collector)
-        pn_collector_free(e->collector); /* Break cycle with connection */
-    if (e->connection)
-        pn_decref(e->connection);
-    memset(e, 0, sizeof(*e));
+int pn_connection_engine_bind(pn_connection_engine_t *ce) {
+  return pn_transport_bind(ce->transport, ce->connection);
 }
 
-pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t* e) {
-    ssize_t cap = pn_transport_capacity(e->transport);
-    if (cap > 0)
-        return pn_rwbytes(cap, pn_transport_tail(e->transport));
-    else
-        return pn_rwbytes(0, 0);
+void pn_connection_engine_destroy(pn_connection_engine_t *ce) {
+  if (ce->transport) {
+    pn_transport_unbind(ce->transport);
+    pn_transport_free(ce->transport);
+  }
+  if (ce->collector) pn_collector_free(ce->collector);
+  if (ce->connection) pn_connection_free(ce->connection);
+  memset(ce, 0, sizeof(*ce));
 }
 
-void pn_connection_engine_read_done(pn_connection_engine_t* e, size_t n) {
-    if (n > 0)
-        pn_transport_process(e->transport, n);
+pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *ce) {
+  ssize_t cap = pn_transport_capacity(ce->transport);
+  return (cap > 0) ?  pn_rwbytes(cap, pn_transport_tail(ce->transport)) : pn_rwbytes(0, 0);
 }
 
-void pn_connection_engine_read_close(pn_connection_engine_t* e) {
-    pn_transport_close_tail(e->transport);
+void pn_connection_engine_read_done(pn_connection_engine_t *ce, size_t n) {
+  if (n > 0) pn_transport_process(ce->transport, n);
 }
 
-pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t* e) {
-    ssize_t pending = pn_transport_pending(e->transport);
-    if (pending > 0)
-        return pn_bytes(pending, pn_transport_head(e->transport));
-    else
-        return pn_bytes(0, 0);
+bool pn_connection_engine_read_closed(pn_connection_engine_t *ce) {
+  return pn_transport_capacity(ce->transport) < 0;
 }
 
-void pn_connection_engine_write_done(pn_connection_engine_t* e, size_t n) {
-    if (n > 0)
-        pn_transport_pop(e->transport, n);
+void pn_connection_engine_read_close(pn_connection_engine_t *ce) {
+  if (!pn_connection_engine_read_closed(ce)) {
+    pn_transport_close_tail(ce->transport);
+  }
 }
 
-void pn_connection_engine_write_close(pn_connection_engine_t* e){
-    pn_transport_close_head(e->transport);
+pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *ce) {
+  ssize_t pending = pn_transport_pending(ce->transport);
+  return (pending > 0) ?
+    pn_bytes(pending, pn_transport_head(ce->transport)) : pn_bytes_null;
 }
 
-void pn_connection_engine_disconnected(pn_connection_engine_t* e) {
-    pn_connection_engine_read_close(e);
-    pn_connection_engine_write_close(e);
+void pn_connection_engine_write_done(pn_connection_engine_t *ce, size_t n) {
+  if (n > 0)
+    pn_transport_pop(ce->transport, n);
 }
 
-static void log_event(pn_connection_engine_t *engine, pn_event_t* event) {
-    if (event && engine->transport->trace & PN_TRACE_EVT) {
-        pn_string_t *str = pn_string(NULL);
-        pn_inspect(event, str);
-        pn_transport_log(engine->transport, pn_string_get(str));
-        pn_free(str);
+bool pn_connection_engine_write_closed(pn_connection_engine_t *ce) {
+  return pn_transport_pending(ce->transport) < 0;
+}
+
+void pn_connection_engine_write_close(pn_connection_engine_t *ce) {
+  if (!pn_connection_engine_write_closed(ce)) {
+    pn_transport_close_head(ce->transport);
+  }
+}
+
+void pn_connection_engine_close(pn_connection_engine_t *ce) {
+  pn_connection_engine_read_close(ce);
+  pn_connection_engine_write_close(ce);
+}
+
+pn_event_t* pn_connection_engine_event(pn_connection_engine_t *ce) {
+  pn_event_t *e = ce->collector ? pn_collector_peek(ce->collector) : NULL;
+  if (e) {
+    pn_transport_t *t = ce->transport;
+    if (t && t->trace & PN_TRACE_EVT) {
+      /* This can log the same event twice if pn_connection_engine_event is called
+       * twice but for debugging it is much better to log before handling than after.
+       */
+      pn_string_clear(t->scratch);
+      pn_inspect(e, t->scratch);
+      pn_transport_log(t, pn_string_get(t->scratch));
     }
+  }
+  return e;
+}
+
+bool pn_connection_engine_has_event(pn_connection_engine_t *ce) {
+  return ce->collector && pn_collector_peek(ce->collector);
 }
 
-pn_event_t* pn_connection_engine_dispatch(pn_connection_engine_t* e) {
-    if (e->event) {             /* Already returned */
-        if (pn_event_type(e->event) == PN_CONNECTION_INIT)
-            pn_transport_bind(e->transport, e->connection);
-        pn_collector_pop(e->collector);
+void pn_connection_engine_pop_event(pn_connection_engine_t *ce) {
+  if (ce->collector) {
+    pn_event_t *e = pn_collector_peek(ce->collector);
+    if (pn_event_type(e) == PN_TRANSPORT_CLOSED) { /* The last event ever */
+      /* Events can accumulate behind the TRANSPORT_CLOSED before the
+       * PN_TRANSPORT_CLOSED event is handled. They can never be processed
+       * so release them.
+       */
+      pn_collector_release(ce->collector);
+    } else {
+      pn_collector_pop(ce->collector);
     }
-    e->event = pn_collector_peek(e->collector);
-    log_event(e, e->event);
-    return e->event;
+
+  }
+}
+
+bool pn_connection_engine_finished(pn_connection_engine_t *ce) {
+  return pn_transport_closed(ce->transport) && !pn_connection_engine_has_event(ce);
+}
+
+void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list ap) {
+  pn_transport_t *t = ce->transport;
+  pn_condition_t *cond = pn_transport_condition(t);
+  pn_string_vformat(t->scratch, fmt, ap);
+  pn_condition_set_name(cond, name);
+  pn_condition_set_description(cond, pn_string_get(t->scratch));
 }
 
-bool pn_connection_engine_finished(pn_connection_engine_t* e) {
-    return pn_transport_closed(e->transport) && (pn_collector_peek(e->collector) == NULL);
+void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, const char *fmt, ...) {
+  va_list ap;
+  va_start(ap, fmt);
+  pn_connection_engine_verrorf(ce, name, fmt, ap);
+  va_end(ap);
 }
 
-pn_connection_t* pn_connection_engine_connection(pn_connection_engine_t* e) {
-    return e->connection;
+void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg) {
+  pn_transport_log(ce->transport, msg);
 }
 
-pn_transport_t* pn_connection_engine_transport(pn_connection_engine_t* e) {
-    return e->transport;
+void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap) {
+  pn_transport_vlogf(ce->transport, fmt, ap);
 }
 
-pn_condition_t* pn_connection_engine_condition(pn_connection_engine_t* e) {
-    return pn_transport_condition(e->transport);
+void pn_connection_engine_vlog(pn_connection_engine_t *ce, const char *msg) {
+  pn_transport_log(ce->transport, msg);
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/src/core/engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/engine.c b/proton-c/src/core/engine.c
index e238d5c..2836a43 100644
--- a/proton-c/src/core/engine.c
+++ b/proton-c/src/core/engine.c
@@ -32,6 +32,8 @@
 #include "platform/platform_fmt.h"
 #include "transport.h"
 
+#include <proton/extra.h>
+
 
 static void pni_session_bound(pn_session_t *ssn);
 static void pni_link_bound(pn_link_t *link);
@@ -208,6 +210,12 @@ void pn_condition_init(pn_condition_t *condition)
   condition->info = pn_data(0);
 }
 
+pn_condition_t *pn_condition() {
+  pn_condition_t *c = (pn_condition_t*)malloc(sizeof(pn_condition_t));
+  pn_condition_init(c);
+  return c;
+}
+
 void pn_condition_tini(pn_condition_t *condition)
 {
   pn_data_free(condition->info);
@@ -215,6 +223,14 @@ void pn_condition_tini(pn_condition_t *condition)
   pn_free(condition->name);
 }
 
+void pn_condition_free(pn_condition_t *c) {
+  if (c) {
+    pn_condition_clear(c);
+    pn_condition_tini(c);
+    free(c);
+  }
+}
+
 static void pni_add_session(pn_connection_t *conn, pn_session_t *ssn)
 {
   pn_list_add(conn->sessions, ssn);
@@ -495,10 +511,15 @@ static void pn_connection_finalize(void *object)
 #define pn_connection_compare NULL
 #define pn_connection_inspect NULL
 
-pn_connection_t *pn_connection(void)
+PN_EXTRA_DECLARE(pn_connection_t);
+
+pn_rwbytes_t pn_connection_get_extra(pn_connection_t *c) { return PN_EXTRA_GET(pn_connection_t, c); }
+
+pn_connection_t *pn_connection_with_extra(size_t extra)
 {
   static const pn_class_t clazz = PN_CLASS(pn_connection);
-  pn_connection_t *conn = (pn_connection_t *) pn_class_new(&clazz, sizeof(pn_connection_t));
+  size_t size = PN_EXTRA_SIZEOF(pn_connection_t, extra);
+  pn_connection_t *conn = (pn_connection_t *) pn_class_new(&clazz, size);
   if (!conn) return NULL;
 
   conn->endpoint_head = NULL;
@@ -527,6 +548,10 @@ pn_connection_t *pn_connection(void)
   return conn;
 }
 
+pn_connection_t *pn_connection(void) {
+  return pn_connection_with_extra(0);
+}
+
 static const pn_event_type_t endpoint_init_event_map[] = {
   PN_CONNECTION_INIT,  /* CONNECTION */
   PN_SESSION_INIT,     /* SESSION */
@@ -545,6 +570,10 @@ void pn_connection_collect(pn_connection_t *connection, pn_collector_t *collecto
   }
 }
 
+pn_collector_t* pn_connection_collector(pn_connection_t *connection) {
+  return connection->collector;
+}
+
 pn_state_t pn_connection_state(pn_connection_t *connection)
 {
   return connection ? connection->endpoint.state : 0;
@@ -2229,3 +2258,15 @@ pn_transport_t *pn_event_transport(pn_event_t *event)
     }
   }
 }
+
+int pn_condition_copy(pn_condition_t *dest, pn_condition_t *src) {
+  assert(dest);
+  assert(src);
+  int err = 0;
+  if (src != dest) {
+    int err = pn_string_copy(dest->name, src->name);
+    if (!err) err = pn_string_copy(dest->description, src->description);
+    if (!err) err = pn_data_copy(dest->info, src->info);
+  }
+  return err;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/src/core/event.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/event.c b/proton-c/src/core/event.c
index c13f287..7882327 100644
--- a/proton-c/src/core/event.c
+++ b/proton-c/src/core/event.c
@@ -371,7 +371,18 @@ const char *pn_event_type_name(pn_event_type_t type)
     return "PN_SELECTABLE_EXPIRED";
   case PN_SELECTABLE_FINAL:
     return "PN_SELECTABLE_FINAL";
+   case PN_CONNECTION_WAKE:
+    return "PN_CONNECTION_WAKE";
+   case PN_LISTENER_CLOSE:
+    return "PN_LISTENER_CLOSE";
+   case PN_PROACTOR_INTERRUPT:
+    return "PN_PROACTOR_INTERRUPT";
+   case PN_PROACTOR_TIMEOUT:
+    return "PN_PROACTOR_TIMEOUT";
+   case PN_PROACTOR_INACTIVE:
+    return "PN_PROACTOR_INACTIVE";
+   default:
+    return "PN_UNKNOWN";
   }
-
   return NULL;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/src/core/object/object.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/object/object.c b/proton-c/src/core/object/object.c
index b0c1b33..a6952b6 100644
--- a/proton-c/src/core/object/object.c
+++ b/proton-c/src/core/object/object.c
@@ -32,10 +32,10 @@ intptr_t pn_object_compare(void *a, void *b) { return (intptr_t) a - (intptr_t)
 const pn_class_t PN_OBJECT[] = {PN_CLASS(pn_object)};
 
 #define pn_void_initialize NULL
-static void *pn_void_new(const pn_class_t *clazz, size_t size) { return malloc(size); }
-static void pn_void_incref(void *object) {}
-static void pn_void_decref(void *object) {}
-static int pn_void_refcount(void *object) { return -1; }
+void *pn_void_new(const pn_class_t *clazz, size_t size) { return malloc(size); }
+void pn_void_incref(void* p) {}
+void pn_void_decref(void* p) {}
+int pn_void_refcount(void *object) { return -1; }
 #define pn_void_finalize NULL
 static void pn_void_free(void *object) { free(object); }
 static const pn_class_t *pn_void_reify(void *object) { return PN_VOID; }
@@ -199,7 +199,7 @@ typedef struct {
 void *pn_object_new(const pn_class_t *clazz, size_t size)
 {
   void *object = NULL;
-  pni_head_t *head = (pni_head_t *) malloc(sizeof(pni_head_t) + size);
+  pni_head_t *head = (pni_head_t *) calloc(1, sizeof(pni_head_t) + size);
   if (head != NULL) {
     object = head + 1;
     head->clazz = clazz;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org