You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/03/23 19:58:45 UTC

[2/3] qpid-proton git commit: PROTON-1161 - c++: better interface to connection_engine.

PROTON-1161 - c++: better interface to connection_engine.

More flexible connection_engine interface to support reactor and proactor patterns.
Provides direct access to proton buffers for minimal copies in either case.

connection_engine is now completely IO and thread neutral, it simply handles:

    bytes-in -> events as handler function calls -> bytes-out

Moved connection_engine and related classes into the proton::io namespace.

Cleaned up engine implementation, improved the unit tests.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/222574ed
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/222574ed
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/222574ed

Branch: refs/heads/master
Commit: 222574ed95acaa4143ea3e2c2cd9e7abac9e1dd9
Parents: c63b2be
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 17 18:29:13 2016 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Mar 23 13:48:02 2016 -0400

----------------------------------------------------------------------
 examples/cpp/engine/broker.cpp                  | 141 +++++-----
 examples/cpp/engine/client.cpp                  |   4 +-
 examples/cpp/engine/direct_recv.cpp             |   6 +-
 examples/cpp/engine/direct_send.cpp             |   6 +-
 examples/cpp/engine/helloworld.cpp              |   4 +-
 examples/cpp/engine/server.cpp                  |   4 +-
 examples/cpp/engine/simple_recv.cpp             |   4 +-
 examples/cpp/engine/simple_send.cpp             |   4 +-
 examples/cpp/example_test.py                    |   2 +-
 proton-c/bindings/cpp/CMakeLists.txt            |  12 +-
 proton-c/bindings/cpp/docs/mainpage.md          |   6 +-
 proton-c/bindings/cpp/docs/tutorial.dox         |   4 +-
 .../bindings/cpp/include/proton/connection.hpp  |   6 +-
 .../cpp/include/proton/connection_engine.hpp    | 168 ------------
 .../cpp/include/proton/connection_options.hpp   |   6 +-
 .../bindings/cpp/include/proton/handler.hpp     |   6 +-
 proton-c/bindings/cpp/include/proton/io.hpp     | 134 ----------
 .../cpp/include/proton/io/connection_engine.hpp | 182 +++++++++++++
 .../bindings/cpp/include/proton/io/socket.hpp   | 130 ++++++++++
 .../bindings/cpp/include/proton/transport.hpp   |   5 +
 proton-c/bindings/cpp/src/connection_engine.cpp | 208 ---------------
 proton-c/bindings/cpp/src/contexts.hpp          |  15 +-
 proton-c/bindings/cpp/src/engine_test.cpp       | 258 +++++++++++--------
 .../bindings/cpp/src/io/connection_engine.cpp   | 172 +++++++++++++
 proton-c/bindings/cpp/src/io/posix/socket.cpp   | 194 ++++++++++++++
 proton-c/bindings/cpp/src/io/windows/socket.cpp | 217 ++++++++++++++++
 proton-c/bindings/cpp/src/posix/io.cpp          | 175 -------------
 proton-c/bindings/cpp/src/scalar_test.cpp       |   1 -
 proton-c/bindings/cpp/src/test_bits.hpp         |  29 ++-
 proton-c/bindings/cpp/src/value_test.cpp        |   4 +-
 proton-c/bindings/cpp/src/windows/io.cpp        | 197 --------------
 31 files changed, 1187 insertions(+), 1117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/broker.cpp b/examples/cpp/engine/broker.cpp
index de08991..698d795 100644
--- a/examples/cpp/engine/broker.cpp
+++ b/examples/cpp/engine/broker.cpp
@@ -24,76 +24,25 @@
 
 #include <iostream>
 
-#ifdef WIN32
-#include "proton/acceptor.hpp"
-#include "proton/container.hpp"
-#include "proton/value.hpp"
-
-
-#include "fake_cpp11.hpp"
-
-class broker {
-  public:
-    broker(const proton::url& url) : handler_(url, queues_) {}
-
-    proton::handler& handler() { return handler_; }
-
-  private:
-
-    class my_handler : public broker_handler {
-      public:
-        my_handler(const proton::url& u, queues& qs) : broker_handler(qs), url_(u) {}
-
-        void on_start(proton::event &e) override {
-            e.container().listen(url_);
-            std::cout << "broker listening on " << url_ << std::endl;
-        }
-
-      private:
-        const proton::url& url_;
-    };
-
-  private:
-    queues queues_;
-    my_handler handler_;
-};
-
-int main(int argc, char **argv) {
-    // Command line options
-    proton::url url("0.0.0.0");
-    options opts(argc, argv);
-    opts.add_value(url, 'a', "address", "listen on URL", "URL");
-    try {
-        opts.parse();
-        broker b(url);
-        proton::container(b.handler()).run();
-        return 0;
-    } catch (const bad_option& e) {
-        std::cout << opts << std::endl << e.what() << std::endl;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-    return 1;
-}
-#else // WIN32
-#include <proton/io.hpp>
+#ifndef WIN32                   // TODO aconway 2016-03-23: windows broker example
+#include <proton/io/socket.hpp>
 #include <sys/select.h>
 #include <set>
 
 template <class T> T check(T result, const std::string& msg="io_error: ") {
     if (result < 0)
-        throw proton::connection_engine::io_error(msg + proton::io::error_str());
+        throw proton::io::socket::io_error(msg + proton::io::socket::error_str());
     return result;
 }
 
 void fd_set_if(bool on, int fd, fd_set *fds);
 
 class broker {
-    typedef std::set<proton::io::socket_engine*> engines;
+    typedef std::set<proton::io::socket::engine*> engines;
 
     queues queues_;
     broker_handler handler_;
-    proton::connection_engine::container container_;
+    proton::io::connection_engine::container container_;
     engines engines_;
     fd_set reading_, writing_;
 
@@ -109,7 +58,7 @@ class broker {
     }
 
     void run(const proton::url& url) {
-        proton::io::listener listener(url.host(), url.port());
+        proton::io::socket::listener listener(url.host(), url.port());
         std::cout << "listening on " << url << " fd=" << listener.socket() << std::endl;
         FD_SET(listener.socket(), &reading_);
         while(true) {
@@ -122,27 +71,26 @@ class broker {
                 int fd = listener.accept(client_host, client_port);
                 std::cout << "accepted " << client_host << ":" << client_port
                           << " fd=" << fd << std::endl;
-                engines_.insert(new proton::io::socket_engine(fd, handler_, container_.make_options()));
+                engines_.insert(
+                    new proton::io::socket::engine(
+                        fd, handler_, container_.make_options()));
                 FD_SET(fd, &reading_);
                 FD_SET(fd, &writing_);
             }
 
             for (engines::iterator i = engines_.begin(); i != engines_.end(); ) {
-                engines::iterator j = i++;        // Save iterator in case we need to erase it.
-                proton::io::socket_engine *eng = *j;
+                proton::io::socket::engine *eng = *(i++);
                 int flags = 0;
-                if (FD_ISSET(eng->socket(), &readable_set))
-                    flags |= proton::io::socket_engine::READ;
                 if (FD_ISSET(eng->socket(), &writable_set))
-                    flags |= proton::io::socket_engine::WRITE;
-                if (flags) eng->process(flags);
-                // Set reading/writing bits for next time around
-                fd_set_if(eng->can_read(), eng->socket(), &reading_);
-                fd_set_if(eng->can_write(), eng->socket(), &writing_);
-
-                if (eng->closed()) {
+                    eng->write();
+                if (FD_ISSET(eng->socket(), &readable_set))
+                    eng->read();
+                if (eng->dispatch()) {
+                    fd_set_if(eng->read_buffer().size, eng->socket(), &reading_);
+                    fd_set_if(eng->write_buffer().size, eng->socket(), &writing_);
+                } else {
                     std::cout << "closed fd=" << eng->socket() << std::endl;
-                    engines_.erase(j);
+                    engines_.erase(eng);
                     delete eng;
                 }
             }
@@ -173,4 +121,57 @@ int main(int argc, char **argv) {
     }
     return 1;
 }
+#else // WIN32
+
+#include "proton/acceptor.hpp"
+#include "proton/container.hpp"
+#include "proton/value.hpp"
+
+
+#include "fake_cpp11.hpp"
+
+class broker {
+  public:
+    broker(const proton::url& url) : handler_(url, queues_) {}
+
+    proton::handler& handler() { return handler_; }
+
+  private:
+
+    class my_handler : public broker_handler {
+      public:
+        my_handler(const proton::url& u, queues& qs) : broker_handler(qs), url_(u) {}
+
+        void on_start(proton::event &e) override {
+            e.container().listen(url_);
+            std::cout << "broker listening on " << url_ << std::endl;
+        }
+
+      private:
+        const proton::url& url_;
+    };
+
+  private:
+    queues queues_;
+    my_handler handler_;
+};
+
+int main(int argc, char **argv) {
+    // Command line options
+    proton::url url("0.0.0.0");
+    options opts(argc, argv);
+    opts.add_value(url, 'a', "address", "listen on URL", "URL");
+    try {
+        opts.parse();
+        broker b(url);
+        proton::container(b.handler()).run();
+        return 0;
+    } catch (const bad_option& e) {
+        std::cout << opts << std::endl << e.what() << std::endl;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+    return 1;
+}
+
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/client.cpp b/examples/cpp/engine/client.cpp
index d2d37c0..941ca75 100644
--- a/examples/cpp/engine/client.cpp
+++ b/examples/cpp/engine/client.cpp
@@ -20,7 +20,7 @@
  */
 
 #include "options.hpp"
-#include "proton/io.hpp"
+#include "proton/io/socket.hpp"
 #include "proton/url.hpp"
 #include "proton/event.hpp"
 #include "proton/handler.hpp"
@@ -87,7 +87,7 @@ int main(int argc, char **argv) {
         requests.push_back("All mimsy were the borogroves,");
         requests.push_back("And the mome raths outgrabe.");
         client handler(address, requests);
-        proton::io::socket_engine(address, handler).run();
+        proton::io::socket::engine(address, handler).run();
         return 0;
     } catch (const bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/direct_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/direct_recv.cpp b/examples/cpp/engine/direct_recv.cpp
index 3fcc28e..51f6572 100644
--- a/examples/cpp/engine/direct_recv.cpp
+++ b/examples/cpp/engine/direct_recv.cpp
@@ -21,7 +21,7 @@
 
 #include "options.hpp"
 
-#include "proton/io.hpp"
+#include "proton/io/socket.hpp"
 #include "proton/event.hpp"
 #include "proton/handler.hpp"
 #include "proton/link.hpp"
@@ -70,10 +70,10 @@ int main(int argc, char **argv) {
     try {
         opts.parse();
         proton::url url(address);
-        proton::io::listener listener(url.host(), url.port());
+        proton::io::socket::listener listener(url.host(), url.port());
         std::cout << "direct_recv listening on " << url << std::endl;
         direct_recv handler(message_count);
-        proton::io::socket_engine(listener.accept(), handler).run();
+        proton::io::socket::engine(listener.accept(), handler).run();
         return 0;
     } catch (const bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/direct_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/direct_send.cpp b/examples/cpp/engine/direct_send.cpp
index 28ea845..4d7be72 100644
--- a/examples/cpp/engine/direct_send.cpp
+++ b/examples/cpp/engine/direct_send.cpp
@@ -23,7 +23,7 @@
 
 #include "proton/acceptor.hpp"
 #include "proton/connection.hpp"
-#include "proton/io.hpp"
+#include "proton/io/socket.hpp"
 #include "proton/url.hpp"
 #include "proton/event.hpp"
 #include "proton/handler.hpp"
@@ -82,10 +82,10 @@ int main(int argc, char **argv) {
     try {
         opts.parse();
         proton::url url(address);
-        proton::io::listener listener(url.host(), url.port());
+        proton::io::socket::listener listener(url.host(), url.port());
         std::cout << "direct_send listening on " << url << std::endl;
         simple_send handler(message_count);
-        proton::io::socket_engine(listener.accept(), handler).run();
+        proton::io::socket::engine(listener.accept(), handler).run();
         return 0;
     } catch (const bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/helloworld.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/helloworld.cpp b/examples/cpp/engine/helloworld.cpp
index d5a9f44..4bb0ed3 100644
--- a/examples/cpp/engine/helloworld.cpp
+++ b/examples/cpp/engine/helloworld.cpp
@@ -22,7 +22,7 @@
 #include "proton/event.hpp"
 #include "proton/handler.hpp"
 #include "proton/url.hpp"
-#include "proton/io.hpp"
+#include "proton/io/socket.hpp"
 
 #include <iostream>
 
@@ -57,7 +57,7 @@ int main(int argc, char **argv) {
     try {
         proton::url url(argc > 1 ? argv[1] : "127.0.0.1:5672/examples");
         hello_world hw(url.path());
-        proton::io::socket_engine(url, hw).run();
+        proton::io::socket::engine(url, hw).run();
 
         return 0;
     } catch (const std::exception& e) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/server.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/server.cpp b/examples/cpp/engine/server.cpp
index 794d27e..bdd1a73 100644
--- a/examples/cpp/engine/server.cpp
+++ b/examples/cpp/engine/server.cpp
@@ -22,7 +22,7 @@
 #include "options.hpp"
 
 #include "proton/connection.hpp"
-#include "proton/io.hpp"
+#include "proton/io/socket.hpp"
 #include "proton/url.hpp"
 #include "proton/event.hpp"
 #include "proton/handler.hpp"
@@ -79,7 +79,7 @@ int main(int argc, char **argv) {
     try {
         opts.parse();
         server handler(address);
-        proton::io::socket_engine(address, handler).run();
+        proton::io::socket::engine(address, handler).run();
         return 0;
     } catch (const bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/simple_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/simple_recv.cpp b/examples/cpp/engine/simple_recv.cpp
index 5b6cf21..a081227 100644
--- a/examples/cpp/engine/simple_recv.cpp
+++ b/examples/cpp/engine/simple_recv.cpp
@@ -21,7 +21,7 @@
 
 #include "options.hpp"
 
-#include "proton/io.hpp"
+#include "proton/io/socket.hpp"
 #include "proton/url.hpp"
 #include "proton/event.hpp"
 #include "proton/handler.hpp"
@@ -76,7 +76,7 @@ int main(int argc, char **argv) {
     try {
         opts.parse();
         simple_recv handler(address, message_count);
-        proton::io::socket_engine(address, handler).run();
+        proton::io::socket::engine(address, handler).run();
         return 0;
     } catch (const bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/engine/simple_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/simple_send.cpp b/examples/cpp/engine/simple_send.cpp
index 39d8939..f6c0318 100644
--- a/examples/cpp/engine/simple_send.cpp
+++ b/examples/cpp/engine/simple_send.cpp
@@ -21,7 +21,7 @@
 
 #include "options.hpp"
 
-#include "proton/io.hpp"
+#include "proton/io/socket.hpp"
 #include "proton/url.hpp"
 #include "proton/event.hpp"
 #include "proton/handler.hpp"
@@ -85,7 +85,7 @@ int main(int argc, char **argv) {
     try {
         opts.parse();
         simple_send handler(address, message_count);
-        proton::io::socket_engine(address, handler).run();
+        proton::io::socket::engine(address, handler).run();
         return 0;
     } catch (const bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/examples/cpp/example_test.py
----------------------------------------------------------------------
diff --git a/examples/cpp/example_test.py b/examples/cpp/example_test.py
index 6cfc632..d228d67 100644
--- a/examples/cpp/example_test.py
+++ b/examples/cpp/example_test.py
@@ -90,7 +90,7 @@ class Proc(Popen):
                         self.ready_set = True
                         self.ready.set()
             if self.wait() != 0:
-                self.error = ProcError(self)
+                raise ProcError(self)
         except Exception, e:
             self.error = sys.exc_info()
         finally:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index 26a7c94..9254a3a 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -28,7 +28,6 @@ set(qpid-proton-cpp-source
   src/acceptor.cpp
   src/binary.cpp
   src/byte_array.cpp
-  src/scalar_base.cpp
   src/condition.cpp
   src/connection.cpp
   src/connection_options.cpp
@@ -37,21 +36,21 @@ set(qpid-proton-cpp-source
   src/container_impl.cpp
   src/contexts.cpp
   src/data.cpp
-  src/decoder.cpp
   src/decimal.cpp
+  src/decoder.cpp
   src/delivery.cpp
   src/duration.cpp
   src/encoder.cpp
   src/endpoint.cpp
-  src/connection_engine.cpp
   src/error.cpp
+  src/handler.cpp
   src/id_generator.cpp
+  src/io/connection_engine.cpp
   src/link.cpp
   src/link_options.cpp
   src/message.cpp
   src/messaging_adapter.cpp
   src/messaging_event.cpp
-  src/handler.cpp
   src/object.cpp
   src/proton_bits.cpp
   src/proton_event.cpp
@@ -60,6 +59,7 @@ set(qpid-proton-cpp-source
   src/receiver.cpp
   src/reconnect_timer.cpp
   src/sasl.cpp
+  src/scalar_base.cpp
   src/sender.cpp
   src/session.cpp
   src/ssl.cpp
@@ -75,9 +75,9 @@ set(qpid-proton-cpp-source
   )
 
 if(MSVC)
-  list(APPEND qpid-proton-cpp-source src/windows/io.cpp)
+  list(APPEND qpid-proton-cpp-source src/io/windows/socket.cpp)
 else(MSVC)
-  list(APPEND qpid-proton-cpp-source src/posix/io.cpp)
+  list(APPEND qpid-proton-cpp-source src/io/posix/socket.cpp)
 endif(MSVC)
 
 set_source_files_properties (

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/docs/mainpage.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/mainpage.md b/proton-c/bindings/cpp/docs/mainpage.md
index f20d957..8ad34cb 100644
--- a/proton-c/bindings/cpp/docs/mainpage.md
+++ b/proton-c/bindings/cpp/docs/mainpage.md
@@ -103,10 +103,10 @@ from your handler methods.
 ### %proton::connection_engine
 
 `proton::connection_engine` dispatches events for a *single
-connection*. The subclass `proton::io::socket_engine` does
+connection*. The subclass `proton::io::socket::engine` does
 socket-based IO. An application with a single connection is just like
 using `proton::container` except you attach your handler to a
-`proton::io::socket_engine` instead. You can compare examples, such as
+`proton::io::socket::engine` instead. You can compare examples, such as
 \ref helloworld.cpp and \ref engine/helloworld.cpp.
 
 Now consider multiple connections. `proton::container` is easy to use
@@ -124,7 +124,7 @@ platforms. The example \ref engine/broker.cpp shows a broker using
 sockets and poll, but you can see how the code could be adapted.
 
 `proton::connection_engine` also does not dictate the IO mechanism,
-but it is an abstract class. `proton::socket_engine` provides
+but it is an abstract class. `proton::socket::engine` provides
 ready-made socket-based IO, but you can write your own subclass with
 any IO code. Just override the `io_read`, `io_write` and `io_close`
 methods. For example, the proton test suite implements an in-memory

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/docs/tutorial.dox
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/tutorial.dox b/proton-c/bindings/cpp/docs/tutorial.dox
index dcfbe05..e40a3e7 100644
--- a/proton-c/bindings/cpp/docs/tutorial.dox
+++ b/proton-c/bindings/cpp/docs/tutorial.dox
@@ -405,7 +405,7 @@ applications or applications with unusual IO requirements.
 We'll look at the \ref engine/helloworld.cpp example step-by-step to see how it differs
 from the container \ref helloworld.cpp version.
 
-First we include the `proton::io::socket_engine` class, which is a `proton::connection_engine`
+First we include the `proton::io::socket::engine` class, which is a `proton::connection_engine`
 that uses socket IO.
 
 \skipline proton/io.hpp
@@ -417,7 +417,7 @@ engine's' connection:
 \skip on_start
 \until }
 
-Our `main` function only differs in that it creates and runs a `socket_engine`
+Our `main` function only differs in that it creates and runs a `socket::engine`
 instead of a `container`.
 
 \skip main

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/connection.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection.hpp b/proton-c/bindings/cpp/include/proton/connection.hpp
index da7f806..eb4e598 100644
--- a/proton-c/bindings/cpp/include/proton/connection.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection.hpp
@@ -37,6 +37,10 @@ namespace proton {
 
 class handler;
 
+namespace io {
+class connection_engine;
+}
+
 /// A connection to a remote AMQP peer.
 class
 PN_CPP_CLASS_EXTERN connection : public internal::object<pn_connection_t>, public endpoint {
@@ -123,7 +127,7 @@ PN_CPP_CLASS_EXTERN connection : public internal::object<pn_connection_t>, publi
     void host(const std::string& h);
 
     friend class connection_context;
-    friend class connection_engine;
+    friend class io::connection_engine;
     friend class connection_options;
     friend class connector;
     friend class container_impl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/connection_engine.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection_engine.hpp b/proton-c/bindings/cpp/include/proton/connection_engine.hpp
deleted file mode 100644
index 0b1a947..0000000
--- a/proton-c/bindings/cpp/include/proton/connection_engine.hpp
+++ /dev/null
@@ -1,168 +0,0 @@
-#ifndef CONNECTION_ENGINE_HPP
-#define CONNECTION_ENGINE_HPP
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "proton/connection.hpp"
-#include "proton/connection_options.hpp"
-#include "proton/error.hpp"
-#include "proton/export.hpp"
-#include "proton/pn_unique_ptr.hpp"
-#include "proton/types.hpp"
-
-#include <cstddef>
-#include <utility>
-#include <string>
-
-namespace proton {
-
-class connection_engine_context;
-class handler;
-class connection;
-
-// TODO aconway 2016-01-23: doc contrast with container.
-
-/// An interface for connection-oriented IO integration.  A
-/// connection_engine manages a single AMQP connection.  It is useful
-/// for integrating AMQP into an existing IO framework.
-///
-/// The engine provides a simple "bytes-in/bytes-out" interface. Incoming AMQP
-/// bytes from any kind of data connection are fed into the engine and processed
-/// to dispatch events to a proton::handler.  The resulting AMQP output data is
-/// available from the engine and can sent back over the connection.
-///
-/// The engine does no IO of its own. It assumes a two-way flow of bytes over
-/// some externally-managed "connection". The "connection" could be a socket
-/// managed by select, poll, epoll or some other mechanism, or it could be
-/// something else such as an RDMA connection, a shared-memory buffer or a Unix
-/// pipe.
-///
-/// The application is coded the same way for engine or container: you implement
-/// proton::handler. Handlers attached to an engine will receive transport,
-/// connection, session, link and message events. They will not receive reactor,
-/// selectable or timer events, the engine assumes those are managed externally.
-///
-/// THREAD SAFETY: A single engine instance cannot be called concurrently, but
-/// different engine instances can be processed concurrently in separate threads.
-class
-PN_CPP_CLASS_EXTERN connection_engine {
-  public:
-    class container {
-      public:
-        /// Create a container with id.  Default to random UUID if id
-        /// == "".
-        PN_CPP_EXTERN container(const std::string &id = "");
-        PN_CPP_EXTERN ~container();
-
-        /// Return the container-id
-        PN_CPP_EXTERN std::string id() const;
-
-        /// Make options to configure a new engine, using the default options.
-        ///
-        /// Call this once for each new engine as the options include a generated unique link_prefix.
-        /// You can modify the configuration before creating the engine but you should not
-        /// modify the container_id or link_prefix.
-        PN_CPP_EXTERN connection_options make_options();
-
-        /// Set the default options to be used for connection engines.
-        /// The container will set the container_id and link_prefix when make_options is called.
-        PN_CPP_EXTERN void options(const connection_options&);
-
-      private:
-        class impl;
-        internal::pn_unique_ptr<impl> impl_;
-    };
-    /// Create a connection engine that dispatches to handler.
-    PN_CPP_EXTERN connection_engine(handler&, const connection_options& = no_opts);
-
-    PN_CPP_EXTERN virtual ~connection_engine();
-
-    /// Return the number of bytes that the engine is currently ready to read.
-    PN_CPP_EXTERN size_t can_read() const;
-
-    /// Return the number of bytes that the engine is currently ready to write.
-    PN_CPP_EXTERN size_t can_write() const;
-
-    /// Combine these flags with | to indicate read, write, both or neither
-    enum io_flag {
-        READ = 1,
-        WRITE = 2
-    };
-
-    /// Read, write and dispatch events.
-    ///
-    /// io_flags indicates whether to read, write, both or neither.
-    /// Dispatches all events generated by reading or writing.
-    /// Use closed() to check if the engine is closed after processing.
-    ///
-    /// @throw exceptions thrown by the engines handler or the IO adapter.
-    PN_CPP_EXTERN void process(int io_flags=READ|WRITE);
-
-    /// True if the engine is closed, meaning there are no further
-    /// events to process and close_io has been called.  Call
-    /// error_str() to get an error description.
-    PN_CPP_EXTERN bool closed() const;
-
-    /// Get the AMQP connection associated with this connection_engine.
-    PN_CPP_EXTERN class connection connection() const;
-
-    /// Thrown by io_read and io_write functions to indicate an error.
-    struct PN_CPP_CLASS_EXTERN io_error : public error {
-        PN_CPP_EXTERN explicit io_error(const std::string&); ///< Construct with message
-    };
-
-  protected:
-    /// Do a non-blocking read on the IO stream.
-    ///
-    ///@return pair(size, true) if size bytes were read.
-    /// size==0 means no data could be read without blocking, the stream is still open.
-    /// Returns pair(0, false) if the stream closed.
-    ///
-    ///@throw proton::connection_engine::io_error if there is a read error.
-    virtual std::pair<size_t, bool> io_read(char* buf, size_t max) = 0;
-
-    /// Do a non-blocking write of up to max bytes from buf.
-    ///
-    /// Return the number of byes written , 0 if no data could be written
-    /// without blocking.
-    ///
-    ///throw proton::connection_engine::io_error if there is a write error.
-    virtual size_t io_write(const char*, size_t) = 0;
-
-    /// Close the io, no more _io methods will be called after this is called.
-    virtual void io_close() = 0;
-
-    PN_CPP_EXTERN static const connection_options no_opts;
-
-  private:
-    connection_engine(const connection_engine&);
-    connection_engine& operator=(const connection_engine&);
-
-    void dispatch();
-    void try_read();
-    void try_write();
-
-    class connection connection_;
-    connection_engine_context* ctx_;
-};
-
-}
-
-#endif // CONNECTION_ENGINE_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/connection_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp b/proton-c/bindings/cpp/include/proton/connection_options.hpp
index 1a22b73..d2efab9 100644
--- a/proton-c/bindings/cpp/include/proton/connection_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp
@@ -40,6 +40,10 @@ namespace proton {
 class proton_handler;
 class connection;
 
+namespace io {
+class connection_engine;
+}
+
 /// Options for creating a connection.
 ///
 /// Options can be "chained" like this:
@@ -149,7 +153,7 @@ class connection_options {
 
     friend class container_impl;
     friend class connector;
-    friend class connection_engine;
+    friend class io::connection_engine;
     /// @endcond
 };
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/handler.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/handler.hpp b/proton-c/bindings/cpp/include/proton/handler.hpp
index 6ea11d4..3086037 100644
--- a/proton-c/bindings/cpp/include/proton/handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/handler.hpp
@@ -32,6 +32,10 @@ class condition;
 class event;
 class messaging_adapter;
 
+namespace io {
+class connection_engine;
+}
+
 /// Callback functions for handling proton events.
 ///
 /// Subclass and override event-handling member functions.
@@ -131,7 +135,7 @@ PN_CPP_CLASS_EXTERN handler
     internal::pn_unique_ptr<messaging_adapter> messaging_adapter_;
 
     friend class container;
-    friend class connection_engine;
+    friend class io::connection_engine;
     friend class connection_options;
     friend class link_options;
     /// @endcond

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/io.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io.hpp b/proton-c/bindings/cpp/include/proton/io.hpp
deleted file mode 100644
index 9c63edb..0000000
--- a/proton-c/bindings/cpp/include/proton/io.hpp
+++ /dev/null
@@ -1,134 +0,0 @@
-#ifndef SOCKET_IO_HPP
-#define SOCKET_IO_HPP
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <proton/connection_engine.hpp>
-#include <proton/url.hpp>
-
-namespace proton {
-
-///@details
-/// IO using sockets, file descriptors, or handles, for use with
-/// proton::connection_engine.
-///
-/// Note that you can use proton::connection_engine to communicate using AMQP
-/// over your own IO implementation or to integrate an existing IO framework of
-/// your choice, this implementation is provided as a convenience if sockets is
-/// sufficient for your needs.
-namespace io {
-
-/// @name Setup and teardown
-///
-/// Call proton::io::initialize before using any functions in the
-/// proton::io namespace.  Call proton::io::finalize when you are
-/// done.
-///
-/// You can call initialize/finalize more than once as long as they
-/// are in matching pairs. Use proton::io::guard to call
-/// initialize/finalize around a scope.
-///
-/// Note that on POSIX systems these are no-ops, but they are required
-/// for Windows.
-///
-/// @{
-
-/// Initialize the proton::io subsystem.
-PN_CPP_EXTERN void initialize();
-
-/// Finalize the proton::io subsystem.
-PN_CPP_EXTERN void finalize(); // nothrow
-
-/// Use to call io::initialize and io::finalize around a scope.
-struct guard {
-    guard() { initialize(); }
-    ~guard() { finalize(); }
-};
-
-/// @}
-
-/// An IO resource.
-typedef int64_t descriptor;
-
-/// An invalid descriptor.
-PN_CPP_EXTERN extern const descriptor INVALID_DESCRIPTOR;
-
-/// Return a string describing the most recent IO error.
-PN_CPP_EXTERN std::string error_str();
-
-/// Open a TCP connection to the host:port (port can be a service name or number) from a proton::url.
-PN_CPP_EXTERN descriptor connect(const proton::url&);
-
-/// Listening socket.
-class listener {
-  public:
-    /// Listen on host/port. Empty host means listen on all interfaces.
-    /// port can be a service name or number
-    PN_CPP_EXTERN listener(const std::string& host, const std::string& port);
-    PN_CPP_EXTERN ~listener();
-
-    /// Accept a connection. Return the descriptor, set host, port to the remote address.
-    /// port can be a service name or number.
-    PN_CPP_EXTERN descriptor accept(std::string& host, std::string& port);
-
-    /// Accept a connection, does not provide address info.
-    descriptor accept() { std::string dummy; return accept(dummy, dummy); }
-
-    /// Convert to descriptor
-    descriptor socket() const { return socket_; }
-
-  private:
-    guard guard_;
-    listener(const listener&);
-    listener& operator=(const listener&);
-    descriptor socket_;
-};
-
-/// A connection_engine for socket-based IO.
-class socket_engine : public connection_engine {
-  public:
-    /// Wrap an open socket. Sets non-blocking mode.
-    PN_CPP_EXTERN socket_engine(descriptor socket_, handler&, const connection_options& = no_opts);
-
-    /// Create socket engine connected to url.
-    PN_CPP_EXTERN socket_engine(const url&, handler&, const connection_options& = no_opts);
-
-    PN_CPP_EXTERN ~socket_engine();
-
-    /// Get the socket descriptor.
-    descriptor socket() const { return socket_; }
-
-    /// Start the engine.
-    PN_CPP_EXTERN void run();
-
-  protected:
-    PN_CPP_EXTERN std::pair<size_t, bool> io_read(char* buf, size_t max);
-    PN_CPP_EXTERN size_t io_write(const char*, size_t);
-    PN_CPP_EXTERN void io_close();
-
-  private:
-    void init();
-    guard guard_;
-    descriptor socket_;
-};
-
-}} // proton::io
-
-#endif // SOCKET_IO_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
new file mode 100644
index 0000000..3192d44
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
@@ -0,0 +1,182 @@
+#ifndef CONNECTION_ENGINE_HPP
+#define CONNECTION_ENGINE_HPP
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proton/condition.hpp"
+#include "proton/connection.hpp"
+#include "proton/connection_options.hpp"
+#include "proton/error.hpp"
+#include "proton/export.hpp"
+#include "proton/pn_unique_ptr.hpp"
+#include "proton/transport.hpp"
+#include "proton/types.hpp"
+
+#include <cstddef>
+#include <utility>
+#include <string>
+
+struct pn_collector_t;
+
+namespace proton {
+
+class handler;
+
+/// Contains classes to integrate proton into different IO and threading environments.
+namespace io {
+
+///@cond INTERNAL
+class connection_engine_context;
+///
+
+/// Pointer to a mutable memory region with a size.
+struct mutable_buffer {
+    char* data;
+    size_t size;
+
+    mutable_buffer(char* data_=0, size_t size_=0) : data(data_), size(size_) {}
+};
+
+/// Pointer to a const memory region with a size.
+struct const_buffer {
+    const char* data;
+    size_t size;
+
+    const_buffer(const char* data_=0, size_t size_=0) : data(data_), size(size_) {}
+};
+
+/// A protocol engine to integrate AMQP into any IO or concurrency framework.
+///
+/// io::connection_engine manages a single proton::connection and dispatches
+/// events to a proton::handler. It does no IO of its own, but allows you to
+/// integrate AMQP protocol handling into any IO or concurrency framework.
+///
+/// The application is coded the same way as for the proton::container. The
+/// application implements a proton::handler to respond to transport,
+/// connection, session, link and message events. With a little care, the same
+/// handler classes can be used for both container and connection_engine, the
+/// \ref broker.cpp example illustrates this.
+///
+/// You need to write the IO code to read AMQP data to the read_buffer(). The
+/// engine parses the AMQP frames. dispatch() calls the appropriate functions on
+/// the applications proton::handler. You write output data from the engines
+/// write_buffer() to your IO.
+///
+/// The engine is not safe for concurrent use, but you can process different
+/// engines concurrently. A common pattern for high-performance servers is to
+/// serialize read/write activity per-connection and dispatch in a fixed-size
+/// thread pool.
+///
+/// The engine is designed to work with a classic reactor (e.g. select, poll,
+/// epoll) or an async-request driven proactor (e.g. windows completion ports,
+/// boost.asio, libuv etc.)
+///
+class
+PN_CPP_CLASS_EXTERN connection_engine {
+  public:
+    // TODO aconway 2016-03-18: this will change
+    class container {
+      public:
+        /// Create a container with id.  Default to random UUID.
+        PN_CPP_EXTERN container(const std::string &id = "");
+        PN_CPP_EXTERN ~container();
+
+        /// Return the container-id
+        PN_CPP_EXTERN std::string id() const;
+
+        /// Make options to configure a new engine, using the default options.
+        ///
+        /// Call this once for each new engine as the options include a generated unique link_prefix.
+        /// You can modify the configuration before creating the engine but you should not
+        /// modify the container_id or link_prefix.
+        PN_CPP_EXTERN connection_options make_options();
+
+        /// Set the default options to be used for connection engines.
+        /// The container will set the container_id and link_prefix when make_options is called.
+        PN_CPP_EXTERN void options(const connection_options&);
+
+      private:
+        class impl;
+        internal::pn_unique_ptr<impl> impl_;
+    };
+
+    /// Create a connection engine that dispatches to handler.
+    PN_CPP_EXTERN connection_engine(handler&, const connection_options& = connection_options());
+
+    PN_CPP_EXTERN virtual ~connection_engine();
+
+    /// The engine's read buffer. Read data into this buffer then call read_done() when complete.
+    /// Returns mutable_buffer(0, 0) if the engine cannot currently read data.
+    /// Calling dispatch() may open up more buffer space.
+    PN_CPP_EXTERN mutable_buffer read_buffer();
+
+    /// Indicate that the first n bytes of read_buffer() have valid data.
+    /// This changes the buffer, call read_buffer() to get the updated buffer.
+    PN_CPP_EXTERN void read_done(size_t n);
+
+    /// Indicate that the read side of the transport is closed and no more data will be read.
+    PN_CPP_EXTERN void read_close();
+
+    /// The engine's write buffer. Write data from this buffer then call write_done()
+    /// Returns const_buffer(0, 0) if the engine has nothing to write.
+    /// Calling dispatch() may generate more data in the write buffer.
+    PN_CPP_EXTERN const_buffer write_buffer() const;
+
+    /// Indicate that the first n bytes of write_buffer() have been written successfully.
+    /// This changes the buffer, call write_buffer() to get the updated buffer.
+    PN_CPP_EXTERN void write_done(size_t n);
+
+    /// Indicate that the write side of the transport has closed and no more data will be written.
+    PN_CPP_EXTERN void write_close();
+
+    /// Indicate that the transport has closed with an error condition.
+    /// This calls both read_close() and write_close().
+    /// The error condition will be passed to handler::on_transport_error()
+    PN_CPP_EXTERN void close(const std::string& name, const std::string& description);
+
+    /// Dispatch all available events and call the corresponding \ref handler methods.
+    ///
+    /// Returns true if the engine is still active, false if it is finished and
+    /// can be destroyed. The engine is finished when either of the following is
+    /// true:
+    ///
+    /// - both read_close() and write_close() have been called, no more IO is possible.
+    /// - The AMQP connection() is closed AND write_buffer() is empty.
+    ///
+    /// May expand the read_buffer() and/or the write_buffer().
+    ///
+    /// @throw any exceptions thrown by the \ref handler.
+    PN_CPP_EXTERN bool dispatch();
+
+    /// Get the AMQP connection associated with this connection_engine.
+    PN_CPP_EXTERN proton::connection connection() const;
+
+  private:
+    connection_engine(const connection_engine&);
+    connection_engine& operator=(const connection_engine&);
+
+    proton::handler& handler_;
+    proton::connection connection_;
+    proton::transport transport_;
+    proton::internal::pn_ptr<pn_collector_t> collector_;
+};
+}}
+
+#endif // CONNECTION_ENGINE_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/io/socket.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/socket.hpp b/proton-c/bindings/cpp/include/proton/io/socket.hpp
new file mode 100644
index 0000000..a43c0ae
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/io/socket.hpp
@@ -0,0 +1,130 @@
+#ifndef PROTON_IO_IO_HPP
+#define PROTON_IO_IO_HPP
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/io/connection_engine.hpp>
+#include <proton/url.hpp>
+
+
+namespace proton {
+namespace io {
+namespace socket {
+
+struct
+PN_CPP_CLASS_EXTERN io_error : public proton::error {
+    PN_CPP_EXTERN explicit io_error(const std::string&); ///< Construct with message
+};
+
+/// @name Setup and teardown
+///
+/// Call initialize() before using any functions in the proton::io::socket
+/// namespace.  Call finalize() when you are done.
+///
+/// You can call initialize/finalize more than once as long as they are in
+/// matching pairs. Use \ref guard to call initialize/finalize around a scope.
+///
+/// Note that on POSIX systems these are no-ops, but they are required
+/// for Windows.
+///
+/// @{
+
+/// Initialize the proton::io subsystem.
+PN_CPP_EXTERN void initialize();
+
+/// Finalize the proton::io subsystem.
+PN_CPP_EXTERN void finalize(); // nothrow
+
+/// Use to call io::initialize and io::finalize around a scope.
+struct guard {
+    guard() { initialize(); }
+    ~guard() { finalize(); }
+};
+
+/// @}
+
+/// An IO resource.
+typedef int64_t descriptor;
+
+/// An invalid descriptor.
+PN_CPP_EXTERN extern const descriptor INVALID_DESCRIPTOR;
+
+/// Return a string describing the most recent IO error.
+PN_CPP_EXTERN std::string error_str();
+
+/// Open a TCP connection to the host:port (port can be a service name or number) from a proton::url.
+PN_CPP_EXTERN descriptor connect(const proton::url&);
+
+/// Listening socket.
+class listener {
+  public:
+    /// Listen on host/port. Empty host means listen on all interfaces.
+    /// port can be a service name or number
+    PN_CPP_EXTERN listener(const std::string& host, const std::string& port);
+    PN_CPP_EXTERN ~listener();
+
+    /// Accept a connection. Return the descriptor, set host, port to the remote address.
+    /// port can be a service name or number.
+    PN_CPP_EXTERN descriptor accept(std::string& host, std::string& port);
+
+    /// Accept a connection, does not provide address info.
+    descriptor accept() { std::string dummy; return accept(dummy, dummy); }
+
+    /// Convert to descriptor
+    descriptor socket() const { return socket_; }
+
+  private:
+    guard guard_;
+    listener(const listener&);
+    listener& operator=(const listener&);
+    descriptor socket_;
+};
+
+/// A \ref connection_engine with non-blocking socket IO.
+class engine : public connection_engine {
+  public:
+    /// Wrap an open socket. Sets non-blocking mode.
+    PN_CPP_EXTERN engine(descriptor socket_, handler&, const connection_options& = connection_options());
+
+    /// Create socket engine connected to url.
+    PN_CPP_EXTERN engine(const url&, handler&, const connection_options& = connection_options());
+
+    PN_CPP_EXTERN ~engine();
+
+    /// Run the engine until it closes
+    PN_CPP_EXTERN void run();
+
+    /// Non-blocking read from socket to engine
+    PN_CPP_EXTERN void read();
+
+    /// Non-blocking write from engine to socket
+    PN_CPP_EXTERN void write();
+
+    descriptor socket() const { return socket_; }
+
+  private:
+    void init();
+    guard guard_;
+    descriptor socket_;
+};
+
+}}}
+
+#endif  /*!PROTON_IO_IO_HPP*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/include/proton/transport.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/transport.hpp b/proton-c/bindings/cpp/include/proton/transport.hpp
index 9e32ac5..bebc974 100644
--- a/proton-c/bindings/cpp/include/proton/transport.hpp
+++ b/proton-c/bindings/cpp/include/proton/transport.hpp
@@ -34,6 +34,10 @@ class connection;
 class condition;
 class sasl;
 
+namespace io {
+class connection_engine;
+}
+
 class transport : public internal::object<pn_transport_t> {
     /// @cond INTERNAL
     transport(pn_transport_t* t) : internal::object<pn_transport_t>(t) {}
@@ -71,6 +75,7 @@ class transport : public internal::object<pn_transport_t> {
     friend class connection_options;
     friend class connector;
     friend class proton_event;
+    friend class io::connection_engine;
     /// @endcond
 };
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/src/connection_engine.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection_engine.cpp b/proton-c/bindings/cpp/src/connection_engine.cpp
deleted file mode 100644
index be9efeb..0000000
--- a/proton-c/bindings/cpp/src/connection_engine.cpp
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "proton/connection_engine.hpp"
-#include "proton/error.hpp"
-#include "proton/handler.hpp"
-#include "proton/uuid.hpp"
-
-#include "contexts.hpp"
-#include "id_generator.hpp"
-#include "messaging_adapter.hpp"
-#include "messaging_event.hpp"
-#include "msg.hpp"
-#include "proton_bits.hpp"
-#include "proton_bits.hpp"
-
-#include <proton/connection.h>
-#include <proton/transport.h>
-#include <proton/event.h>
-
-#include <algorithm>
-
-#include <iosfwd>
-
-namespace proton {
-
-namespace {
-void set_error(connection_engine_context *ctx_, const std::string& reason) {
-    pn_condition_t *c = pn_transport_condition(ctx_->transport);
-    pn_condition_set_name(c, "io_error");
-    pn_condition_set_description(c, reason.c_str());
-}
-
-void close_transport(connection_engine_context *ctx_) {
-    if (pn_transport_pending(ctx_->transport) >= 0)
-        pn_transport_close_head(ctx_->transport);
-    if (pn_transport_capacity(ctx_->transport) >= 0)
-        pn_transport_close_tail(ctx_->transport);
-}
-
-std::string  make_id(const std::string s="") { return s.empty() ? uuid::random().str() : s; }
-}
-
-connection_engine::io_error::io_error(const std::string& msg) : error(msg) {}
-
-class connection_engine::container::impl {
-  public:
-    impl(const std::string s="") : id_(make_id(s)) {}
-
-    const std::string id_;
-    id_generator id_gen_;
-    connection_options options_;
-};
-
-connection_engine::container::container(const std::string& s) : impl_(new impl(s)) {}
-
-connection_engine::container::~container() {}
-
-std::string connection_engine::container::id() const { return impl_->id_; }
-
-connection_options connection_engine::container::make_options() {
-    connection_options opts = impl_->options_;
-    opts.container_id(id()).link_prefix(impl_->id_gen_.next()+"/");
-    return opts;
-}
-
-void connection_engine::container::options(const connection_options &opts) {
-    impl_->options_ = opts;
-}
-
-connection_engine::connection_engine(class handler &h, const connection_options& opts) {
-    connection_ = proton::connection(internal::take_ownership(pn_connection()).get());
-    internal::pn_ptr<pn_transport_t> transport = internal::take_ownership(pn_transport());
-    internal::pn_ptr<pn_collector_t> collector = internal::take_ownership(pn_collector());
-    if (!connection_ || !transport || !collector)
-        throw proton::error("engine create");
-    int err = pn_transport_bind(transport.get(), connection_.pn_object());
-    if (err)
-        throw error(msg() << "transport bind:" << pn_code(err));
-    pn_connection_collect(connection_.pn_object(), collector.get());
-
-    ctx_ = &connection_engine_context::get(connection_); // Creates context
-    ctx_->engine_handler = &h;
-    ctx_->transport = transport.release();
-    ctx_->collector = collector.release();
-    opts.apply(connection_);
-    // Provide defaults for connection_id and link_prefix if not set.
-    std::string cid = connection_.container_id();
-    if (cid.empty()) {
-        cid = make_id();
-        pn_connection_set_container(connection_.pn_object(), cid.c_str());
-    }
-    id_generator &link_gen = connection_context::get(connection_).link_gen;
-    if (link_gen.prefix().empty()) {
-        link_gen.prefix(make_id()+"/");
-    }
-}
-
-connection_engine::~connection_engine() {
-    pn_transport_unbind(ctx_->transport);
-    pn_transport_free(ctx_->transport);
-    internal::pn_ptr<pn_connection_t> c(connection_.pn_object());
-    connection_ = proton::connection();
-    pn_connection_free(c.release());
-    pn_collector_free(ctx_->collector);
-}
-
-void connection_engine::process(int flags) {
-    if (closed()) return;
-    if (flags & WRITE) try_write();
-    dispatch();
-    if (flags & READ) try_read();
-    dispatch();
-
-    if (connection_.closed() && !closed()) {
-        dispatch();
-        while (can_write()) {
-            try_write(); // Flush final data.
-        }
-        // no transport errors.
-        close_transport(ctx_);
-    }
-    if (closed()) {
-        pn_transport_unbind(ctx_->transport);
-        dispatch();
-        try { io_close(); } catch(const io_error&) {} // Tell the IO to close.
-    }
-}
-
-void connection_engine::dispatch() {
-    proton_handler& h = *ctx_->engine_handler->messaging_adapter_;
-    pn_collector_t* c = ctx_->collector;
-    for (pn_event_t *e = pn_collector_peek(c); e; e = pn_collector_peek(c)) {
-        if (pn_event_type(e) == PN_CONNECTION_INIT) {
-            // Make the messaging_adapter issue a START event.
-            proton_event(e, PN_REACTOR_INIT, 0).dispatch(h);
-        }
-        proton_event(e, pn_event_type(e), 0).dispatch(h);
-        pn_collector_pop(c);
-    }
-}
-
-size_t connection_engine::can_read() const {
-    return std::max(ssize_t(0), pn_transport_capacity(ctx_->transport));
-}
-
-void connection_engine::try_read() {
-    size_t max = can_read();
-    if (max == 0) return;
-    try {
-        std::pair<size_t, bool> r = io_read(pn_transport_tail(ctx_->transport), max);
-        if (r.second) {
-            if (r.first > max)
-                throw io_error(msg() << "read invalid size: " << r.first << ">" << max);
-            pn_transport_process(ctx_->transport, r.first);
-        } else {
-            pn_transport_close_tail(ctx_->transport);
-        }
-    } catch (const io_error& e) {
-        set_error(ctx_, e.what());
-        pn_transport_close_tail(ctx_->transport);
-    }
-}
-
-size_t connection_engine::can_write() const {
-    return std::max(ssize_t(0), pn_transport_pending(ctx_->transport));
-}
-
-void connection_engine::try_write() {
-    size_t max = can_write();
-    if (max == 0) return;
-    try {
-        size_t n = io_write(pn_transport_head(ctx_->transport), max);
-        if (n > max) {
-            throw io_error(msg() << "write invalid size: " << n << " > " << max);
-        }
-        pn_transport_pop(ctx_->transport, n);
-    } catch (const io_error& e) {
-        set_error(ctx_, e.what());
-        pn_transport_close_head(ctx_->transport);
-    }
-}
-
-bool connection_engine::closed() const {
-    return pn_transport_closed(ctx_->transport);
-}
-
-connection connection_engine::connection() const { return connection_.pn_object(); }
-
-const connection_options connection_engine::no_opts;
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/src/contexts.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/contexts.hpp b/proton-c/bindings/cpp/src/contexts.hpp
index 03271a1..a60c1fa 100644
--- a/proton-c/bindings/cpp/src/contexts.hpp
+++ b/proton-c/bindings/cpp/src/contexts.hpp
@@ -26,7 +26,7 @@
 #include "proton/message.hpp"
 #include "proton/connection.hpp"
 #include "proton/container.hpp"
-#include "proton/connection_engine.hpp"
+#include "proton/io/connection_engine.hpp"
 
 #include "id_generator.hpp"
 #include "proton_handler.hpp"
@@ -98,19 +98,6 @@ class connection_context : public context {
     static context::id id(const connection& c) { return id(c.pn_object()); }
 };
 
-// Connection context with information used by the connection_engine.
-class connection_engine_context : public connection_context {
-  public:
-    connection_engine_context() :  engine_handler(0), transport(0), collector(0) {}
-
-    class handler *engine_handler;
-    pn_transport_t  *transport;
-    pn_collector_t  *collector;
-    static connection_engine_context& get(const connection &c) {
-        return ref<connection_engine_context>(id(c));
-    }
-};
-
 void container_context(const reactor&, container&);
 
 class container_context {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/src/engine_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/engine_test.cpp b/proton-c/bindings/cpp/src/engine_test.cpp
index 74a7a6a..afae6c2 100644
--- a/proton-c/bindings/cpp/src/engine_test.cpp
+++ b/proton-c/bindings/cpp/src/engine_test.cpp
@@ -20,7 +20,7 @@
 
 #include "test_bits.hpp"
 #include <proton/uuid.hpp>
-#include <proton/connection_engine.hpp>
+#include <proton/io/connection_engine.hpp>
 #include <proton/handler.hpp>
 #include <proton/event.hpp>
 #include <proton/types_fwd.hpp>
@@ -32,167 +32,209 @@
 #define override
 #endif
 
+using namespace proton::io;
 using namespace proton;
 using namespace test;
+using namespace std;
 
-// One end of an in-memory connection
-struct mem_pipe {
-    mem_pipe(std::deque<char>& r, std::deque<char>& w) : read(r), write(w) {}
-    std::deque<char>  &read, &write;
-};
+typedef std::deque<char> byte_stream;
 
-struct mem_queues : public std::pair<std::deque<char>, std::deque<char> > {
-    mem_pipe a() { return mem_pipe(first, second); }
-    mem_pipe b() { return mem_pipe(second, first); }
-};
+/// In memory connection_engine that reads and writes from byte_streams
+struct in_memory_engine : public connection_engine {
 
-// In memory connection_engine
-struct mem_engine : public connection_engine {
-    mem_pipe socket;
-    std::string read_error;
-    std::string write_error;
-
-    mem_engine(mem_pipe s, handler &h, const connection_options &opts)
-        : connection_engine(h, opts), socket(s) {}
-
-    std::pair<size_t, bool> io_read(char* buf, size_t size) override {
-        if (!read_error.empty()) throw io_error(read_error);
-        size = std::min(socket.read.size(), size);
-        copy(socket.read.begin(), socket.read.begin()+size, buf);
-        socket.read.erase(socket.read.begin(), socket.read.begin()+size);
-        return std::make_pair(size, true);
-    }
+    byte_stream& reads;
+    byte_stream& writes;
 
-    size_t io_write(const char* buf, size_t size) override {
-        if (!write_error.empty()) throw io_error(write_error);
-        socket.write.insert(socket.write.begin(), buf, buf+size);
-        return size;
-    }
+    in_memory_engine(byte_stream& rd, byte_stream& wr, handler &h,
+                     const connection_options &opts = connection_options()) :
+        connection_engine(h, opts), reads(rd), writes(wr) {}
 
-    void io_close() override {
-        read_error = write_error = "closed";
+    void do_read() {
+        mutable_buffer rbuf = read_buffer();
+        size_t size = std::min(reads.size(), rbuf.size);
+        if (size) {
+            copy(reads.begin(), reads.begin()+size, static_cast<char*>(rbuf.data));
+            read_done(size);
+            reads.erase(reads.begin(), reads.begin()+size);
+        }
     }
-};
 
-struct debug_handler : handler {
-    void on_unhandled(event& e) override {
-        std::cout << e.name() << std::endl;
+    void do_write() {
+        const_buffer wbuf = write_buffer();
+        if (wbuf.size) {
+            writes.insert(writes.begin(),
+                          static_cast<const char*>(wbuf.data),
+                          static_cast<const char*>(wbuf.data) + wbuf.size);
+            write_done(wbuf.size);
+        }
     }
-};
 
-struct record_handler : handler {
-    std::deque<std::string> events;
-    void on_unhandled(event& e) override {
-        events.push_back(e.name());
-    }
+    void process() { do_read(); do_write(); dispatch(); }
 };
 
-template <class HA=record_handler, class HB=record_handler> struct engine_pair {
+/// A pair of engines that talk to each other in-memory.
+struct engine_pair {
+    byte_stream ab, ba;
     connection_engine::container cont;
-    mem_queues queues;
-    HA ha;
-    HB hb;
-    mem_engine a, b;
-    engine_pair() : a(queues.a(), ha, cont.make_options()), b(queues.b(), hb, cont.make_options()) {}
-    engine_pair(const std::string& id)
-        : cont(id), a(queues.a(), ha, cont.make_options()), b(queues.b(), hb, cont.make_options())
-    {}
-    engine_pair(const connection_options &aopts, connection_options &bopts)
-        : a(queues.a(), ha, aopts), b(queues.b(), hb, bopts)
-    {}
 
-    void process() { a.process(); b.process(); }
-};
+    in_memory_engine a, b;
 
-void test_process_amqp() {
-    engine_pair<> e;
+    engine_pair(handler& ha, handler& hb,
+                const connection_options& ca = connection_options(),
+                const connection_options& cb = connection_options()) :
+        a(ba, ab, ha, ca), b(ab, ba, hb, cb) {}
 
-    e.a.process(connection_engine::READ); // Don't write unlesss writable
-    ASSERT(e.a.socket.write.empty());
-    e.a.process(connection_engine::WRITE);
-
-    std::string wrote(e.a.socket.write.begin(), e.a.socket.write.end());
-    e.a.process(connection_engine::WRITE);
-    ASSERT_EQUAL(8, wrote.size());
-    ASSERT_EQUAL("AMQP", wrote.substr(0,4));
+    void process() { a.process(); b.process(); }
+};
 
-    e.b.process();              // Read and write AMQP
-    ASSERT_EQUAL("AMQP", std::string(e.b.socket.write.begin(), e.b.socket.write.begin()+4));
-    ASSERT(e.b.socket.read.empty());
-    ASSERT(e.a.socket.write.empty());
-    ASSERT_EQUAL(many<std::string>() + "START", e.ha.events);
+template <class S> typename S::value_type quick_pop(S& s) {
+    ASSERT(!s.empty());
+    typename S::value_type x = s.front();
+    s.pop_front();
+    return x;
 }
 
-
-struct link_handler : public record_handler {
+/// A handler that records incoming endpoints, errors etc.
+struct record_handler : public handler {
     std::deque<proton::link> links;
+    std::deque<proton::session> sessions;
+    std::deque<std::string> errors;
+
     void on_link_open(event& e) override {
         links.push_back(e.link());
     }
 
-    proton::link pop() {
-        proton::link l;
-        if (!links.empty()) {
-            l = links.front();
-            links.pop_front();
-        }
-        return l;
+    void on_session_open(event& e) {
+        sessions.push_back(e.session());
+    }
+
+    void on_unhandled_error(event& e, const condition& c) {
+        errors.push_back(e.name() + "/" + c.what());
     }
 };
 
 void test_engine_prefix() {
     // Set container ID and prefix explicitly
-    engine_pair<link_handler, link_handler> e(
-        connection_options().container_id("a").link_prefix("x/"),
-        connection_options().container_id("b").link_prefix("y/"));
+    record_handler ha, hb;
+    engine_pair e(ha, hb,
+                  connection_options().container_id("a").link_prefix("x/"),
+                  connection_options().container_id("b").link_prefix("y/"));
     e.a.connection().open();
     ASSERT_EQUAL("a", e.a.connection().container_id());
     e.b.connection().open();
     ASSERT_EQUAL("b", e.b.connection().container_id());
 
-    e.a.connection().open_sender("");
-    while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
-    ASSERT_EQUAL("x/1", e.ha.pop().name());
-    ASSERT_EQUAL("x/1", e.hb.pop().name());
+    e.a.connection().open_sender("x");
+    while (ha.links.empty() || hb.links.empty()) e.process();
+    ASSERT_EQUAL("x/1", quick_pop(ha.links).name());
+    ASSERT_EQUAL("x/1", quick_pop(hb.links).name());
 
     e.a.connection().open_receiver("");
-    while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
-    ASSERT_EQUAL("x/2", e.ha.pop().name());
-    ASSERT_EQUAL("x/2", e.hb.pop().name());
+    while (ha.links.empty() || hb.links.empty()) e.process();
+    ASSERT_EQUAL("x/2", quick_pop(ha.links).name());
+    ASSERT_EQUAL("x/2", quick_pop(hb.links).name());
 
     e.b.connection().open_receiver("");
-    while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
-    ASSERT_EQUAL("y/1", e.ha.pop().name());
-    ASSERT_EQUAL("y/1", e.hb.pop().name());
+    while (ha.links.empty() || hb.links.empty()) e.process();
+    ASSERT_EQUAL("y/1", quick_pop(ha.links).name());
+    ASSERT_EQUAL("y/1", quick_pop(hb.links).name());
 }
 
 void test_container_prefix() {
     /// Let the container set the options.
-    engine_pair<link_handler, link_handler> e;
+    record_handler ha, hb;
+    connection_engine::container ca("a"), cb("b");
+    engine_pair e(ha, hb, ca.make_options(), cb.make_options());
+
+    ASSERT_EQUAL("a", e.a.connection().container_id());
+    ASSERT_EQUAL("b", e.b.connection().container_id());
+
     e.a.connection().open();
+    sender s = e.a.connection().open_sender("x");
+    ASSERT_EQUAL("1/1", s.name());
 
-    e.a.connection().open_sender("x");
-    while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
-    ASSERT_EQUAL("1/1", e.ha.pop().name());
-    ASSERT_EQUAL("1/1", e.hb.pop().name());
+    while (ha.links.empty() || hb.links.empty()) e.process();
+
+    ASSERT_EQUAL("1/1", quick_pop(ha.links).name());
+    ASSERT_EQUAL("1/1", quick_pop(hb.links).name());
 
     e.a.connection().open_receiver("y");
-    while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
-    ASSERT_EQUAL("1/2", e.ha.pop().name());
-    ASSERT_EQUAL("1/2", e.hb.pop().name());
+    while (ha.links.empty() || hb.links.empty()) e.process();
+    ASSERT_EQUAL("1/2", quick_pop(ha.links).name());
+    ASSERT_EQUAL("1/2", quick_pop(hb.links).name());
 
-    e.b.connection().open_receiver("z");
-    while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
-    ASSERT_EQUAL("2/1", e.ha.pop().name());
-    ASSERT_EQUAL("2/1", e.hb.pop().name());
+    // Open a second connection in each container, make sure links have different IDs.
+    record_handler ha2, hb2;
+    engine_pair e2(ha2, hb2, ca.make_options(), cb.make_options());
 
-    // TODO aconway 2016-01-22: check we respect name set in linkn-options.
+    ASSERT_EQUAL("a", e2.a.connection().container_id());
+    ASSERT_EQUAL("b", e2.b.connection().container_id());
+
+    e2.b.connection().open();
+    receiver r = e2.b.connection().open_receiver("z");
+    ASSERT_EQUAL("2/1", r.name());
+
+    while (ha2.links.empty() || hb2.links.empty()) e2.process();
+
+    ASSERT_EQUAL("2/1", quick_pop(ha2.links).name());
+    ASSERT_EQUAL("2/1", quick_pop(hb2.links).name());
 };
 
+void test_endpoint_close() {
+    // Make sure conditions are sent to the remote end.
+
+    // FIXME aconway 2016-03-22: re-enable these tests when we can set error conditions.
+
+    // record_handler ha, hb;
+    // engine_pair e(ha, hb);
+    // e.a.connection().open();
+    // e.a.connection().open_sender("x");
+    // e.a.connection().open_receiver("y");
+    // while (ha.links.size() < 2 || hb.links.size() < 2) e.process();
+    // link ax = quick_pop(ha.links), ay = quick_pop(ha.links);
+    // link bx = quick_pop(hb.links), by = quick_pop(hb.links);
+
+    // // Close a link
+    // ax.close(condition("err", "foo bar"));
+    // while (!(bx.state() & endpoint::REMOTE_CLOSED)) e.process();
+    // condition c = bx.remote_condition();
+    // ASSERT_EQUAL("err", c.name());
+    // ASSERT_EQUAL("foo bar", c.description());
+    // ASSERT_EQUAL("err: foo bar", ax.local_condition().what());
+
+    // // Close a link with an empty condition
+    // ay.close(condition());
+    // while (!(by.state() & endpoint::REMOTE_CLOSED)) e.process();
+    // ASSERT(by.remote_condition().empty());
+
+    // // Close a connection
+    // connection ca = e.a.connection(), cb = e.b.connection();
+    // ca.close(condition("conn", "bad connection"));
+    // while (!cb.closed()) e.process();
+    // ASSERT_EQUAL("conn: bad connection", cb.remote_condition().what());
+}
+
+void test_transport_close() {
+    // Make sure conditions are sent to the remote end.
+    record_handler ha, hb;
+    engine_pair e(ha, hb);
+    e.a.connection().open();
+    while (!e.a.connection().state() & endpoint::REMOTE_ACTIVE) e.process();
+
+    e.a.close("oops", "engine failure");
+    // Closed but we still have output data to flush so a.dispatch() is true.
+    ASSERT(e.a.dispatch());
+    while (!e.b.connection().closed()) e.process();
+    ASSERT_EQUAL(1, hb.errors.size());
+    ASSERT_EQUAL("trasport_error/oops: engine failure", hb.errors.front());
+    ASSERT_EQUAL("oops", e.b.connection().remote_condition().name());
+    ASSERT_EQUAL("engine failure", e.b.connection().remote_condition().description());
+}
+
 int main(int, char**) {
     int failed = 0;
-    RUN_TEST(failed, test_process_amqp());
     RUN_TEST(failed, test_engine_prefix());
     RUN_TEST(failed, test_container_prefix());
+    RUN_TEST(failed, test_endpoint_close());
     return failed;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/src/io/connection_engine.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/connection_engine.cpp b/proton-c/bindings/cpp/src/io/connection_engine.cpp
new file mode 100644
index 0000000..5d8e5cc
--- /dev/null
+++ b/proton-c/bindings/cpp/src/io/connection_engine.cpp
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proton/io/connection_engine.hpp"
+#include "proton/error.hpp"
+#include "proton/handler.hpp"
+#include "proton/uuid.hpp"
+
+#include "contexts.hpp"
+#include "id_generator.hpp"
+#include "messaging_adapter.hpp"
+#include "messaging_event.hpp"
+#include "msg.hpp"
+#include "proton_bits.hpp"
+#include "proton_bits.hpp"
+
+#include <proton/connection.h>
+#include <proton/transport.h>
+#include <proton/event.h>
+
+#include <algorithm>
+
+#include <iosfwd>
+
+#include <assert.h>
+
+namespace proton {
+namespace io {
+
+namespace {
+std::string  make_id(const std::string s="") {
+    return s.empty() ? uuid::random().str() : s;
+}
+}
+
+class connection_engine::container::impl {
+  public:
+    impl(const std::string s="") : id_(make_id(s)) {}
+
+    const std::string id_;
+    id_generator id_gen_;
+    connection_options options_;
+};
+
+connection_engine::container::container(const std::string& s) : impl_(new impl(s)) {}
+
+connection_engine::container::~container() {}
+
+std::string connection_engine::container::id() const { return impl_->id_; }
+
+connection_options connection_engine::container::make_options() {
+    connection_options opts = impl_->options_;
+    opts.container_id(id()).link_prefix(impl_->id_gen_.next()+"/");
+    return opts;
+}
+
+void connection_engine::container::options(const connection_options &opts) {
+    impl_->options_ = opts;
+}
+
+connection_engine::connection_engine(class handler &h, const connection_options& opts) :
+    handler_(h),
+    connection_(internal::take_ownership(pn_connection()).get()),
+    transport_(internal::take_ownership(pn_transport()).get()),
+    collector_(internal::take_ownership(pn_collector()).get())
+{
+    if (!connection_ || !transport_ || !collector_)
+        throw proton::error("engine create");
+    transport_.bind(connection_);
+    pn_connection_collect(connection_.pn_object(), collector_.get());
+    opts.apply(connection_);
+
+    // Provide defaults for connection_id and link_prefix if not set.
+    std::string cid = connection_.container_id();
+    if (cid.empty()) {
+        cid = make_id();
+        pn_connection_set_container(connection_.pn_object(), cid.c_str());
+    }
+    id_generator &link_gen = connection_context::get(connection_).link_gen;
+    if (link_gen.prefix().empty()) {
+        link_gen.prefix(make_id()+"/");
+    }
+}
+
+connection_engine::~connection_engine() {
+    transport_.unbind();
+    pn_collector_free(collector_.release()); // Break cycle with connection_
+}
+
+bool connection_engine::dispatch() {
+    proton_handler& h = *handler_.messaging_adapter_;
+    for (pn_event_t *e = pn_collector_peek(collector_.get());
+         e;
+         e = pn_collector_peek(collector_.get()))
+    {
+        switch (pn_event_type(e)) {
+          case PN_CONNECTION_INIT:
+            // FIXME aconway 2016-03-21: don't use START in connection handlers
+            // reserve it for containers.
+            proton_event(e, PN_REACTOR_INIT, 0).dispatch(h);
+            break;
+          default:
+            break;
+        }
+        proton_event(e, pn_event_type(e), 0).dispatch(h);
+        pn_collector_pop(collector_.get());
+    }
+    return !(pn_transport_closed(transport_.pn_object()) ||
+          (connection().closed() && write_buffer().size == 0));
+}
+
+mutable_buffer connection_engine::read_buffer() {
+    ssize_t cap = pn_transport_capacity(transport_.pn_object());
+    if (cap > 0)
+        return mutable_buffer(pn_transport_tail(transport_.pn_object()), cap);
+    else
+        return mutable_buffer(0, 0);
+}
+
+void connection_engine::read_done(size_t n) {
+    pn_transport_process(transport_.pn_object(), n);
+}
+
+void connection_engine::read_close() {
+    pn_transport_close_tail(transport_.pn_object());
+}
+
+const_buffer connection_engine::write_buffer() const {
+    ssize_t pending = pn_transport_pending(transport_.pn_object());
+    if (pending > 0)
+        return const_buffer(pn_transport_head(transport_.pn_object()), pending);
+    else
+        return const_buffer(0, 0);
+}
+
+void connection_engine::write_done(size_t n) {
+    pn_transport_pop(transport_.pn_object(), n);
+}
+
+void connection_engine::write_close() {
+    pn_transport_close_head(transport_.pn_object());
+}
+
+void connection_engine::close(const std::string& name, const std::string& description) {
+    pn_condition_t* c = pn_transport_condition(transport_.pn_object());
+    pn_condition_set_name(c, name.c_str());
+    pn_condition_set_description(c, description.c_str());
+    read_close();
+    write_close();
+}
+
+proton::connection connection_engine::connection() const {
+    return connection_;
+}
+
+}}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/222574ed/proton-c/bindings/cpp/src/io/posix/socket.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/posix/socket.cpp b/proton-c/bindings/cpp/src/io/posix/socket.cpp
new file mode 100644
index 0000000..656a837
--- /dev/null
+++ b/proton-c/bindings/cpp/src/io/posix/socket.cpp
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "msg.hpp"
+
+#include <proton/io/socket.hpp>
+#include <proton/url.hpp>
+
+#include <errno.h>
+#include <string.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <sys/socket.h>
+#include <sys/select.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+namespace proton {
+namespace io {
+namespace socket {
+
+io_error::io_error(const std::string& s) : error(s) {}
+
+const descriptor INVALID_DESCRIPTOR = -1;
+
+std::string error_str() {
+    char buf[512] = "Unknown error";
+#ifdef _GNU_SOURCE
+    // GNU strerror_r returns the message
+    return ::strerror_r(errno, buf, sizeof(buf));
+#else
+    // POSIX strerror_r doesn't return the buffer
+    ::strerror_r(errno, buf, sizeof(buf));
+    return std::string(buf)
+#endif
+}
+
+namespace {
+
+template <class T> T check(T result, const std::string& msg=std::string()) {
+    if (result < 0) throw io_error(msg + error_str());
+    return result;
+}
+
+void gai_check(int result, const std::string& msg="") {
+    if (result) throw io_error(msg + gai_strerror(result));
+}
+
+}
+
+void engine::init() {
+    check(fcntl(socket_, F_SETFL, fcntl(socket_, F_GETFL, 0) | O_NONBLOCK), "set nonblock: ");
+}
+
+engine::engine(descriptor fd, handler& h, const connection_options &opts)
+    : connection_engine(h, opts), socket_(fd)
+{
+    init();
+}
+
+engine::engine(const url& u, handler& h, const connection_options& opts)
+    : connection_engine(h, opts), socket_(connect(u))
+{
+    init();
+}
+
+engine::~engine() {}
+
+void engine::read() {
+    mutable_buffer rbuf = read_buffer();
+    if (rbuf.size > 0) {
+        ssize_t n = ::read(socket_, rbuf.data, rbuf.size);
+        if (n > 0)
+            read_done(n);
+        else if (n == 0)
+            read_close();
+        else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK)
+            close("io_error", error_str());
+    }
+}
+
+void engine::write() {
+    const_buffer wbuf = write_buffer();
+    if (wbuf.size > 0) {
+        ssize_t n = ::write(socket_, wbuf.data, wbuf.size);
+        if (n > 0)
+            write_done(n);
+        else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
+            close("io_error", error_str());
+        }
+    }
+}
+
+void engine::run() {
+    while (dispatch()) {
+        fd_set rd, wr;
+        FD_ZERO(&rd);
+        if (read_buffer().size)
+            FD_SET(socket_, &rd);
+        FD_ZERO(&wr);
+        if (write_buffer().size)
+            FD_SET(socket_, &wr);
+        int n = ::select(FD_SETSIZE, &rd, &wr, NULL, NULL);
+        if (n < 0) {
+            close("select: ", error_str());
+            break;
+        }
+        if (FD_ISSET(socket_, &rd)) {
+            read();
+        }
+        if (FD_ISSET(socket_, &wr))
+            write();
+    }
+    ::close(socket_);
+}
+
+namespace {
+struct auto_addrinfo {
+    struct addrinfo *ptr;
+    auto_addrinfo() : ptr(0) {}
+    ~auto_addrinfo() { ::freeaddrinfo(ptr); }
+    addrinfo* operator->() const { return ptr; }
+};
+}
+
+descriptor connect(const proton::url& u) {
+    descriptor fd = INVALID_DESCRIPTOR;
+    try{
+        auto_addrinfo addr;
+        gai_check(::getaddrinfo(u.host().empty() ? 0 : u.host().c_str(),
+                                u.port().empty() ? 0 : u.port().c_str(),
+                                0, &addr.ptr), u.str()+": ");
+        fd = check(::socket(addr->ai_family, SOCK_STREAM, 0), "connect: ");
+        check(::connect(fd, addr->ai_addr, addr->ai_addrlen), "connect: ");
+        return fd;
+    } catch (...) {
+        if (fd >= 0) close(fd);
+        throw;
+    }
+}
+
+listener::listener(const std::string& host, const std::string &port) : socket_(INVALID_DESCRIPTOR) {
+    try {
+        auto_addrinfo addr;
+        gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
+                                port.empty() ? 0 : port.c_str(), 0, &addr.ptr),
+                  "listener address invalid: ");
+        socket_ = check(::socket(addr->ai_family, SOCK_STREAM, 0), "listen: ");
+        int yes = 1;
+        check(setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)), "setsockopt: ");
+        check(::bind(socket_, addr->ai_addr, addr->ai_addrlen), "bind: ");
+        check(::listen(socket_, 32), "listen: ");
+    } catch (...) {
+        if (socket_ >= 0) close(socket_);
+        throw;
+    }
+}
+
+listener::~listener() { ::close(socket_); }
+
+descriptor listener::accept(std::string& host_str, std::string& port_str) {
+    struct sockaddr_storage addr;
+    socklen_t size = sizeof(addr);
+    int fd = check(::accept(socket_, (struct sockaddr *)&addr, &size), "accept: ");
+    char host[NI_MAXHOST], port[NI_MAXSERV];
+    gai_check(getnameinfo((struct sockaddr *) &addr, sizeof(addr),
+                          host, sizeof(host), port, sizeof(port), 0),
+              "accept invalid remote address: ");
+    host_str = host;
+    port_str = port;
+    return fd;
+}
+
+// Empty stubs, only needed on windows.
+void initialize() {}
+void finalize() {}
+
+}}}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org