You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/04/27 16:54:57 UTC

[1/3] qpid-proton git commit: PROTON-1046: C++ multi-threaded controller and improved broker example

Repository: qpid-proton
Updated Branches:
  refs/heads/master b53a684e7 -> deccf354a


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/include/proton/sender.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/sender.hpp b/proton-c/bindings/cpp/include/proton/sender.hpp
index 6f13abe..43c0747 100644
--- a/proton-c/bindings/cpp/include/proton/sender.hpp
+++ b/proton-c/bindings/cpp/include/proton/sender.hpp
@@ -25,6 +25,7 @@
 #include "proton/export.hpp"
 #include "proton/link.hpp"
 #include "proton/message.hpp"
+#include "proton/tracker.hpp"
 
 #include "proton/types.h"
 #include <string>
@@ -33,8 +34,6 @@ struct pn_connection_t;
 
 namespace proton {
 
-class tracker;
-
 /// A link for sending messages.
 class
 PN_CPP_CLASS_EXTERN sender : public internal::link

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/include/proton/work_queue.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/work_queue.hpp b/proton-c/bindings/cpp/include/proton/work_queue.hpp
new file mode 100644
index 0000000..1fb84ce
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/work_queue.hpp
@@ -0,0 +1,75 @@
+#ifndef PROTON_WORK_QUEUE_HPP
+#define PROTON_WORK_QUEUE_HPP
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/handler.hpp>
+#include <proton/connection_options.hpp>
+
+#include <functional>
+#include <memory>
+
+namespace proton {
+
+class connection;
+
+/// A work_queue takes work (in the form of function objects) that will be
+/// serialized with other activity on a connection. Typically the work is a call
+/// to user-defined member functions on the handler(s) associated with a
+/// connection, which will be called serialized with respect to
+/// proton::handler::on_* event functions.
+///
+class work_queue : public std::enable_shared_from_this<work_queue> {
+  public:
+    work_queue(const work_queue&) = delete;
+    virtual ~work_queue() {}
+
+    /// Get the work_queue associated with a connection.
+    /// @throw proton::error if this is not a controller-managed connection.
+    PN_CPP_EXTERN static std::shared_ptr<work_queue> get(const proton::connection&);
+
+    /// Push a function object on the queue to be invoked in a safely serialized
+    /// way.
+    ///
+    /// @return true if `f()` was pushed and will be called. False if the
+    /// work_queue is already closed and f() will never be called.
+    ///
+    /// Note 1: On returning true, the application can rely on f() being called
+    /// eventually. However f() should check the state when it executes as
+    /// links, sessions or even the connection may have closed by the time f()
+    /// is executed.
+    ///
+    /// Note 2: You must not push() in a handler or work_queue function on the
+    /// *same connection* as the work_queue you are pushing to. That could cause
+    /// a deadlock.
+    ///
+    virtual bool push(std::function<void()>) = 0;
+
+    /// Get the controller associated with this work_queue.
+    virtual class controller& controller() const = 0;
+
+  protected:
+    work_queue() {}
+};
+
+}
+
+
+#endif // PROTON_WORK_QUEUE_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/connection_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection_options.cpp b/proton-c/bindings/cpp/src/connection_options.cpp
index 4a0956b..19a494a 100644
--- a/proton-c/bindings/cpp/src/connection_options.cpp
+++ b/proton-c/bindings/cpp/src/connection_options.cpp
@@ -145,9 +145,11 @@ class connection_options::impl {
 };
 
 connection_options::connection_options() : impl_(new impl()) {}
+
 connection_options::connection_options(const connection_options& x) : impl_(new impl()) {
     *this = x;
 }
+
 connection_options::~connection_options() {}
 
 connection_options& connection_options::operator=(const connection_options& x) {
@@ -155,7 +157,16 @@ connection_options& connection_options::operator=(const connection_options& x) {
     return *this;
 }
 
-void connection_options::update(const connection_options& x) { impl_->update(*x.impl_); }
+connection_options& connection_options::update(const connection_options& x) {
+    impl_->update(*x.impl_);
+    return *this;
+}
+
+connection_options connection_options::update(const connection_options& x) const {
+    connection_options copy(*this);
+    copy.update(x);
+    return copy;
+}
 
 connection_options& connection_options::handler(class handler *h) { impl_->handler = h->messaging_adapter_.get(); return *this; }
 connection_options& connection_options::max_frame_size(uint32_t n) { impl_->max_frame_size = n; return *this; }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/contexts.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/contexts.hpp b/proton-c/bindings/cpp/src/contexts.hpp
index a60c1fa..0aa539e 100644
--- a/proton-c/bindings/cpp/src/contexts.hpp
+++ b/proton-c/bindings/cpp/src/contexts.hpp
@@ -40,6 +40,8 @@ struct pn_acceptor_t;
 namespace proton {
 
 class proton_handler;
+class work_queue;
+
 
 // Base class for C++ classes that are used as proton contexts.
 // Contexts are pn_objects managed by pn reference counts, the C++ value is allocated in-place.
@@ -81,12 +83,13 @@ class context {
 // Connection context used by all connections.
 class connection_context : public context {
   public:
-    connection_context() : default_session(0) {}
+    connection_context() : default_session(0), work_queue(0) {}
 
     // Used by all connections
     pn_session_t *default_session; // Owned by connection.
     message event_message;      // re-used by messaging_adapter for performance.
     id_generator link_gen;      // Link name generator.
+    class work_queue* work_queue; // Work queue if this is a proton::controller connection.
 
     internal::pn_unique_ptr<proton_handler> handler;
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/controller.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/controller.cpp b/proton-c/bindings/cpp/src/controller.cpp
new file mode 100644
index 0000000..73403c2
--- /dev/null
+++ b/proton-c/bindings/cpp/src/controller.cpp
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "contexts.hpp"
+
+#include <proton/error.hpp>
+#include <proton/controller.hpp>
+#include <proton/work_queue.hpp>
+
+#include <proton/io/default_controller.hpp>
+
+#include <utility>
+#include <memory>
+
+static proton::io::default_controller::make_fn make_default_controller;
+
+namespace proton {
+
+std::unique_ptr<controller> controller::create() {
+    if (!make_default_controller)
+        throw error("no default controller");
+    return make_default_controller();
+}
+
+controller& controller::get(const proton::connection& c) {
+    return work_queue::get(c)->controller();
+}
+
+std::shared_ptr<work_queue> work_queue::get(const proton::connection& c) {
+    work_queue* wq = connection_context::get(c).work_queue;
+    if (!wq)
+        throw proton::error("connection has no controller");
+    return wq->shared_from_this();
+}
+
+namespace io {
+// Register a default controller factory.
+default_controller::default_controller(default_controller::make_fn f) {
+    make_default_controller = f;
+}
+} // namespace io
+
+} // namespace proton

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/engine_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/engine_test.cpp b/proton-c/bindings/cpp/src/engine_test.cpp
index d94abdc..a6fd71e 100644
--- a/proton-c/bindings/cpp/src/engine_test.cpp
+++ b/proton-c/bindings/cpp/src/engine_test.cpp
@@ -74,8 +74,6 @@ struct in_memory_engine : public connection_engine {
 /// A pair of engines that talk to each other in-memory.
 struct engine_pair {
     byte_stream ab, ba;
-    connection_engine::container cont;
-
     in_memory_engine a, b;
 
     engine_pair(handler& ha, handler& hb,
@@ -152,49 +150,7 @@ void test_engine_prefix() {
     ASSERT_EQUAL("y/1", quick_pop(hb.receivers).name());
 }
 
-void test_container_prefix() {
-    /// Let the container set the options.
-    record_handler ha, hb;
-    connection_engine::container ca("a"), cb("b");
-    engine_pair e(ha, hb, ca.make_options(), cb.make_options());
-
-    ASSERT_EQUAL("a", e.a.connection().container_id());
-    ASSERT_EQUAL("b", e.b.connection().container_id());
-
-    e.a.connection().open();
-    sender s = e.a.connection().open_sender("x");
-    ASSERT_EQUAL("1/1", s.name());
-
-    while (ha.senders.empty() || hb.receivers.empty()) e.process();
-
-    ASSERT_EQUAL("1/1", quick_pop(ha.senders).name());
-    ASSERT_EQUAL("1/1", quick_pop(hb.receivers).name());
-
-    e.a.connection().open_receiver("y");
-    while (ha.receivers.empty() || hb.senders.empty()) e.process();
-    ASSERT_EQUAL("1/2", quick_pop(ha.receivers).name());
-    ASSERT_EQUAL("1/2", quick_pop(hb.senders).name());
-
-    // Open a second connection in each container, make sure links have different IDs.
-    record_handler ha2, hb2;
-    engine_pair e2(ha2, hb2, ca.make_options(), cb.make_options());
-
-    ASSERT_EQUAL("a", e2.a.connection().container_id());
-    ASSERT_EQUAL("b", e2.b.connection().container_id());
-
-    e2.b.connection().open();
-    receiver r = e2.b.connection().open_receiver("z");
-    ASSERT_EQUAL("2/1", r.name());
-
-    while (ha2.senders.empty() || hb2.receivers.empty()) e2.process();
-
-    ASSERT_EQUAL("2/1", quick_pop(ha2.senders).name());
-    ASSERT_EQUAL("2/1", quick_pop(hb2.receivers).name());
-};
-
 void test_endpoint_close() {
-    // Make sure conditions are sent to the remote end.
-
     record_handler ha, hb;
     engine_pair e(ha, hb);
     e.a.connection().open();
@@ -246,7 +202,6 @@ void test_transport_close() {
 int main(int, char**) {
     int failed = 0;
     RUN_TEST(failed, test_engine_prefix());
-    RUN_TEST(failed, test_container_prefix());
     RUN_TEST(failed, test_endpoint_close());
     RUN_TEST(failed, test_transport_close());
     return failed;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/io/connection_engine.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/connection_engine.cpp b/proton-c/bindings/cpp/src/io/connection_engine.cpp
index e2a9356..ffef3b7 100644
--- a/proton-c/bindings/cpp/src/io/connection_engine.cpp
+++ b/proton-c/bindings/cpp/src/io/connection_engine.cpp
@@ -42,38 +42,7 @@
 namespace proton {
 namespace io {
 
-namespace {
-std::string  make_id(const std::string s="") {
-    return s.empty() ? uuid::random().str() : s;
-}
-}
-
-class connection_engine::container::impl {
-  public:
-    impl(const std::string s="") : id_(make_id(s)) {}
-
-    const std::string id_;
-    id_generator id_gen_;
-    connection_options options_;
-};
-
-connection_engine::container::container(const std::string& s) : impl_(new impl(s)) {}
-
-connection_engine::container::~container() {}
-
-std::string connection_engine::container::id() const { return impl_->id_; }
-
-connection_options connection_engine::container::make_options() {
-    connection_options opts = impl_->options_;
-    opts.container_id(id()).link_prefix(impl_->id_gen_.next()+"/");
-    return opts;
-}
-
-void connection_engine::container::options(const connection_options &opts) {
-    impl_->options_ = opts;
-}
-
-connection_engine::connection_engine(class handler &h, const connection_options& opts) :
+connection_engine::connection_engine(class handler &h, const connection_options& opts):
     handler_(h),
     connection_(internal::take_ownership(pn_connection()).get()),
     transport_(internal::take_ownership(pn_transport()).get()),
@@ -85,16 +54,12 @@ connection_engine::connection_engine(class handler &h, const connection_options&
     pn_connection_collect(connection_.pn_object(), collector_.get());
     opts.apply(connection_);
 
-    // Provide defaults for connection_id and link_prefix if not set.
-    std::string cid = connection_.container_id();
-    if (cid.empty()) {
-        cid = make_id();
-        pn_connection_set_container(connection_.pn_object(), cid.c_str());
-    }
+    // Provide local random defaults for container_id and link_prefix if not set by opts.
+    if (connection_.container_id().empty())
+        pn_connection_set_container(connection_.pn_object(), uuid::random().str().c_str());
     id_generator &link_gen = connection_context::get(connection_).link_gen;
-    if (link_gen.prefix().empty()) {
-        link_gen.prefix(make_id()+"/");
-    }
+    if (link_gen.prefix().empty())
+        link_gen.prefix(uuid::random().str()+"/");
 }
 
 connection_engine::~connection_engine() {
@@ -108,11 +73,15 @@ bool connection_engine::dispatch() {
          e;
          e = pn_collector_peek(collector_.get()))
     {
-        proton_event(e, 0).dispatch(h);
+        proton_event pe(e, 0);
+        try {
+            pe.dispatch(h);
+        } catch (const std::exception& e) {
+            close(error_condition("exception", e.what()));
+        }
         pn_collector_pop(collector_.get());
     }
-    return !(pn_transport_closed(transport_.pn_object()) ||
-          (connection().closed() && write_buffer().size == 0));
+    return !(pn_transport_closed(transport_.pn_object()));
 }
 
 mutable_buffer connection_engine::read_buffer() {
@@ -124,7 +93,8 @@ mutable_buffer connection_engine::read_buffer() {
 }
 
 void connection_engine::read_done(size_t n) {
-    pn_transport_process(transport_.pn_object(), n);
+    if (n > 0)
+        pn_transport_process(transport_.pn_object(), n);
 }
 
 void connection_engine::read_close() {
@@ -140,7 +110,8 @@ const_buffer connection_engine::write_buffer() const {
 }
 
 void connection_engine::write_done(size_t n) {
-    pn_transport_pop(transport_.pn_object(), n);
+    if (n > 0)
+        pn_transport_pop(transport_.pn_object(), n);
 }
 
 void connection_engine::write_close() {
@@ -161,4 +132,8 @@ proton::transport connection_engine::transport() const {
     return transport_;
 }
 
+void connection_engine::work_queue(class work_queue* wq) {
+    connection_context::get(connection()).work_queue = wq;
+}
+
 }}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/io/posix/socket.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/posix/socket.cpp b/proton-c/bindings/cpp/src/io/posix/socket.cpp
deleted file mode 100644
index 204b530..0000000
--- a/proton-c/bindings/cpp/src/io/posix/socket.cpp
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "msg.hpp"
-
-#include <proton/error_condition.hpp>
-#include <proton/io/socket.hpp>
-#include <proton/url.hpp>
-
-#include <errno.h>
-#include <string.h>
-#include <fcntl.h>
-#include <netdb.h>
-#include <sys/socket.h>
-#include <sys/select.h>
-#include <sys/types.h>
-#include <unistd.h>
-
-namespace proton {
-namespace io {
-namespace socket {
-
-io_error::io_error(const std::string& s) : error(s) {}
-
-const descriptor INVALID_DESCRIPTOR = -1;
-
-std::string error_str() {
-    char buf[512] = "Unknown error";
-#ifdef _GNU_SOURCE
-    // GNU strerror_r returns the message
-    return ::strerror_r(errno, buf, sizeof(buf));
-#else
-    // POSIX strerror_r doesn't return the buffer
-    ::strerror_r(errno, buf, sizeof(buf));
-    return std::string(buf)
-#endif
-}
-
-namespace {
-
-template <class T> T check(T result, const std::string& msg=std::string()) {
-    if (result < 0) throw io_error(msg + error_str());
-    return result;
-}
-
-void gai_check(int result, const std::string& msg="") {
-    if (result) throw io_error(msg + gai_strerror(result));
-}
-
-}
-
-void engine::init() {
-    check(fcntl(socket_, F_SETFL, fcntl(socket_, F_GETFL, 0) | O_NONBLOCK), "set nonblock: ");
-}
-
-engine::engine(descriptor fd, handler& h, const connection_options &opts)
-    : connection_engine(h, opts), socket_(fd)
-{
-    init();
-}
-
-engine::engine(const url& u, handler& h, const connection_options& opts)
-    : connection_engine(h, opts), socket_(connect(u))
-{
-    init();
-    connection().open();
-}
-
-engine::~engine() {}
-
-void engine::read() {
-    mutable_buffer rbuf = read_buffer();
-    if (rbuf.size > 0) {
-        ssize_t n = ::read(socket_, rbuf.data, rbuf.size);
-        if (n > 0)
-            read_done(n);
-        else if (n == 0)
-            read_close();
-        else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK)
-            close(error_condition("io_error", error_str()));
-    }
-}
-
-void engine::write() {
-    const_buffer wbuf = write_buffer();
-    if (wbuf.size > 0) {
-        ssize_t n = ::write(socket_, wbuf.data, wbuf.size);
-        if (n > 0)
-            write_done(n);
-        else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
-            close(error_condition("io_error", error_str()));
-        }
-    }
-}
-
-void engine::run() {
-    while (dispatch()) {
-        fd_set rd, wr;
-        FD_ZERO(&rd);
-        if (read_buffer().size)
-            FD_SET(socket_, &rd);
-        FD_ZERO(&wr);
-        if (write_buffer().size)
-            FD_SET(socket_, &wr);
-        int n = ::select(FD_SETSIZE, &rd, &wr, NULL, NULL);
-        if (n < 0) {
-            close(error_condition("select: ", error_str()));
-            break;
-        }
-        if (FD_ISSET(socket_, &rd)) {
-            read();
-        }
-        if (FD_ISSET(socket_, &wr))
-            write();
-    }
-    ::close(socket_);
-}
-
-namespace {
-struct auto_addrinfo {
-    struct addrinfo *ptr;
-    auto_addrinfo() : ptr(0) {}
-    ~auto_addrinfo() { ::freeaddrinfo(ptr); }
-    addrinfo* operator->() const { return ptr; }
-};
-}
-
-descriptor connect(const proton::url& u) {
-    descriptor fd = INVALID_DESCRIPTOR;
-    try{
-        auto_addrinfo addr;
-        gai_check(::getaddrinfo(u.host().empty() ? 0 : u.host().c_str(),
-                                u.port().empty() ? 0 : u.port().c_str(),
-                                0, &addr.ptr), u.str()+": ");
-        fd = check(::socket(addr->ai_family, SOCK_STREAM, 0), "connect: ");
-        check(::connect(fd, addr->ai_addr, addr->ai_addrlen), "connect: ");
-        return fd;
-    } catch (...) {
-        if (fd >= 0) close(fd);
-        throw;
-    }
-}
-
-listener::listener(const std::string& host, const std::string &port) : socket_(INVALID_DESCRIPTOR) {
-    try {
-        auto_addrinfo addr;
-        gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
-                                port.empty() ? 0 : port.c_str(), 0, &addr.ptr),
-                  "listener address invalid: ");
-        socket_ = check(::socket(addr->ai_family, SOCK_STREAM, 0), "listen: ");
-        int yes = 1;
-        check(setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)), "setsockopt: ");
-        check(::bind(socket_, addr->ai_addr, addr->ai_addrlen), "bind: ");
-        check(::listen(socket_, 32), "listen: ");
-    } catch (...) {
-        if (socket_ >= 0) close(socket_);
-        throw;
-    }
-}
-
-listener::~listener() { ::close(socket_); }
-
-descriptor listener::accept(std::string& host_str, std::string& port_str) {
-    struct sockaddr_storage addr;
-    socklen_t size = sizeof(addr);
-    int fd = check(::accept(socket_, (struct sockaddr *)&addr, &size), "accept: ");
-    char host[NI_MAXHOST], port[NI_MAXSERV];
-    gai_check(getnameinfo((struct sockaddr *) &addr, sizeof(addr),
-                          host, sizeof(host), port, sizeof(port), 0),
-              "accept invalid remote address: ");
-    host_str = host;
-    port_str = port;
-    return fd;
-}
-
-// Empty stubs, only needed on windows.
-void initialize() {}
-void finalize() {}
-
-}}}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/io/windows/socket.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/windows/socket.cpp b/proton-c/bindings/cpp/src/io/windows/socket.cpp
deleted file mode 100644
index f312525..0000000
--- a/proton-c/bindings/cpp/src/io/windows/socket.cpp
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "msg.hpp"
-
-#include <proton/io/socket.hpp>
-#include <proton/url.hpp>
-
-#define FD_SETSIZE 2048
-#ifndef _WIN32_WINNT
-#define _WIN32_WINNT 0x0501
-#endif
-#if _WIN32_WINNT < 0x0501
-#error "Proton requires Windows API support for XP or later."
-#endif
-#include <winsock2.h>
-#include <mswsock.h>
-#include <Ws2tcpip.h>
-
-#include <ctype.h>
-#include <errno.h>
-#include <stdio.h>
-#include <assert.h>
-
-namespace proton {
-namespace io {
-namespace socket {
-
-const descriptor INVALID_DESCRIPTOR = INVALID_SOCKET;
-
-std::string error_str() {
-    HRESULT code = WSAGetLastError();
-    char err[1024] = {0};
-    FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS |
-                  FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, (LPSTR)&err, sizeof(err), NULL);
-    return err;
-}
-
-io_error::io_error(const std::string& s) : error(s) {}
-
-namespace {
-
-template <class T> T check(T result, const std::string& msg=std::string()) {
-    if (result == SOCKET_ERROR)
-        throw io_error(msg + error_str());
-    return result;
-}
-
-void gai_check(int result, const std::string& msg="") {
-    if (result)
-        throw io_error(msg + gai_strerror(result));
-}
-
-} // namespace
-
-void initialize() {
-    WSADATA unused;
-    check(WSAStartup(0x0202, &unused), "can't load WinSock: "); // Version 2.2
-}
-
-void finalize() {
-    WSACleanup();
-}
-
-void engine::init() {
-    u_long nonblock = 1;
-    check(::ioctlsocket(socket_, FIONBIO, &nonblock), "ioctlsocket: ");
-}
-
-engine::engine(descriptor fd, handler& h, const connection_options &opts)
-    : connection_engine(h, opts), socket_(fd)
-{
-    init();
-}
-
-engine::engine(const url& u, handler& h, const connection_options &opts)
-    : connection_engine(h, opts), socket_(connect(u))
-{
-    init();
-    connection().open();
-}
-
-engine::~engine() {}
-
-void engine::read() {
-    mutable_buffer rbuf = read_buffer();
-    if (rbuf.size > 0) {
-        int n = ::recv(socket_, rbuf.data, rbuf.size, 0);
-        if (n > 0)
-            read_done(n);
-        else if (n == 0)
-            read_close();
-        else if (n == SOCKET_ERROR && WSAGetLastError() != WSAEWOULDBLOCK)
-            close(error_condition("io_error", error_str()));
-    }
-}
-
-void engine::write() {
-    const_buffer wbuf = write_buffer();
-    if (wbuf.size > 0) {
-    int n = ::send(socket_, wbuf.data, wbuf.size, 0);
-    if (n > 0)
-        write_done(n);
-    else if (n == SOCKET_ERROR && WSAGetLastError() != WSAEWOULDBLOCK)
-        close(error_condition("io_error", error_str()));
-    }
-}
-
-void engine::run() {
-    while (dispatch()) {
-        fd_set rd, wr;
-        FD_ZERO(&rd);
-        if (read_buffer().size)
-            FD_SET(socket_, &rd);
-        FD_ZERO(&wr);
-        if (write_buffer().size)
-            FD_SET(socket_, &wr);
-        int n = ::select(FD_SETSIZE, &rd, &wr, NULL, NULL);
-        if (n < 0) {
-            close(error_condition("select: ", error_str()));
-            break;
-        }
-        if (FD_ISSET(socket_, &rd)) {
-            read();
-        }
-        if (FD_ISSET(socket_, &wr))
-            write();
-    }
-    ::closesocket(socket_);
-}
-
-namespace {
-struct auto_addrinfo {
-    struct addrinfo *ptr;
-    auto_addrinfo() : ptr(0) {}
-    ~auto_addrinfo() { ::freeaddrinfo(ptr); }
-    addrinfo* operator->() const { return ptr; }
-};
-
-static const char *amqp_service(const char *port) {
-  // Help older Windows to know about amqp[s] ports
-  if (port) {
-    if (!strcmp("amqp", port)) return "5672";
-    if (!strcmp("amqps", port)) return "5671";
-  }
-  return port;
-}
-}
-
-
-descriptor connect(const proton::url& u) {
-    // convert "0.0.0.0" to "127.0.0.1" on Windows for outgoing sockets
-    std::string host = (u.host() == "0.0.0.0") ? "127.0.0.1" : u.host();
-    descriptor fd = INVALID_SOCKET;
-    try{
-        auto_addrinfo addr;
-        gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
-                                amqp_service(u.port().empty() ? 0 : u.port().c_str()),
-                                0, &addr.ptr),
-                  "connect address invalid: ");
-        fd = check(::socket(addr->ai_family, SOCK_STREAM, 0), "connect socket: ");
-        check(::connect(fd, addr->ai_addr, addr->ai_addrlen), "connect: ");
-        return fd;
-    } catch (...) {
-        if (fd != INVALID_SOCKET) ::closesocket(fd);
-        throw;
-    }
-}
-
-listener::listener(const std::string& host, const std::string &port) : socket_(INVALID_SOCKET) {
-    try {
-        auto_addrinfo addr;
-        gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
-                                port.empty() ? 0 : port.c_str(), 0, &addr.ptr),
-                  "listener address invalid: ");
-        socket_ = check(::socket(addr->ai_family, SOCK_STREAM, 0), "listener socket: ");
-        bool yes = true;
-        check(setsockopt(socket_, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char*)&yes, sizeof(yes)), "setsockopt: ");
-        check(::bind(socket_, addr->ai_addr, addr->ai_addrlen), "listener bind: ");
-        check(::listen(socket_, 32), "listener listen: ");
-    } catch (...) {
-        if (socket_ != INVALID_SOCKET) ::closesocket(socket_);
-        throw;
-    }
-}
-
-listener::~listener() { ::closesocket(socket_); }
-
-descriptor listener::accept(std::string& host_str, std::string& port_str) {
-    struct sockaddr_storage addr;
-    socklen_t size = sizeof(addr);
-    int fd = check(::accept(socket_, (struct sockaddr *)&addr, &size), "accept: ");
-    char host[NI_MAXHOST], port[NI_MAXSERV];
-    gai_check(getnameinfo((struct sockaddr *) &addr, sizeof(addr),
-                          host, sizeof(host), port, sizeof(port), 0),
-              "accept invalid remote address: ");
-    host_str = host;
-    port_str = port;
-    return fd;
-}
-
-}}}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/messaging_adapter.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/messaging_adapter.cpp b/proton-c/bindings/cpp/src/messaging_adapter.cpp
index 1981726..a1ba250 100644
--- a/proton-c/bindings/cpp/src/messaging_adapter.cpp
+++ b/proton-c/bindings/cpp/src/messaging_adapter.cpp
@@ -68,7 +68,10 @@ void messaging_adapter::on_link_flow(proton_event &pe) {
     pn_event_t *pne = pe.pn_event();
     pn_link_t *lnk = pn_event_link(pne);
     sender s(lnk);
-    if (lnk && pn_link_is_sender(lnk) && pn_link_credit(lnk) > 0) {
+    int state = pn_link_state(lnk);
+    if (lnk && pn_link_is_sender(lnk) && pn_link_credit(lnk) > 0 &&
+        (state&PN_LOCAL_ACTIVE) && (state&PN_REMOTE_ACTIVE))
+    {
         // create on_message extended event
         delegate_.on_sendable(s);
     }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/tests/tools/apps/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/tools/apps/cpp/CMakeLists.txt b/tests/tools/apps/cpp/CMakeLists.txt
index 0c120f2..2bc1bc5 100644
--- a/tests/tools/apps/cpp/CMakeLists.txt
+++ b/tests/tools/apps/cpp/CMakeLists.txt
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-include_directories("${CMAKE_SOURCE_DIR}/examples/cpp")
+include_directories("${CMAKE_SOURCE_DIR}/examples/cpp" "${CMAKE_SOURCE_DIR}/examples/cpp/lib")
 add_executable(reactor_send_cpp reactor_send.cpp)
 
 target_link_libraries(reactor_send_cpp qpid-proton qpid-proton-cpp)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/tests/tools/apps/cpp/reactor_send.cpp
----------------------------------------------------------------------
diff --git a/tests/tools/apps/cpp/reactor_send.cpp b/tests/tools/apps/cpp/reactor_send.cpp
index a3dc003..d4045b4 100644
--- a/tests/tools/apps/cpp/reactor_send.cpp
+++ b/tests/tools/apps/cpp/reactor_send.cpp
@@ -114,7 +114,7 @@ int main(int argc, char **argv) {
     int message_count = 10;
     int message_size = 100;
     bool replying = false;
-    options opts(argc, argv);
+    example::options opts(argc, argv);
     opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
     opts.add_value(message_count, 'c', "messages", "send COUNT messages", "COUNT");
     opts.add_value(message_size, 'b', "bytes", "send binary messages BYTES long", "BYTES");
@@ -124,7 +124,7 @@ int main(int argc, char **argv) {
         reactor_send send(address, message_count, message_size, replying);
         proton::container(send).run();
         return 0;
-    } catch (const bad_option& e) {
+    } catch (const example::bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] qpid-proton git commit: PROTON-1046: C++ multi-threaded controller and improved broker example

Posted by ac...@apache.org.
PROTON-1046: C++ multi-threaded controller and improved broker example

A complete portable multi-threaded API for proton that can be implemented on any
threading/IO platform.

API:
- proton::controller: A multi-threaded alternative to the proton::container.
- proton::work_queue: async functions serialized per-connection.

Examples:
- mt/epoll_controller.hpp: controller/work_queue implemented using native Linux epoll.
- mt/broker.cpp: multi-threaded broker, portable over any controller implementation.
  - illustrates multi-threading, use of work_queue, remote shutdown

TODO:
- Examples and implementations for non-Linux platforms.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/deccf354
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/deccf354
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/deccf354

Branch: refs/heads/master
Commit: deccf354a653e2106f40cdd59df9b67b74911e8b
Parents: b53a684
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 31 17:12:18 2016 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Apr 27 10:39:59 2016 -0400

----------------------------------------------------------------------
 config.sh.in                                    |   2 +-
 examples/cpp/CMakeLists.txt                     |  25 +-
 examples/cpp/README.dox                         |  99 ++--
 examples/cpp/broker.cpp                         |   4 +-
 examples/cpp/client.cpp                         |   4 +-
 examples/cpp/direct_recv.cpp                    |   4 +-
 examples/cpp/direct_send.cpp                    |   4 +-
 examples/cpp/engine/CMakeLists.txt              |  37 --
 examples/cpp/engine/broker.cpp                  | 176 -------
 examples/cpp/engine/client.cpp                  | 103 ----
 examples/cpp/engine/direct_recv.cpp             |  79 ---
 examples/cpp/engine/direct_send.cpp             |  91 ----
 examples/cpp/engine/helloworld.cpp              |  68 ---
 examples/cpp/engine/options.hpp                 | 173 -------
 examples/cpp/engine/server.cpp                  |  90 ----
 examples/cpp/engine/simple_recv.cpp             |  85 ---
 examples/cpp/engine/simple_send.cpp             |  93 ----
 examples/cpp/example/socket_windows.cpp         | 218 ++++++++
 examples/cpp/example_test.py                    | 106 ++--
 examples/cpp/mt/broker.cpp                      | 280 ++++++++++
 examples/cpp/mt/epoll_controller.cpp            | 517 +++++++++++++++++++
 examples/cpp/options.hpp                        |   2 +
 examples/cpp/recurring_timer.cpp                |   4 +-
 examples/cpp/server.cpp                         |   4 +-
 examples/cpp/server_direct.cpp                  |   4 +-
 examples/cpp/simple_recv.cpp                    |   4 +-
 examples/cpp/simple_send.cpp                    |   4 +-
 examples/cpp/tutorial.dox                       | 403 +++++++++++++++
 proton-c/bindings/cpp/CMakeLists.txt            |  13 +-
 proton-c/bindings/cpp/cpp.cmake                 |   3 +
 proton-c/bindings/cpp/docs/mainpage.md          | 152 +++---
 proton-c/bindings/cpp/docs/mt_page.md           |  21 +
 proton-c/bindings/cpp/docs/tutorial.dox         | 428 ---------------
 proton-c/bindings/cpp/docs/user.doxygen.in      |   3 +-
 .../cpp/include/proton/connection_options.hpp   |   9 +-
 .../bindings/cpp/include/proton/controller.hpp  | 118 +++++
 proton-c/bindings/cpp/include/proton/error.hpp  |   7 +-
 .../bindings/cpp/include/proton/handler.hpp     |  12 +
 .../cpp/include/proton/io/connection_engine.hpp |  88 ++--
 .../include/proton/io/default_controller.hpp    |  47 ++
 .../bindings/cpp/include/proton/io/socket.hpp   | 130 -----
 proton-c/bindings/cpp/include/proton/sender.hpp |   3 +-
 .../bindings/cpp/include/proton/work_queue.hpp  |  75 +++
 .../bindings/cpp/src/connection_options.cpp     |  13 +-
 proton-c/bindings/cpp/src/contexts.hpp          |   5 +-
 proton-c/bindings/cpp/src/controller.cpp        |  59 +++
 proton-c/bindings/cpp/src/engine_test.cpp       |  45 --
 .../bindings/cpp/src/io/connection_engine.cpp   |  67 +--
 proton-c/bindings/cpp/src/io/posix/socket.cpp   | 196 -------
 proton-c/bindings/cpp/src/io/windows/socket.cpp | 218 --------
 proton-c/bindings/cpp/src/messaging_adapter.cpp |   5 +-
 tests/tools/apps/cpp/CMakeLists.txt             |   2 +-
 tests/tools/apps/cpp/reactor_send.cpp           |   4 +-
 53 files changed, 2054 insertions(+), 2352 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/config.sh.in
----------------------------------------------------------------------
diff --git a/config.sh.in b/config.sh.in
index 744ddb3..5eb779b 100755
--- a/config.sh.in
+++ b/config.sh.in
@@ -73,7 +73,7 @@ export LD_LIBRARY_PATH="$(merge_paths $PROTON_BUILD/proton-c $LD_LIBRARY_PATH)"
 export PATH="$(merge_paths $PATH $PROTON_BUILD/tests/tools/apps/c $PROTON_HOME/tests/tools/apps/python $PROTON_HOME/tests/python)"
 
 # can the test harness use valgrind?
-if [[ -x "$(type -p valgrind)" ]] ; then
+if [[ -x "$(type -p valgrind)" && "@ENABLE_VALGRIND@" == "ON" ]] ; then
     export VALGRIND=$(type -p valgrind)
 fi
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt
index 4f6b742..3a81718 100644
--- a/examples/cpp/CMakeLists.txt
+++ b/examples/cpp/CMakeLists.txt
@@ -20,7 +20,10 @@
 find_package(ProtonCpp REQUIRED)
 
 include_directories(${ProtonCpp_INCLUDE_DIRS})
+link_libraries(${ProtonCpp_LIBRARIES})
+add_compile_options(${CXX_WARNING_FLAGS})
 
+# Single-threaded examples.
 foreach(example
     broker
     helloworld
@@ -40,12 +43,9 @@ foreach(example
     ssl_client_cert
     encode_decode)
   add_executable(${example} ${example}.cpp)
-  target_link_libraries(${example} ${ProtonCpp_LIBRARIES})
-  set_source_files_properties(${example}.cpp PROPERTIES COMPILE_FLAGS "${CXX_WARNING_FLAGS}")
 endforeach()
 
-add_subdirectory(engine)
-
+# Python test runner
 set(env_py ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py)
 
 function(set_test_path dir)
@@ -61,7 +61,16 @@ set_test_path("$<TARGET_FILE_DIR:broker>")
 add_test(NAME cpp_container_example_test
   COMMAND ${env_py} -- "PATH=${test_path}" ${VALGRIND_ENV} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v ContainerExampleTest)
 
-set_test_path("$<TARGET_FILE_DIR:engine-broker>")
-
-add_test(NAME cpp_engine_example_test
-  COMMAND ${env_py} -- "PATH=${test_path}" ${VALGRIND_ENV} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v ConnectionEngineExampleTest)
+# TODO aconway 2016-04-26: need portable MT and IO examples.
+if(CMAKE_SYSTEM_NAME STREQUAL "Linux" AND BUILD_CPP_MT)
+  set(controller_src mt/epoll_controller.cpp)
+  foreach(example
+      broker
+      )
+    add_executable(mt_${example} mt/${example}.cpp ${controller_src})
+    target_link_libraries(mt_${example} pthread)
+    set_target_properties(mt_${example} PROPERTIES CXX_STANDARD 11)
+  endforeach()
+  add_test(NAME cpp_mt_example_test
+    COMMAND ${env_py} -- "PATH=${test_path}" ${VALGRIND_ENV} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v MtBrokerTest)
+endif()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/README.dox
----------------------------------------------------------------------
diff --git a/examples/cpp/README.dox b/examples/cpp/README.dox
index 1e78774..d545366 100644
--- a/examples/cpp/README.dox
+++ b/examples/cpp/README.dox
@@ -1,15 +1,22 @@
-// Examples overview.
+// C++ examples list (doxygen format)
 //
-// For a better overview, see the tutorial in the generated documentation.
-//
-// In your build directory do:
+// For a tutorial-style description of the examples see tutorial.dox.
+// To build the full HTML tutorial and documentation, in your build directory do:
 //
 //     make docs-cpp
 //
 // then open proton-c/bindings/cpp/docs/html/tutorial.html in your browser.
 
-// DEVELOPER NOTE: if you are adding or modifying examples you should keep this
-// file and ../proton-c/bindings/cpp/docs/tutorial.hpp up to date.
+// DEVELOPER NOTE: if you add or modify examples, please add/update a short
+// description below and (if appropriate) extend/update tutorial.dox.
+
+/** example sub directory
+
+The example sub-directory has utility classes that make the examples simpler.
+These classes are not directly related to the use of proton, so they are in a
+separate `example` directory and namespace.
+
+*/
 
 /** @example helloworld.cpp
 
@@ -46,7 +53,7 @@ on 127.0.0.1:5672. Simply prints out the body of received messages.
 /** @example direct_send.cpp
 
 Accepts an incoming connection and then sends like `simple_send`.  You can
-connect directly to `direct_send` *without* a broker using \ref simple_recv.cpp.
+connect directly to `direct_send` *without* a broker using @ref simple_recv.cpp.
 Make sure to stop the broker first or use a different port for `direct_send`.
 
 */
@@ -54,7 +61,7 @@ Make sure to stop the broker first or use a different port for `direct_send`.
 /** @example direct_recv.cpp
 
 Accepts an incoming connection and then receives like `simple_recv`.  You can
-connect directly to `direct_recv` *without* a broker using \ref simple_send.cpp.
+connect directly to `direct_recv` *without* a broker using @ref simple_send.cpp.
 Make sure to stop the broker first or use a different port for `direct_recv`.
 
 */
@@ -108,9 +115,6 @@ automatically when a client tries to send or subscribe. This file contains
 the `queue` class that queues messages and the `broker_handler` class
 that manages queues and links and transfers messages to/from clients.
 
-Examples \ref broker.cpp and \ref engine/broker.cpp use this same
-broker logic but show different ways to run it in a server application.
-
 */
 
 /** @example broker.cpp
@@ -120,79 +124,40 @@ to run other examples that reqiure an intermediary, or you can use any AMQP 1.0
 broker. This broker creates queues automatically when a client tries to send or
 subscribe.
 
-Uses the broker logic from \ref broker.hpp, the same logic as the
-`proton::connection_engine` broker example \ref engine/broker.cpp.
-
 */
 
-//////////////// connection_engine examples.
+/** @example mt/epoll_controller.cpp
 
-/** \example engine/helloworld.cpp
+An example implementation of the proton::mt::controller API that shows how to
+use the proton::io::connection_engine SPI to adapt the proton API to native
+IO. In this case using a multi-threaded Linux epoll poller as the implementation.
 
-`proton::connection_engine` example to send a "Hello World" message to
-itself. Compare with the corresponding `proton::container` example \ref
-helloworld.cpp.
+__Requires C++11__
 
 */
 
-/** \example engine/simple_send.cpp
+/** @example mt/broker.cpp
 
-`proton::connection_engine` example of sending a fixed number of messages and
-tracking their (asynchronous) acknowledgement. Messages are sent through the
-'examples' node on an intermediary accessible on 127.0.0.1:5672.
+A multi-threaded broker, using the proton::mt extensions. This broker is
+portable over any implementation of the proton::mt API, see @ref
+mt/epoll_controller.cpp for an example.
 
-*/
-
-/** \example engine/simple_recv.cpp
-
-`proton::connection_engine` example that subscribes to the 'examples' node and prints
- the body of received messages.
+__Requires C++11__
 
 */
 
-/** \example engine/direct_send.cpp
+/** @example mt/simple_send.cpp
 
-`proton::connection_engine` example accepts an incoming connection and then
-sends like `simple_send`.  You can connect directly to `direct_send` *without* a
-broker using \ref simple_recv.cpp.  Make sure to stop the broker first or use a
-different port for `direct_send`.
+A multi-threaded sender client. Sends messages concurrently to multiple addresses.
 
-*/
-
-/** \example engine/direct_recv.cpp
-
-`proton::connection_engine` example accepts an incoming connection and then
-receives like `simple_recv`.  You can connect directly to `direct_recv`
-*without* a broker using \ref simple_send.cpp.  Make sure to stop the broker
-first or use a different port for `direct_recv`.
+__Requires C++11__
 
 */
 
-/** \example engine/client.cpp
+/** @example mt/simple_recv.cpp
 
-`proton::connection_engine` client for request-response example. Sends requests and
-prints out responses. Requires an intermediary that supports the AMQP 1.0
-dynamic nodes on which the responses are received. The requests are sent through
-the 'examples' node.
+A multi-threaded receiver client. Receives messages concurrently from multiple addresses.
 
-*/
+__Requires C++11__
 
-/** \example engine/server.cpp
-
-`proton::connection_engine` server for request-response example, that receives
-requests via the examples node, converts the body to uppercase and sends the
-result back to the indicated reply address.
-
-*/
-
-/** \example engine/broker.cpp
-
-A simple, single-threaded broker using the `proton::container`. You can use this
-to run other examples that reqiure an intermediary, or you can use any AMQP 1.0
-broker. This broker creates queues automatically when a client tries to send or
-subscribe.
-
-Uses the broker logic from \ref broker.hpp, the same logic as the
-proton::container` broker example \ref broker.cpp.
-
-*/
+*/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/broker.cpp b/examples/cpp/broker.cpp
index 37839c6..a19997f 100644
--- a/examples/cpp/broker.cpp
+++ b/examples/cpp/broker.cpp
@@ -61,7 +61,7 @@ class broker {
 
 int main(int argc, char **argv) {
     proton::url url("0.0.0.0");
-    options opts(argc, argv);
+    example::options opts(argc, argv);
 
     opts.add_value(url, 'a', "address", "listen on URL", "URL");
 
@@ -72,7 +72,7 @@ int main(int argc, char **argv) {
         proton::container(b.handler()).run();
 
         return 0;
-    } catch (const bad_option& e) {
+    } catch (const example::bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/client.cpp b/examples/cpp/client.cpp
index 0c38ac6..494294e 100644
--- a/examples/cpp/client.cpp
+++ b/examples/cpp/client.cpp
@@ -80,7 +80,7 @@ class client : public proton::handler {
 
 int main(int argc, char **argv) {
     proton::url url("127.0.0.1:5672/examples");
-    options opts(argc, argv);
+    example::options opts(argc, argv);
 
     opts.add_value(url, 'a', "address", "connect and send to URL", "URL");
 
@@ -97,7 +97,7 @@ int main(int argc, char **argv) {
         proton::container(c).run();
 
         return 0;
-    } catch (const bad_option& e) {
+    } catch (const example::bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/direct_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/direct_recv.cpp b/examples/cpp/direct_recv.cpp
index f999869..76bbaf9 100644
--- a/examples/cpp/direct_recv.cpp
+++ b/examples/cpp/direct_recv.cpp
@@ -72,7 +72,7 @@ class direct_recv : public proton::handler {
 int main(int argc, char **argv) {
     std::string address("127.0.0.1:5672/examples");
     int message_count = 100;
-    options opts(argc, argv);
+    example::options opts(argc, argv);
 
     opts.add_value(address, 'a', "address", "listen and receive on URL", "URL");
     opts.add_value(message_count, 'm', "messages", "receive COUNT messages", "COUNT");
@@ -84,7 +84,7 @@ int main(int argc, char **argv) {
         proton::container(recv).run();
 
         return 0;
-    } catch (const bad_option& e) {
+    } catch (const example::bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/direct_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/direct_send.cpp b/examples/cpp/direct_send.cpp
index 0b63ec5..860acc4 100644
--- a/examples/cpp/direct_send.cpp
+++ b/examples/cpp/direct_send.cpp
@@ -82,7 +82,7 @@ class simple_send : public proton::handler {
 int main(int argc, char **argv) {
     std::string address("127.0.0.1:5672/examples");
     int message_count = 100;
-    options opts(argc, argv);
+    example::options opts(argc, argv);
     
     opts.add_value(address, 'a', "address", "listen and send on URL", "URL");
     opts.add_value(message_count, 'm', "messages", "send COUNT messages", "COUNT");
@@ -94,7 +94,7 @@ int main(int argc, char **argv) {
         proton::container(send).run();
 
         return 0;
-    } catch (const bad_option& e) {
+    } catch (const example::bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/CMakeLists.txt b/examples/cpp/engine/CMakeLists.txt
deleted file mode 100644
index bafa20c..0000000
--- a/examples/cpp/engine/CMakeLists.txt
+++ /dev/null
@@ -1,37 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-find_package(ProtonCpp REQUIRED)
-
-include_directories(${ProtonCpp_INCLUDE_DIRS})
-
-foreach(example
-    broker
-    helloworld
-    simple_recv
-    simple_send
-    direct_recv
-    direct_send
-    client
-    server)
-  add_executable(engine-${example} ${example}.cpp ${extra_source})
-  target_link_libraries(engine-${example} ${ProtonCpp_LIBRARIES})
-  set_source_files_properties(engine-${example}.cpp PROPERTIES COMPILE_FLAGS "${CXX_WARNING_FLAGS}")
-  set_target_properties(engine-${example} PROPERTIES OUTPUT_NAME ${example})
-endforeach()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/broker.cpp b/examples/cpp/engine/broker.cpp
deleted file mode 100644
index bfe84fc..0000000
--- a/examples/cpp/engine/broker.cpp
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "../options.hpp"
-#include "../broker.hpp"
-
-#include <iostream>
-
-#ifndef WIN32                   // TODO aconway 2016-03-23: windows broker example
-#include <proton/io/socket.hpp>
-#include <sys/select.h>
-#include <set>
-
-template <class T> T check(T result, const std::string& msg="io_error: ") {
-    if (result < 0)
-        throw proton::io::socket::io_error(msg + proton::io::socket::error_str());
-    return result;
-}
-
-void fd_set_if(bool on, int fd, fd_set *fds);
-
-class broker {
-    typedef std::set<proton::io::socket::engine*> engines;
-
-    queues queues_;
-    broker_handler handler_;
-    proton::io::connection_engine::container container_;
-    engines engines_;
-    fd_set reading_, writing_;
-
-  public:
-    broker() : handler_(queues_) {
-        FD_ZERO(&reading_);
-        FD_ZERO(&writing_);
-    }
-
-    ~broker() {
-        for (engines::iterator i = engines_.begin(); i != engines_.end(); ++i)
-            delete *i;
-    }
-
-    void run(const proton::url& url) {
-        proton::io::socket::listener listener(url.host(), url.port());
-        std::cout << "listening on " << url << " fd=" << listener.socket() << std::endl;
-        FD_SET(listener.socket(), &reading_);
-        while(true) {
-            fd_set readable_set = reading_;
-            fd_set writable_set = writing_;
-            check(select(FD_SETSIZE, &readable_set, &writable_set, NULL, NULL), "select");
-
-            if (FD_ISSET(listener.socket(), &readable_set)) {
-                std::string client_host, client_port;
-                int fd = listener.accept(client_host, client_port);
-                std::cout << "accepted " << client_host << ":" << client_port
-                          << " fd=" << fd << std::endl;
-                engines_.insert(
-                    new proton::io::socket::engine(
-                        fd, handler_, container_.make_options()));
-                FD_SET(fd, &reading_);
-                FD_SET(fd, &writing_);
-            }
-
-            for (engines::iterator i = engines_.begin(); i != engines_.end(); ) {
-                proton::io::socket::engine *eng = *(i++);
-                int flags = 0;
-                if (FD_ISSET(eng->socket(), &writable_set))
-                    eng->write();
-                if (FD_ISSET(eng->socket(), &readable_set))
-                    eng->read();
-                if (eng->dispatch()) {
-                    fd_set_if(eng->read_buffer().size, eng->socket(), &reading_);
-                    fd_set_if(eng->write_buffer().size, eng->socket(), &writing_);
-                } else {
-                    std::cout << "closed fd=" << eng->socket() << std::endl;
-                    engines_.erase(eng);
-                    delete eng;
-                }
-            }
-        }
-    }
-};
-
-void fd_set_if(bool on, int fd, fd_set *fds) {
-    if (on)
-        FD_SET(fd, fds);
-    else
-        FD_CLR(fd, fds);
-}
-
-int main(int argc, char **argv) {
-    // Command line options
-    std::string address("0.0.0.0");
-    options opts(argc, argv);
-    opts.add_value(address, 'a', "address", "listen on URL", "URL");
-    try {
-        opts.parse();
-        broker().run(address);
-        return 0;
-    } catch (const bad_option& e) {
-        std::cout << opts << std::endl << e.what() << std::endl;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-    return 1;
-}
-#else // WIN32
-
-#include "proton/acceptor.hpp"
-#include "proton/container.hpp"
-#include "proton/value.hpp"
-
-#include "../fake_cpp11.hpp"
-
-class broker {
-  public:
-    broker(const proton::url& url) : handler_(url, queues_) {}
-
-    proton::handler& handler() { return handler_; }
-
-  private:
-
-    class my_handler : public broker_handler {
-      public:
-        my_handler(const proton::url& u, queues& qs) : broker_handler(qs), url_(u) {}
-
-        void on_container_start(proton::container &c) override {
-            c.listen(url_);
-            std::cout << "broker listening on " << url_ << std::endl;
-        }
-
-      private:
-        const proton::url& url_;
-    };
-
-  private:
-    queues queues_;
-    my_handler handler_;
-};
-
-int main(int argc, char **argv) {
-    // Command line options
-    proton::url url("0.0.0.0");
-    options opts(argc, argv);
-    opts.add_value(url, 'a', "address", "listen on URL", "URL");
-    try {
-        opts.parse();
-        broker b(url);
-        proton::container(b.handler()).run();
-        return 0;
-    } catch (const bad_option& e) {
-        std::cout << opts << std::endl << e.what() << std::endl;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-    return 1;
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/client.cpp b/examples/cpp/engine/client.cpp
deleted file mode 100644
index 8e58a38..0000000
--- a/examples/cpp/engine/client.cpp
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "options.hpp"
-#include "proton/io/socket.hpp"
-#include "proton/url.hpp"
-#include "proton/delivery.hpp"
-#include "proton/handler.hpp"
-#include "proton/connection.hpp"
-#include "proton/tracker.hpp"
-#include "proton/source_options.hpp"
-
-#include <iostream>
-#include <vector>
-
-#include "../fake_cpp11.hpp"
-
-using proton::receiver_options;
-using proton::source_options;
-
-class client : public proton::handler {
-  private:
-    proton::url url;
-    std::vector<std::string> requests;
-    proton::sender sender;
-    proton::receiver receiver;
-
-  public:
-    client(const proton::url &u, const std::vector<std::string>& r) : url(u), requests(r) {}
-
-    void on_connection_open(proton::connection &c) override {
-        sender = c.open_sender(url.path());
-        // Create a receiver requesting a dynamically created queue
-        // for the message source.
-        receiver_options dynamic_addr = receiver_options().source(source_options().dynamic(true));
-        receiver = c.open_receiver("", dynamic_addr);
-    }
-
-    void send_request() {
-        proton::message req;
-        req.body(requests.front());
-        req.reply_to(receiver.source().address());
-        sender.send(req);
-    }
-
-    void on_receiver_open(proton::receiver &) override {
-        send_request();
-    }
-
-    void on_message(proton::delivery &d, proton::message &response) override {
-        if (requests.empty()) return; // Spurious extra message!
-        std::cout << requests.front() << " => " << response.body() << std::endl;
-        requests.erase(requests.begin());
-        if (!requests.empty()) {
-            send_request();
-        } else {
-            d.connection().close();
-        }
-    }
-};
-
-int main(int argc, char **argv) {
-    // Command line options
-    std::string address("127.0.0.1:5672/examples");
-    options opts(argc, argv);
-    opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
-
-    try {
-        opts.parse();
-
-        std::vector<std::string> requests;
-        requests.push_back("Twas brillig, and the slithy toves");
-        requests.push_back("Did gire and gymble in the wabe.");
-        requests.push_back("All mimsy were the borogroves,");
-        requests.push_back("And the mome raths outgrabe.");
-        client handler(address, requests);
-        proton::io::socket::engine(address, handler).run();
-        return 0;
-    } catch (const bad_option& e) {
-        std::cout << opts << std::endl << e.what() << std::endl;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-    return 1;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/direct_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/direct_recv.cpp b/examples/cpp/engine/direct_recv.cpp
deleted file mode 100644
index 48f4478..0000000
--- a/examples/cpp/engine/direct_recv.cpp
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "options.hpp"
-
-#include "proton/delivery.hpp"
-#include "proton/io/socket.hpp"
-#include "proton/handler.hpp"
-#include "proton/receiver.hpp"
-#include "proton/url.hpp"
-#include "proton/value.hpp"
-
-#include <iostream>
-#include <map>
-
-#include "../fake_cpp11.hpp"
-
-class direct_recv : public proton::handler {
-  private:
-    uint64_t expected;
-    uint64_t received;
-
-  public:
-    direct_recv(int c) : expected(c), received(0) {}
-
-    void on_message(proton::delivery &d, proton::message &msg) override {
-        if (msg.id().get<uint64_t>() < received)
-            return; // ignore duplicate
-        if (expected == 0 || received < expected) {
-            std::cout << msg.body() << std::endl;
-            received++;
-        }
-        if (received == expected) {
-            d.receiver().close();
-            d.connection().close();
-        }
-    }
-};
-
-int main(int argc, char **argv) {
-    // Command line options
-    std::string address("127.0.0.1:5672/examples");
-    int message_count = 100;
-    options opts(argc, argv);
-    opts.add_value(address, 'a', "address", "listen and receive on URL", "URL");
-    opts.add_value(message_count, 'm', "messages", "receive COUNT messages", "COUNT");
-    try {
-        opts.parse();
-        proton::url url(address);
-        proton::io::socket::listener listener(url.host(), url.port());
-        std::cout << "direct_recv listening on " << url << std::endl;
-        direct_recv handler(message_count);
-        proton::io::socket::engine(listener.accept(), handler).run();
-        return 0;
-    } catch (const bad_option& e) {
-        std::cout << opts << std::endl << e.what() << std::endl;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-    return 1;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/direct_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/direct_send.cpp b/examples/cpp/engine/direct_send.cpp
deleted file mode 100644
index 2d9acf0..0000000
--- a/examples/cpp/engine/direct_send.cpp
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "options.hpp"
-
-#include "proton/acceptor.hpp"
-#include "proton/connection.hpp"
-#include "proton/io/socket.hpp"
-#include "proton/url.hpp"
-#include "proton/handler.hpp"
-#include "proton/tracker.hpp"
-#include "proton/value.hpp"
-
-#include <iostream>
-#include <map>
-
-#include "../fake_cpp11.hpp"
-
-class simple_send : public proton::handler {
-  private:
-    int sent;
-    int confirmed;
-    int total;
-  public:
-    simple_send(int c) : sent(0), confirmed(0), total(c) {}
-
-    void on_sendable(proton::sender &sender) override {
-        while (sender.credit() && sent < total) {
-            proton::message msg;
-            msg.id(sent + 1);
-            std::map<std::string, int> m;
-            m["sequence"] = sent+1;
-            msg.body(m);
-            sender.send(msg);
-            sent++;
-        }
-    }
-
-    void on_tracker_accept(proton::tracker &t) override {
-        confirmed++;
-        if (confirmed == total) {
-            std::cout << "all messages confirmed" << std::endl;
-            t.connection().close();
-        }
-    }
-
-    void on_transport_close(proton::transport &) override {
-        sent = confirmed;
-    }
-};
-
-int main(int argc, char **argv) {
-    // Command line options
-    std::string address("127.0.0.1:5672/examples");
-    int message_count = 100;
-    options opts(argc, argv);
-    opts.add_value(address, 'a', "address", "listen and send on URL", "URL");
-    opts.add_value(message_count, 'm', "messages", "send COUNT messages", "COUNT");
-    try {
-        opts.parse();
-        proton::url url(address);
-        proton::io::socket::listener listener(url.host(), url.port());
-        std::cout << "direct_send listening on " << url << std::endl;
-        simple_send handler(message_count);
-        proton::io::socket::engine(listener.accept(), handler).run();
-        return 0;
-    } catch (const bad_option& e) {
-        std::cout << opts << std::endl << e.what() << std::endl;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-    return 1;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/helloworld.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/helloworld.cpp b/examples/cpp/engine/helloworld.cpp
deleted file mode 100644
index a4f23ef..0000000
--- a/examples/cpp/engine/helloworld.cpp
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "proton/delivery.hpp"
-#include "proton/handler.hpp"
-#include "proton/tracker.hpp"
-#include "proton/url.hpp"
-#include "proton/io/socket.hpp"
-
-#include <iostream>
-
-#include "../fake_cpp11.hpp"
-
-class hello_world : public proton::handler {
-  private:
-    std::string address_;
-
-  public:
-    hello_world(const std::string& address) : address_(address) {}
-
-    void on_connection_open(proton::connection &c) override {
-        c.open_receiver(address_);
-        c.open_sender(address_);
-    }
-
-    void on_sendable(proton::sender &s) override {
-        proton::message m("Hello World!");
-        s.send(m);
-        s.close();
-    }
-
-    void on_message(proton::delivery &d, proton::message &m) override {
-        std::cout << m.body() << std::endl;
-        d.connection().close();
-    }
-};
-
-int main(int argc, char **argv) {
-    try {
-        proton::url url(argc > 1 ? argv[1] : "127.0.0.1:5672/examples");
-        hello_world hw(url.path());
-        proton::io::socket::engine(url, hw).run();
-
-        return 0;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-
-    return 1;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/options.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/options.hpp b/examples/cpp/engine/options.hpp
deleted file mode 100644
index bd477b5..0000000
--- a/examples/cpp/engine/options.hpp
+++ /dev/null
@@ -1,173 +0,0 @@
-#ifndef OPTIONS_HPP
-#define OPTIONS_HPP
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <string>
-#include <sstream>
-#include <ostream>
-#include <vector>
-#include <stdexcept>
-
-/** bad_option is thrown for option parsing errors */
-struct bad_option : public std::runtime_error {
-    bad_option(const std::string& s) : std::runtime_error(s) {}
-};
-
-/** Simple command-line option parser for example programs */
-class options {
-  public:
-
-    options(int argc, char const * const * argv) : argc_(argc), argv_(argv), prog_(argv[0]), help_() {
-        size_t slash = prog_.find_last_of("/\\");
-        if (slash != std::string::npos)
-            prog_ = prog_.substr(slash+1); // Extract prog name from path
-        add_flag(help_, 'h', "help", "Print the help message");
-    }
-
-    ~options() {
-        for (opts::iterator i = opts_.begin(); i != opts_.end(); ++i)
-            delete *i;
-    }
-
-    /** Updates value when parse() is called if option is present with a value. */
-    template<class T>
-    void add_value(T& value, char short_name, const std::string& long_name, const std::string& description, const std::string var) {
-        opts_.push_back(new option_value<T>(value, short_name, long_name, description, var));
-    }
-
-    /** Sets flag when parse() is called if option is present. */
-    void add_flag(bool& flag, char short_name, const std::string& long_name, const std::string& description) {
-        opts_.push_back(new option_flag(flag, short_name, long_name, description));
-    }
-
-    /** Parse the command line, return the index of the first non-option argument.
-     *@throws bad_option if there is a parsing error or unknown option.
-     */
-    int parse() {
-        int arg = 1;
-        for (; arg < argc_ && argv_[arg][0] == '-'; ++arg) {
-            opts::iterator i = opts_.begin();
-            while (i != opts_.end() && !(*i)->parse(argc_, argv_, arg))
-                ++i;
-            if (i == opts_.end())
-                throw bad_option(std::string("unknown option ") + argv_[arg]);
-        }
-        if (help_) throw bad_option("");
-        return arg;
-    }
-
-    /** Print a usage message */
-  friend std::ostream& operator<<(std::ostream& os, const options& op) {
-      os << std::endl << "usage: " << op.prog_ << " [options]" << std::endl;
-      os << std::endl << "options:" << std::endl;
-      for (opts::const_iterator i = op.opts_.begin(); i < op.opts_.end(); ++i)
-          os << **i << std::endl;
-      return os;
-  }
-
- private:
-    class option {
-      public:
-        option(char s, const std::string& l, const std::string& d, const std::string v) :
-            short_(std::string("-") + s), long_("--" + l), desc_(d), var_(v) {}
-        virtual ~option() {}
-
-        virtual bool parse(int argc, char const * const * argv, int &i) = 0;
-        virtual void print_default(std::ostream&) const {};
-
-      friend std::ostream& operator<<(std::ostream& os, const option& op) {
-          os << "  " << op.short_;
-          if (!op.var_.empty()) os << " " << op.var_;
-          os << ", " << op.long_;
-          if (!op.var_.empty()) os << "=" << op.var_;
-          os << std::endl << "        " << op.desc_;
-          op.print_default(os);
-          return os;
-      }
-
-      protected:
-        std::string short_, long_, desc_, var_;
-    };
-
-    template <class T>
-    class option_value : public option {
-      public:
-        option_value(T& value, char s, const std::string& l, const std::string& d, const std::string& v) :
-            option(s, l, d, v), value_(value) {}
-
-        bool parse(int argc, char const * const * argv, int &i) {
-            std::string arg(argv[i]);
-            if (arg == short_ || arg == long_) {
-                if (i < argc-1) {
-                    set_value(arg, argv[++i]);
-                    return true;
-                } else {
-                    throw bad_option("missing value for " + arg);
-                }
-            }
-            if (arg.compare(0, long_.size(), long_) == 0 && arg[long_.size()] == '=' ) {
-                set_value(long_, arg.substr(long_.size()+1));
-                return true;
-            }
-            return false;
-        }
-
-        virtual void print_default(std::ostream& os) const { os << " (default " << value_ << ")"; }
-
-        void set_value(const std::string& opt, const std::string& s) {
-            std::istringstream is(s);
-            is >> value_;
-            if (is.fail() || is.bad())
-                throw bad_option("bad value for " + opt + ": " + s);
-        }
-
-      private:
-        T& value_;
-    };
-
-    class option_flag: public option {
-      public:
-        option_flag(bool& flag, const char s, const std::string& l, const std::string& d) :
-            option(s, l, d, ""), flag_(flag)
-        { flag_ = false; }
-
-        bool parse(int /*argc*/, char const * const * argv, int &i) {
-            if (argv[i] == short_ || argv[i] == long_) {
-                flag_ = true;
-                return true;
-            } else {
-                return false;
-            }
-        }
-
-      private:
-        bool &flag_;
-    };
-
-    typedef std::vector<option*> opts;
-
-    int argc_;
-    char const * const * argv_;
-    std::string prog_;
-    opts opts_;
-    bool help_;
-};
-
-#endif // OPTIONS_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/server.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/server.cpp b/examples/cpp/engine/server.cpp
deleted file mode 100644
index 31f3599..0000000
--- a/examples/cpp/engine/server.cpp
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "options.hpp"
-
-#include "proton/connection.hpp"
-#include "proton/delivery.hpp"
-#include "proton/io/socket.hpp"
-#include "proton/url.hpp"
-#include "proton/handler.hpp"
-#include "proton/tracker.hpp"
-#include "proton/url.hpp"
-
-#include <iostream>
-#include <map>
-#include <string>
-#include <cctype>
-
-#include "../fake_cpp11.hpp"
-
-class server : public proton::handler {
-  private:
-    typedef std::map<std::string, proton::sender> sender_map;
-    proton::url url;
-    sender_map senders;
-
-  public:
-
-    server(const std::string &u) : url(u) {}
-
-    void on_connection_open(proton::connection &c) override {
-        c.open_receiver(url.path());
-        std::cout << "server connected to " << url << std::endl;
-    }
-
-    std::string to_upper(const std::string &s) {
-        std::string uc(s);
-        size_t l = uc.size();
-        for (size_t i=0; i<l; i++) uc[i] = std::toupper(uc[i]);
-        return uc;
-    }
-
-    void on_message(proton::delivery &d, proton::message &m) override {
-        std::cout << "Received " << m.body() << std::endl;
-        std::string reply_to = m.reply_to();
-        proton::message reply;
-        reply.to(reply_to);
-        reply.body(to_upper(proton::get<std::string>(m.body())));
-        reply.correlation_id(m.correlation_id());
-        if (!senders[reply_to])
-            senders[reply_to] = d.connection().open_sender(reply_to);
-        senders[reply_to].send(reply);
-    }
-};
-
-int main(int argc, char **argv) {
-    // Command line options
-    std::string address("amqp://0.0.0.0:5672/examples");
-    options opts(argc, argv);
-    opts.add_value(address, 'a', "address", "listen on URL", "URL");
-    try {
-        opts.parse();
-        server handler(address);
-        proton::io::socket::engine(address, handler).run();
-        return 0;
-    } catch (const bad_option& e) {
-        std::cout << opts << std::endl << e.what() << std::endl;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-    return 1;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/simple_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/simple_recv.cpp b/examples/cpp/engine/simple_recv.cpp
deleted file mode 100644
index ffd80f9..0000000
--- a/examples/cpp/engine/simple_recv.cpp
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "options.hpp"
-
-#include "proton/io/socket.hpp"
-#include "proton/url.hpp"
-#include "proton/handler.hpp"
-#include "proton/receiver.hpp"
-#include "proton/value.hpp"
-#include "proton/message_id.hpp"
-#include "proton/delivery.hpp"
-
-#include <iostream>
-#include <map>
-
-#include "../fake_cpp11.hpp"
-
-class simple_recv : public proton::handler {
-  private:
-    proton::url url;
-    proton::receiver receiver;
-    uint64_t expected;
-    uint64_t received;
-  public:
-
-    simple_recv(const std::string &s, int c) : url(s), expected(c), received(0) {}
-
-    void on_connection_open(proton::connection &c) override {
-        receiver = c.open_receiver(url.path());
-        std::cout << "simple_recv listening on " << url << std::endl;
-    }
-
-    void on_message(proton::delivery& d, proton::message &msg) override {
-        if (msg.id().get<uint64_t>() < received)
-            return; // ignore duplicate
-        if (expected == 0 || received < expected) {
-            std::cout << msg.body() << std::endl;
-            received++;
-            if (received == expected) {
-                d.receiver().close();
-                d.connection().close();
-            }
-        }
-    }
-};
-
-int main(int argc, char **argv) {
-    // Command line options
-    std::string address("127.0.0.1:5672/examples");
-    int message_count = 100;
-    options opts(argc, argv);
-    opts.add_value(address, 'a', "address", "connect to and receive from URL", "URL");
-    opts.add_value(message_count, 'm', "messages", "receive COUNT messages", "COUNT");
-
-    try {
-        opts.parse();
-        simple_recv handler(address, message_count);
-        proton::io::socket::engine(address, handler).run();
-        return 0;
-    } catch (const bad_option& e) {
-        std::cout << opts << std::endl << e.what() << std::endl;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-    return 1;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/simple_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/simple_send.cpp b/examples/cpp/engine/simple_send.cpp
deleted file mode 100644
index e08f39f..0000000
--- a/examples/cpp/engine/simple_send.cpp
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "options.hpp"
-
-#include "proton/io/socket.hpp"
-#include "proton/url.hpp"
-#include "proton/handler.hpp"
-#include "proton/connection.hpp"
-#include "proton/tracker.hpp"
-#include "proton/value.hpp"
-
-#include <iostream>
-#include <map>
-
-#include "../fake_cpp11.hpp"
-
-class simple_send : public proton::handler {
-  private:
-    proton::url url;
-    int sent;
-    int confirmed;
-    int total;
-  public:
-
-    simple_send(const std::string &s, int c) : url(s), sent(0), confirmed(0), total(c) {}
-
-    void on_connection_open(proton::connection &c) override {
-        c.open_sender(url.path());
-    }
-
-    void on_sendable(proton::sender &sender) override {
-        while (sender.credit() && sent < total) {
-            proton::message msg;
-            msg.id(sent + 1);
-            std::map<std::string, int> m;
-            m["sequence"] = sent+1;
-            msg.body(m);
-            sender.send(msg);
-            sent++;
-        }
-    }
-
-    void on_tracker_accept(proton::tracker &t) override {
-        confirmed++;
-        if (confirmed == total) {
-            std::cout << "all messages confirmed" << std::endl;
-            t.connection().close();
-        }
-    }
-
-    void on_transport_close(proton::transport &) override {
-        sent = confirmed;
-    }
-};
-
-int main(int argc, char **argv) {
-    // Command line options
-    std::string address("127.0.0.1:5672/examples");
-    int message_count = 100;
-    options opts(argc, argv);
-    opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
-    opts.add_value(message_count, 'm', "messages", "send COUNT messages", "COUNT");
-    try {
-        opts.parse();
-        simple_send handler(address, message_count);
-        proton::io::socket::engine(address, handler).run();
-        return 0;
-    } catch (const bad_option& e) {
-        std::cout << opts << std::endl << e.what() << std::endl;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-    return 1;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/example/socket_windows.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/example/socket_windows.cpp b/examples/cpp/example/socket_windows.cpp
new file mode 100644
index 0000000..f312525
--- /dev/null
+++ b/examples/cpp/example/socket_windows.cpp
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "msg.hpp"
+
+#include <proton/io/socket.hpp>
+#include <proton/url.hpp>
+
+#define FD_SETSIZE 2048
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+#if _WIN32_WINNT < 0x0501
+#error "Proton requires Windows API support for XP or later."
+#endif
+#include <winsock2.h>
+#include <mswsock.h>
+#include <Ws2tcpip.h>
+
+#include <ctype.h>
+#include <errno.h>
+#include <stdio.h>
+#include <assert.h>
+
+namespace proton {
+namespace io {
+namespace socket {
+
+const descriptor INVALID_DESCRIPTOR = INVALID_SOCKET;
+
+std::string error_str() { // Text for the calling thread's last WinSock error; empty if no message text was produced.
+    int code = WSAGetLastError(); // WSAGetLastError() returns int, not HRESULT
+    char err[1024] = {0};
+    FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS | // explicit ANSI variant: err is a char buffer even in UNICODE builds
+                   FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, err, sizeof(err), NULL);
+    return err;
+}
+
+io_error::io_error(const std::string& s) : error(s) {}
+
+namespace { // File-local error-checking helpers.
+
+template <class T> T check(T result, const std::string& msg=std::string()) { // Return result unchanged, or throw io_error(msg + error_str()) when it equals SOCKET_ERROR.
+    if (result == SOCKET_ERROR)
+        throw io_error(msg + error_str());
+    return result;
+}
+
+void gai_check(int result, const std::string& msg="") { // Throw io_error(msg + gai_strerror(result)) when result is non-zero.
+    if (result)
+        throw io_error(msg + gai_strerror(result));
+}
+
+} // namespace
+
+void initialize() { // Load WinSock 2.2; throws io_error when WSAStartup() reports failure.
+    WSADATA unused;
+    if (WSAStartup(0x0202, &unused) != 0) throw io_error("can't load WinSock"); // Version 2.2; failure is a non-zero return code (never SOCKET_ERROR), so check()/error_str() do not apply
+}
+
+void finalize() { // Calls WSACleanup(); its return value is not checked.
+    WSACleanup();
+}
+
+void engine::init() { // Put socket_ into non-blocking mode (FIONBIO = 1); throws io_error via check() on failure.
+    u_long nonblock = 1;
+    check(::ioctlsocket(socket_, FIONBIO, &nonblock), "ioctlsocket: ");
+}
+
+engine::engine(descriptor fd, handler& h, const connection_options &opts) // Use an already-open descriptor; unlike the url overload, connection().open() is not called here.
+    : connection_engine(h, opts), socket_(fd)
+{
+    init();
+}
+
+engine::engine(const url& u, handler& h, const connection_options &opts) // Connect to u with connect(), run init(), then open the connection.
+    : connection_engine(h, opts), socket_(connect(u))
+{
+    init();
+    connection().open();
+}
+
+engine::~engine() {}
+
+void engine::read() { // recv() from socket_ into read_buffer(), if that buffer is non-empty.
+    mutable_buffer rbuf = read_buffer();
+    if (rbuf.size > 0) {
+        int n = ::recv(socket_, rbuf.data, rbuf.size, 0);
+        if (n > 0)
+            read_done(n);
+        else if (n == 0) // recv() returned 0: reported as read_close()
+            read_close();
+        else if (n == SOCKET_ERROR && WSAGetLastError() != WSAEWOULDBLOCK) // WSAEWOULDBLOCK is ignored; any other error closes with "io_error"
+            close(error_condition("io_error", error_str()));
+    }
+}
+
+void engine::write() { // send() the contents of write_buffer() on socket_, if there are any.
+    const_buffer pending = write_buffer();
+    if (pending.size > 0) {
+        int sent = ::send(socket_, pending.data, pending.size, 0);
+        if (sent > 0)
+            write_done(sent);
+        else if (sent == SOCKET_ERROR && WSAGetLastError() != WSAEWOULDBLOCK) // WSAEWOULDBLOCK is ignored; any other error closes with "io_error"
+            close(error_condition("io_error", error_str()));
+    }
+}
+
+void engine::run() { // Loop while dispatch() is true: wait in select() on socket_, then read()/write(); closes socket_ when the loop ends.
+    while (dispatch()) {
+        fd_set rd, wr;
+        FD_ZERO(&rd);
+        if (read_buffer().size) // watch for readability only while read_buffer() is non-empty
+            FD_SET(socket_, &rd);
+        FD_ZERO(&wr);
+        if (write_buffer().size) // watch for writability only while write_buffer() is non-empty
+            FD_SET(socket_, &wr);
+        int n = ::select(FD_SETSIZE, &rd, &wr, NULL, NULL); // NULL timeout: no time limit on the wait
+        if (n < 0) {
+            close(error_condition("select: ", error_str())); // NOTE(review): condition name is "select: " here but "io_error" in read()/write() - confirm intended
+            break;
+        }
+        if (FD_ISSET(socket_, &rd)) {
+            read();
+        }
+        if (FD_ISSET(socket_, &wr))
+            write();
+    }
+    ::closesocket(socket_);
+}
+
+namespace { // File-local helpers for address resolution.
+struct auto_addrinfo { // Holds the list filled in by getaddrinfo() and frees it when it goes out of scope.
+    struct addrinfo *ptr;
+    auto_addrinfo() : ptr(0) {}
+    ~auto_addrinfo() { if (ptr) ::freeaddrinfo(ptr); } // ptr is still 0 when getaddrinfo() failed; never hand a null list to freeaddrinfo()
+    addrinfo* operator->() const { return ptr; }
+};
+
+static const char *amqp_service(const char *port) { // Map "amqp"/"amqps" to "5672"/"5671"; any other value (including 0) is returned unchanged.
+  // Help older Windows to know about amqp[s] ports
+  if (port) {
+    if (!strcmp("amqp", port)) return "5672";
+    if (!strcmp("amqps", port)) return "5671";
+  }
+  return port;
+}
+}
+
+
+descriptor connect(const proton::url& u) { // Resolve u's host/port and return a connected stream socket; throws io_error on failure.
+    // convert "0.0.0.0" to "127.0.0.1" on Windows for outgoing sockets
+    std::string host = (u.host() == "0.0.0.0") ? "127.0.0.1" : u.host();
+    descriptor fd = INVALID_SOCKET;
+    try{
+        auto_addrinfo addr;
+        gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(), // an empty host or port is passed as a null pointer
+                                amqp_service(u.port().empty() ? 0 : u.port().c_str()),
+                                0, &addr.ptr),
+                  "connect address invalid: ");
+        fd = check(::socket(addr->ai_family, SOCK_STREAM, 0), "connect socket: "); // only the first addrinfo entry is used
+        check(::connect(fd, addr->ai_addr, addr->ai_addrlen), "connect: ");
+        return fd;
+    } catch (...) {
+        if (fd != INVALID_SOCKET) ::closesocket(fd); // don't leak the socket when a later step throws
+        throw;
+    }
+}
+
+listener::listener(const std::string& host, const std::string &port) : socket_(INVALID_SOCKET) { // Bind and listen on host:port; throws io_error on failure.
+    try {
+        auto_addrinfo addr;
+        gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
+                                amqp_service(port.empty() ? 0 : port.c_str()), 0, &addr.ptr), // same "amqp"/"amqps" port mapping as connect()
+                  "listener address invalid: ");
+        socket_ = check(::socket(addr->ai_family, SOCK_STREAM, 0), "listener socket: ");
+        BOOL yes = TRUE; // WinSock boolean options are int-sized BOOL values, not 1-byte C++ bool
+        check(setsockopt(socket_, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char*)&yes, sizeof(yes)), "setsockopt: ");
+        check(::bind(socket_, addr->ai_addr, addr->ai_addrlen), "listener bind: ");
+        check(::listen(socket_, 32), "listener listen: ");
+    } catch (...) {
+        if (socket_ != INVALID_SOCKET) ::closesocket(socket_); // the destructor will not run when the constructor throws
+        throw;
+    }
+}
+
+listener::~listener() { ::closesocket(socket_); }
+
+descriptor listener::accept(std::string& host_str, std::string& port_str) { // Accept one connection; host_str/port_str receive the peer's address. Throws io_error on failure.
+    struct sockaddr_storage addr;
+    socklen_t size = sizeof(addr);
+    descriptor fd = check(::accept(socket_, (struct sockaddr *)&addr, &size), "accept: "); // descriptor, not int: keep the full socket handle width
+    char host[NI_MAXHOST], port[NI_MAXSERV];
+    int err = getnameinfo((struct sockaddr *) &addr, size, host, sizeof(host), port, sizeof(port), 0); // size is the length accept() actually filled in
+    if (err) ::closesocket(fd); // don't leak the accepted socket when the name lookup fails
+    gai_check(err, "accept invalid remote address: ");
+    host_str = host;
+    port_str = port;
+    return fd;
+}
+
+}}}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/example_test.py
----------------------------------------------------------------------
diff --git a/examples/cpp/example_test.py b/examples/cpp/example_test.py
index d228d67..38a5154 100644
--- a/examples/cpp/example_test.py
+++ b/examples/cpp/example_test.py
@@ -131,60 +131,40 @@ class Proc(Popen):
             raise ProcError(self, "timeout waiting for exit")
 
 
-def count_tests(cls):
-    methods = inspect.getmembers(cls, predicate=inspect.ismethod)
-    tests = [ i for i,j in methods if i.startswith('test_') ]
-    return len(tests)
-
-class CompatSetupClass(object):
-    # Roughly provides setUpClass and tearDownClass functionality for older python versions
-    # in our test scenarios
-    def __init__(self, target):
-        self.completed = False
-        self.test_count = count_tests(target)
-        self.target = target
-        self.global_setup = False
-
-    def note_setup(self):
-        if not self.global_setup:
-            self.global_setup = True
-            self.target.setup_class()
-
-    def note_teardown(self):
-        self.test_count -=  1
-        if self.test_count == 0:
-            self.completed = True
-            self.target.teardown_class()
-        
-
-class ExampleTestCase(unittest.TestCase):
-
-    @classmethod
-    def setup_class(cls):
-        pass
-
-    @classmethod
-    def teardown_class(cls):
-        pass
-
-    def completed(self):
-        cls = self.__class__
-        return cls.compat_ and cls.compat_.completed
-
+if hasattr(unittest.TestCase, 'setUpClass') and  hasattr(unittest.TestCase, 'tearDownClass'):
+    TestCase = unittest.TestCase
+else:
+    class TestCase(unittest.TestCase):
+        """
+        Roughly provides setUpClass and tearDownClass functionality for older python
+        versions in our test scenarios. If subclasses override setUp or tearDown
+        they *must* call the superclass.
+        """
+        def setUp(self):
+            if not hasattr(type(self), '_setup_class_count'):
+                type(self)._setup_class_count = len(
+                    inspect.getmembers(
+                        type(self),
+                        predicate=lambda(m): inspect.ismethod(m) and m.__name__.startswith('test_')))
+                type(self).setUpClass()
+
+        def tearDown(self):
+            self.assertTrue(self._setup_class_count > 0)
+            self._setup_class_count -=  1
+            if self._setup_class_count == 0:
+                type(self).tearDownClass()
+
+
+class ExampleTestCase(TestCase):
+    """TestCase that manages started processes"""
     def setUp(self):
-        cls = self.__class__
-        if not hasattr(cls, "compat_"):
-            cls.compat_ = CompatSetupClass(cls)
-        if cls.compat_.completed:
-            # Last test for this class already seen.
-            raise Exception("Test sequencing error")
-        cls.compat_.note_setup()
+        super(ExampleTestCase, self).setUp()
         self.procs = []
 
     def tearDown(self):
         for p in self.procs:
             p.safe_kill()
-        self.__class__.compat_.note_teardown()
+        super(ExampleTestCase, self).tearDown()
 
     def proc(self, *args, **kwargs):
         p = Proc(*args, **kwargs)
@@ -194,27 +174,26 @@ class ExampleTestCase(unittest.TestCase):
 class BrokerTestCase(ExampleTestCase):
     """
     ExampleTest that starts a broker in setUpClass and kills it in tearDownClass.
+    Subclasses must set `broker_exe` class variable with the name of the broker executable.
     """
 
-    # setUpClass not available until 2.7
     @classmethod
-    def setup_class(cls):
+    def setUpClass(cls):
         cls.addr = pick_addr() + "/examples"
-        cls.broker = Proc(["broker", "-a", cls.addr], ready="listening")
+        cls.broker = None       # In case Proc throws, create the attribute.
+        cls.broker = Proc([cls.broker_exe, "-a", cls.addr], ready="listening")
         cls.broker.wait_ready()
 
-    # tearDownClass not available until 2.7
     @classmethod
-    def teardown_class(cls):
-        cls.broker.safe_kill()
+    def tearDownClass(cls):
+        if cls.broker: cls.broker.safe_kill()
 
     def tearDown(self):
+        b = type(self).broker
+        if b and b.poll() !=  None: # Broker crashed
+            type(self).setUpClass() # Start another for the next test.
+            raise ProcError(b, "broker crash")
         super(BrokerTestCase, self).tearDown()
-        if not self.completed():
-            b = type(self).broker
-            if b.poll() !=  None: # Broker crashed
-                type(self).setUpClass() # Start another for the next test.
-                raise ProcError(b, "broker crash")
 
 
 CLIENT_EXPECT="""Twas brillig, and the slithy toves => TWAS BRILLIG, AND THE SLITHY TOVES
@@ -230,6 +209,8 @@ def recv_expect(name, addr):
 class ContainerExampleTest(BrokerTestCase):
     """Run the container examples, verify they behave as expected."""
 
+    broker_exe = "broker"
+
     def test_helloworld(self):
         self.assertEqual('Hello World!\n', self.proc(["helloworld", self.addr]).wait_exit())
 
@@ -341,8 +322,8 @@ Hello World!
         self.assertEqual(expect_found, True)
 
 
-class ConnectionEngineExampleTest(BrokerTestCase):
-    """Run the connction_engine examples, verify they behave as expected."""
+class EngineTestCase(BrokerTestCase):
+    """Run selected clients to test a connection_engine broker."""
 
     def test_helloworld(self):
         self.assertEqual('Hello World!\n',
@@ -380,5 +361,8 @@ class ConnectionEngineExampleTest(BrokerTestCase):
         self.assertEqual(CLIENT_EXPECT,
                          self.proc(["client", "-a", self.addr]).wait_exit())
 
+class MtBrokerTest(EngineTestCase):
+    broker_exe = "mt_broker"
+
 if __name__ == "__main__":
     unittest.main()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/mt/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt/broker.cpp b/examples/cpp/mt/broker.cpp
new file mode 100644
index 0000000..48738c9
--- /dev/null
+++ b/examples/cpp/mt/broker.cpp
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "../options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/controller.hpp>
+#include <proton/delivery.hpp>
+#include <proton/handler.hpp>
+#include <proton/url.hpp>
+#include <proton/work_queue.hpp>
+
+#include <atomic>
+#include <functional>
+#include <iostream>
+#include <mutex>
+#include <thread>
+
+// Thread safe queue.
+// Stores messages, notifies subscribed connections when there is data.
+//
+// Subscriber callbacks are one-shot: they are discarded after being called
+// and must be registered again through pop() if the queue is found empty.
+class queue {
+  public:
+    queue(const std::string& name) : name_(name) {}
+
+    std::string name() const { return name_; }
+
+    // Push a message onto the queue.
+    // If the queue was previously empty, notify subscribers it has messages.
+    // Called from receiver's connection.
+    void push(const proton::message &m) {
+        std::vector<std::function<void(queue*)> > to_notify;
+        {
+            std::lock_guard<std::mutex> g(lock_);
+            messages_.push_back(m);
+            if (messages_.size() == 1) // Was empty: take the waiting callbacks.
+                to_notify.swap(callbacks_);
+        }
+        // Run the callbacks after releasing the lock so a callback can never
+        // block on, or re-enter, this queue's mutex. Iterate by reference to
+        // avoid copying each std::function.
+        for (const auto& cb : to_notify)
+            cb(this);
+    }
+
+    // If the queue is not empty, pop a message into m and return true.
+    // Otherwise save callback to be called when there are messages and return false.
+    // Called from sender's connection.
+    bool pop(proton::message& m, std::function<void(queue*)> callback) {
+        std::lock_guard<std::mutex> g(lock_);
+        if (messages_.empty()) {
+            callbacks_.push_back(callback);
+            return false;
+        } else {
+            m = std::move(messages_.front());
+            messages_.pop_front();
+            return true;
+        }
+    }
+
+  private:
+    const std::string name_;
+    std::mutex lock_;                   // Guards messages_ and callbacks_.
+    std::deque<proton::message> messages_;
+    std::vector<std::function<void(queue*)> > callbacks_;
+};
+
+/// Thread safe map of queues, keyed by queue name.
+class queues {
+  public:
+    queues() : next_id_(0) {}
+
+    // Return the queue called name, creating it on first use.
+    queue* get(const std::string& name) {
+        std::lock_guard<std::mutex> guard(lock_);
+        std::unique_ptr<queue>& slot = queues_[name]; // Null if newly inserted.
+        if (!slot)
+            slot.reset(new queue(name));
+        return slot.get();
+    }
+
+    // Create a queue named "_dynamic_<N>" where N comes from an atomic counter.
+    queue* dynamic() {
+        const uint64_t id = next_id_++;
+        std::ostringstream unique_name;
+        unique_name << "_dynamic_" << id;
+        return get(unique_name.str());
+    }
+
+  private:
+    typedef std::map<std::string, std::unique_ptr<queue> > queue_map;
+
+    std::mutex lock_;               // Guards queues_.
+    queue_map queues_;
+    std::atomic<uint64_t> next_id_; // Use to generate unique queue IDs.
+};
+
+/// Broker connection handler. Things to note:
+///
+/// Each handler manages a single connection. Proton AMQP callbacks and queue
+/// callbacks via proton::work_queue are serialized per-connection, so the
+/// handler does not need a lock. Handlers for different connections can be
+/// called concurrently.
+///
+/// Senders (aka subscriptions) need some cross-thread notification:
+///
+/// - a sender that gets credit calls queue::pop() in `on_sendable()`
+///   - on success it sends the message immediately.
+///   - on queue empty, the sender is added to the `blocked_` set and the queue stores a callback.
+/// - when a receiver thread pushes a message, the queue calls its callbacks.
+/// - the callback causes a serialized call to has_messages() which re-tries all `blocked_` senders.
+///
+/// The handler deletes itself in on_transport_close(), so it must be
+/// allocated with new (see broker::new_handler()).
+class broker_connection_handler : public proton::handler {
+  public:
+    broker_connection_handler(queues& qs) : queues_(qs) {}
+
+    void on_connection_open(proton::connection& c) override {
+        // Create the has_messages callback for use with queue subscriptions.
+        //
+        // Note the captured and bound arguments must be thread-safe to copy,
+        // shared_ptr<work_queue>, and plain pointers this and q are all safe.
+        //
+        // The proton::connection object c is not thread-safe to copy.
+        // However when the work_queue calls this->has_messages it will be safe
+        // to use any proton objects associated with c again.
+        auto work = proton::work_queue::get(c);
+        has_messages_callback_ = [this, work](queue* q) {
+            work->push(std::bind(&broker_connection_handler::has_messages, this, q));
+        };
+        c.open();               // Always accept
+    }
+
+    // A sender sends messages from a queue to a subscriber.
+    // NOTE(review): for a dynamic source this creates a "_dynamic_<N>" queue,
+    // but sender_queue() later looks the queue up by source().address();
+    // confirm the two resolve to the same queue.
+    void on_sender_open(proton::sender &sender) override {
+        queue *q = sender.source().dynamic() ?
+            queues_.dynamic() : queues_.get(sender.source().address());
+        std::cout << "sending from " << q->name() << std::endl;
+    }
+
+    // We have credit to send a message.
+    void on_sendable(proton::sender &s) override {
+        queue* q = sender_queue(s);
+        if (!do_send(q, s))     // Queue is empty, save ourselves in the blocked set.
+            blocked_.insert(std::make_pair(q, s));
+    }
+
+    // A receiver receives messages from a publisher to a queue.
+    void on_receiver_open(proton::receiver &receiver) override {
+        std::string qname = receiver.target().address();
+        if (qname == "shutdown") {
+            std::cout << "broker shutting down" << std::endl;
+            // Sending to the special "shutdown" queue stops the broker.
+            proton::controller::get(receiver.connection()).stop(
+                proton::error_condition("shutdown", "stop broker"));
+        } else {
+            std::cout << "receiving to " << qname << std::endl;
+        }
+    }
+
+    // A message is received: store it on the queue named by the receiver's target.
+    void on_message(proton::delivery &d, proton::message &m) override {
+        std::string qname = d.receiver().target().address();
+        queues_.get(qname)->push(m);
+    }
+
+    void on_session_close(proton::session &session) override {
+        // Erase all blocked senders that belong to session.
+        auto predicate = [session](const proton::sender& s) {
+            return s.session() == session;
+        };
+        erase_sender_if(blocked_.begin(), blocked_.end(), predicate);
+    }
+
+    void on_sender_close(proton::sender &sender) override {
+        // Erase sender from the blocked set.
+        auto range = blocked_.equal_range(sender_queue(sender));
+        auto predicate = [sender](const proton::sender& s) { return s == sender; };
+        erase_sender_if(range.first, range.second, predicate);
+    }
+
+    // The controller calls on_transport_close() last.
+    void on_transport_close(proton::transport&) override {
+        delete this;            // All done.
+    }
+
+  private:
+    // Senders waiting for messages, keyed by the queue they are waiting on.
+    typedef std::multimap<queue*, proton::sender> blocked_map;
+
+    // Get the queue associated with a sender.
+    queue* sender_queue(const proton::sender& s) {
+        return queues_.get(s.source().address()); // Thread safe.
+    }
+
+    // Only called if we have credit. Return true if we sent a message.
+    bool do_send(queue* q, proton::sender &s) {
+        proton::message m;
+        bool popped =  q->pop(m, has_messages_callback_);
+        if (popped)
+            s.send(m);
+        /// if !popped the queue has saved the callback for later.
+        return popped;
+    }
+
+    // Called via @ref work_queue when q has messages. Try all the blocked senders.
+    void has_messages(queue* q) {
+        auto range = blocked_.equal_range(q);
+        for (auto i = range.first; i != range.second;) {
+            if (i->second.credit() <= 0 || do_send(q, i->second))
+                i = blocked_.erase(i); // No credit or send was successful, stop blocked.
+            else
+                ++i;            // have credit, didn't send, keep blocked
+        }
+    }
+
+    // Use to erase closed senders from blocked_ set.
+    // Erases every entry in [begin, end) whose sender satisfies p.
+    template <class Predicate>
+    void erase_sender_if(blocked_map::iterator begin, blocked_map::iterator end, Predicate p) {
+        for (auto i = begin; i != end; ) {
+            if (p(i->second))
+                i = blocked_.erase(i);
+            else
+                ++i;
+        }
+    }
+
+    queues& queues_;            // Shared by all handlers, not owned.
+    blocked_map blocked_;
+    std::function<void(queue*)> has_messages_callback_; // Set in on_connection_open().
+    proton::connection connection_; // NOTE(review): not referenced anywhere in this class.
+};
+
+
+/// Multi-threaded broker: owns the shared queues and a proton::controller.
+/// new_handler() is registered with the controller as the factory for
+/// per-connection handlers.
+class broker {
+  public:
+    /// Set the container id, then listen on addr.
+    broker(const std::string& addr) : controller_(proton::controller::create()) {
+        controller_->options(proton::connection_options().container_id("mt_broker"));
+        std::cout << "broker listening on " << addr << std::endl;
+        controller_->listen(addr, std::bind(&broker::new_handler, this));
+    }
+
+    /// Start the controller threads (detached) and block in controller::wait().
+    void run() {
+        // hardware_concurrency() returns an unsigned count and may return 0
+        // when the value cannot be determined; always start at least one
+        // thread, otherwise nothing would ever service the controller.
+        unsigned thread_count = std::thread::hardware_concurrency();
+        if (thread_count == 0)
+            thread_count = 1;
+        for (unsigned i = 0; i < thread_count; ++i)
+            std::thread(&proton::controller::run, controller_.get()).detach();
+        controller_->wait();
+    }
+
+  private:
+    // Handlers are heap allocated; broker_connection_handler deletes itself
+    // in on_transport_close().
+    proton::handler* new_handler() {
+        return new broker_connection_handler(queues_);
+    }
+
+    queues queues_;
+    std::unique_ptr<proton::controller> controller_;
+};
+
+// Parse the command line, run the broker until it stops.
+// Returns 0 on a clean run, 1 on a bad option or any other exception.
+int main(int argc, char **argv) {
+    std::string address("0.0.0.0");     // Default, overridden by -a.
+    example::options opts(argc, argv);
+    opts.add_value(address, 'a', "address", "listen on URL", "URL");
+
+    int exit_code = 1;
+    try {
+        opts.parse();
+        broker(address).run();
+        exit_code = 0;
+    } catch (const example::bad_option& e) {
+        // Show usage followed by the parse error.
+        std::cout << opts << std::endl << e.what() << std::endl;
+    } catch (const std::exception& e) {
+        std::cerr << "broker shutdown: " << e.what() << std::endl;
+    }
+    return exit_code;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] qpid-proton git commit: PROTON-1046: C++ multi-threaded controller and improved broker example

Posted by ac...@apache.org.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/mt/epoll_controller.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt/epoll_controller.cpp b/examples/cpp/mt/epoll_controller.cpp
new file mode 100644
index 0000000..6c75b04
--- /dev/null
+++ b/examples/cpp/mt/epoll_controller.cpp
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/controller.hpp>
+#include <proton/url.hpp>
+#include <proton/work_queue.hpp>
+
+#include <proton/io/connection_engine.hpp>
+#include <proton/io/default_controller.hpp>
+
+#include <atomic>
+#include <memory>
+#include <mutex>
+#include <condition_variable>
+#include <thread>
+#include <set>
+#include <sstream>
+#include <system_error>
+
+// Linux native IO
+#include <assert.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <unistd.h>
+
+// This is the only public function, the implementation is private.
+std::unique_ptr<proton::controller> make_epoll_controller();
+
+// Private implementation
+namespace  {
+
+
+using lock_guard = std::lock_guard<std::mutex>;
+
+// Describe the current errno as a string, using msg as the
+// std::system_error "what" argument.
+std::string errno_str(const std::string& msg) {
+    const std::error_code code(errno, std::system_category());
+    const std::system_error err(code, msg);
+    return err.what();
+}
+
+// Pass through a non-negative system call result.
+// A negative result throws proton::error(errno_str(msg)).
+int check(int result, const std::string& msg) {
+    if (result >= 0)
+        return result;
+    throw proton::error(errno_str(msg));
+}
+
+// Wrapper for getaddrinfo() that cleans up in destructor.
+//
+// addr is parsed as a proton::url; its host and port are resolved with
+// getaddrinfo(). Throws proton::error if resolution fails.
+// Not copyable: a copy would call freeaddrinfo() twice on the same list.
+class unique_addrinfo {
+  public:
+    unique_addrinfo(const std::string& addr) : addrinfo_(0) {
+        proton::url u(addr);
+        int result = ::getaddrinfo(char_p(u.host()), char_p(u.port()), 0, &addrinfo_);
+        if (result)
+            throw proton::error(std::string("bad address: ") + gai_strerror(result));
+    }
+    ~unique_addrinfo() { if (addrinfo_) ::freeaddrinfo(addrinfo_); }
+
+    unique_addrinfo(const unique_addrinfo&) = delete;
+    unique_addrinfo& operator=(const unique_addrinfo&) = delete;
+
+    ::addrinfo* operator->() const { return addrinfo_; }
+
+  private:
+    // getaddrinfo() takes a null pointer, not "", for "unspecified".
+    static const char* char_p(const std::string& s) { return s.empty() ? 0 : s.c_str(); }
+    ::addrinfo *addrinfo_;
+};
+
+// File descriptor wrapper that calls ::close in destructor.
+//
+// Converts implicitly to int for use in system calls. release() gives up
+// ownership so the descriptor is not closed.
+// Not copyable: a copy would close the same descriptor twice.
+class unique_fd {
+  public:
+    unique_fd(int fd) : fd_(fd) {}
+    ~unique_fd() { if (fd_ >= 0) ::close(fd_); }
+
+    unique_fd(const unique_fd&) = delete;
+    unique_fd& operator=(const unique_fd&) = delete;
+
+    operator int() const { return fd_; }
+    int release() { int ret = fd_; fd_ = -1; return ret; }
+
+  protected:
+    int fd_;
+};
+
+class pollable;
+class pollable_engine;
+class pollable_listener;
+
+// proton::controller implementation built on Linux epoll.
+// Member functions are defined further down in this file.
+class epoll_controller : public proton::controller {
+  public:
+    epoll_controller();
+    ~epoll_controller();
+
+    // Implement the proton::controller interface
+    void connect(const std::string& addr,
+                 proton::handler& h,
+                 const proton::connection_options& opts) override;
+
+    void listen(const std::string& addr,
+                std::function<proton::handler*(const std::string&)> factory,
+                const proton::connection_options& opts) override;
+
+    void stop_listening(const std::string& addr) override;
+
+    void options(const proton::connection_options& opts) override;
+    proton::connection_options options() override;
+
+    void run() override;
+
+    void stop_on_idle() override;
+    void stop(const proton::error_condition& err) override;
+    void wait() override;
+
+    // Functions used internally.
+
+    // Create a pollable_engine for fd and start polling it.
+    void add_engine(proton::handler* h, proton::connection_options opts, int fd);
+    // Remove a finished engine or listener.
+    void erase(pollable*);
+
+  private:
+    // The lock_guard parameter shows the caller already holds lock_.
+    void idle_check(const lock_guard&);
+    void interrupt();
+
+    const unique_fd epoll_fd_;
+    const unique_fd interrupt_fd_; // eventfd added to epoll by interrupt().
+
+    mutable std::mutex lock_;
+
+    proton::connection_options options_;
+    std::map<std::string, std::unique_ptr<pollable_listener> > listeners_; // By address.
+    std::map<pollable*, std::unique_ptr<pollable_engine> > engines_;
+
+    std::condition_variable stopped_; // Signalled when the last run() thread exits.
+    std::atomic<size_t> threads_;     // Number of threads currently inside run().
+    bool stopping_;
+    proton::error_condition stop_err_;
+};
+
+// Base class for pollable file-descriptors. Manages epoll interaction,
+// subclasses implement virtual work() to do their serialized work.
+//
+// The descriptor is made non-blocking and registered with epoll with no
+// events enabled; notify() or do_work() re-arm it with EPOLLONESHOT so at
+// most one thread is woken for it at a time.
+class pollable {
+  public:
+    // Takes ownership of fd (closed in the destructor via fd_).
+    // Throws proton::error if fd cannot be made non-blocking or cannot be
+    // added to the epoll set.
+    pollable(int fd, int epoll_fd) : fd_(fd), epoll_fd_(epoll_fd), notified_(false), working_(false)
+    {
+        int flags = check(::fcntl(fd, F_GETFL, 0), "non-blocking");
+        check(::fcntl(fd, F_SETFL,  flags | O_NONBLOCK), "non-blocking");
+        ::epoll_event ev = {0};
+        ev.data.ptr = this;
+        // Fail here rather than later: without a successful ADD every
+        // subsequent EPOLL_CTL_MOD in rearm() would fail.
+        check(::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd_, &ev), "add to epoll");
+    }
+
+    virtual ~pollable() {
+        ::epoll_event ev = {0};
+        ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd_, &ev); // Ignore errors.
+    }
+
+    // Run work() for the given epoll events unless another thread is already
+    // working. Returns false when work() reports it is finished (returned 0),
+    // true otherwise.
+    bool do_work(uint32_t events) {
+        {
+            lock_guard g(lock_);
+            if (working_)
+                return true;         // Another thread is already working.
+            working_ = true;
+            notified_ = false;
+        }
+        uint32_t new_events = work(events);  // Serialized, outside the lock.
+        if (new_events) {
+            lock_guard g(lock_);
+            // If notify() was called while we were working, wake up again
+            // for both directions so the new jobs get processed.
+            rearm(notified_ ?  EPOLLIN|EPOLLOUT : new_events);
+        }
+        return new_events != 0;
+    }
+
+    // Called by work_queue to notify that there are jobs.
+    void notify() {
+        lock_guard g(lock_);
+        if (!notified_) {
+            notified_ = true;
+            if (!working_) // No worker thread, rearm now.
+                rearm(EPOLLIN|EPOLLOUT);
+        }
+    }
+
+  protected:
+
+    // Subclass implements  work.
+    // Returns epoll events to re-enable or 0 if finished.
+    virtual uint32_t work(uint32_t events) = 0;
+
+    const unique_fd fd_;
+    const int epoll_fd_;
+
+  private:
+
+    // Re-enable one-shot polling for events and clear working_.
+    // Caller must hold lock_.
+    void rearm(uint32_t events) {
+        epoll_event ev;
+        ev.data.ptr = this;
+        ev.events = EPOLLONESHOT | events;
+        check(::epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd_, &ev), "re-arm epoll");
+        working_ = false;
+    }
+
+    std::mutex lock_;           // Guards notified_ and working_.
+    bool notified_;
+    bool working_;
+};
+
+// proton::work_queue implementation: a mutex-protected list of jobs.
+// Pushing a job notifies the owning pollable so that a thread runs it.
+class work_queue : public proton::work_queue {
+  public:
+    typedef std::vector<std::function<void()> > jobs;
+
+    work_queue(pollable& p, proton::controller& c) :
+        pollable_(p), closed_(false), controller_(c) {}
+
+    // Queue f and notify the pollable. Returns false, without queueing,
+    // once close() has been called.
+    bool push(std::function<void()> f) override {
+        // Note this is an unbounded work queue.
+        // A resource-safe implementation should be bounded.
+        lock_guard g(lock_);
+        if (!closed_) {
+            jobs_.push_back(f);
+            pollable_.notify();
+        }
+        return !closed_;
+    }
+
+    // Remove and return every queued job, leaving the queue empty.
+    jobs pop_all() {
+        jobs taken;
+        lock_guard g(lock_);
+        taken.swap(jobs_);
+        return taken;
+    }
+
+    // Reject all further push() calls.
+    void close() {
+        lock_guard g(lock_);
+        closed_ = true;
+    }
+
+    proton::controller& controller() const override { return controller_; }
+
+  private:
+    std::mutex lock_;           // Guards jobs_ and closed_.
+    pollable& pollable_;
+    jobs jobs_;
+    bool closed_;
+    proton::controller& controller_;
+};
+
+// Handle epoll wakeups for a connection_engine.
+//
+// Owns the engine and its work_queue. work() moves bytes between the socket
+// and the engine buffers, runs queued jobs and dispatches engine events.
+class pollable_engine : public pollable {
+  public:
+
+    pollable_engine(
+        proton::handler* h, proton::connection_options opts, epoll_controller& c,
+        int fd, int epoll_fd
+    ) : pollable(fd, epoll_fd),
+        engine_(*h, opts.link_prefix(std::to_string(fd)+":")),
+        queue_(new work_queue(*this, c))
+    {
+        engine_.work_queue(queue_.get());
+    }
+
+    ~pollable_engine() {
+        queue_->close();               // No calls to notify() after this.
+        engine_.dispatch();            // Run any final events.
+        try { write(); } catch(...) {} // Write connection close if we can.
+        for (const auto& f : queue_->pop_all()) {// Run final queued work for side-effects.
+            try { f(); } catch(...) {}
+        }
+    }
+
+    // Do IO and run jobs until neither reading nor writing makes progress.
+    // Returns the epoll events to wait for next, or 0 if the connection is
+    // ending (nothing left to read or write, or an exception was caught and
+    // the connection closed with it).
+    uint32_t work(uint32_t events) override {
+        try {
+            // Bitwise tests: `events && EPOLLOUT` would be true for any event.
+            bool can_read = (events & EPOLLIN) != 0;
+            bool can_write = (events & EPOLLOUT) != 0;
+            do {
+                can_write = can_write && write();
+                can_read = can_read && read();
+                for (const auto& f : queue_->pop_all()) // Run queued work
+                    f();
+                engine_.dispatch();
+            } while (can_read || can_write);
+            return (engine_.read_buffer().size ? EPOLLIN:0) |
+                (engine_.write_buffer().size ? EPOLLOUT:0);
+        } catch (const std::exception& e) {
+            close(proton::error_condition("exception", e.what()));
+        }
+        return 0;               // Ending
+    }
+
+    void close(const proton::error_condition& err) {
+        engine_.connection().close(err);
+    }
+
+  private:
+
+    // Write pending engine output to the socket.
+    // Returns true if some bytes were written. Returns false if there was
+    // nothing to write or the socket would block; throws on other errors.
+    bool write() {
+        if (engine_.write_buffer().size) {
+            ssize_t n;
+            do {                // Retry if interrupted by a signal.
+                n = ::write(fd_, engine_.write_buffer().data, engine_.write_buffer().size);
+            } while (n < 0 && errno == EINTR);
+            if (n > 0) {
+                engine_.write_done(n);
+                return true;
+            } else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK)
+                check(n, "write");
+        }
+        return false;
+    }
+
+    // Read from the socket into the engine's input buffer.
+    // Returns true if some bytes were read. Returns false if the engine has
+    // no buffer space, the socket would block, or the peer closed (in which
+    // case the engine is told via read_close()); throws on other errors.
+    bool read() {
+        if (engine_.read_buffer().size) {
+            ssize_t n;
+            do {                // Retry if interrupted by a signal.
+                n = ::read(fd_, engine_.read_buffer().data, engine_.read_buffer().size);
+            } while (n < 0 && errno == EINTR);
+            if (n > 0) {
+                engine_.read_done(n);
+                return true;
+            }
+            else if (n == 0)
+                engine_.read_close();
+            else if (errno != EAGAIN && errno != EWOULDBLOCK)
+                check(n, "read");
+        }
+        return false;
+    }
+
+    proton::io::connection_engine engine_;
+    std::shared_ptr<work_queue> queue_;
+};
+
+// A pollable listener fd that creates pollable_engine for incoming connections.
+class pollable_listener : public pollable {
+  public:
+    pollable_listener(
+        const std::string& addr,
+        std::function<proton::handler*(const std::string&)> factory,
+        int epoll_fd,
+        epoll_controller& c,
+        const proton::connection_options& opts
+    ) :
+        pollable(listen(addr), epoll_fd),
+        addr_(addr),
+        factory_(factory),
+        controller_(c),
+        opts_(opts)
+    {}
+
+    // Accept one connection and hand it to the controller as a new engine.
+    // Returns EPOLLIN to keep listening, or 0 if EPOLLRDHUP was reported.
+    uint32_t work(uint32_t events) override {
+        if (events & EPOLLRDHUP)
+            return 0;
+        int accepted = ::accept(fd_, NULL, 0);
+        if (accepted < 0) {
+            // The listening socket is non-blocking and the pending connection
+            // may have gone away between the wakeup and accept(). These are
+            // transient: keep listening rather than failing the controller.
+            if (errno == EAGAIN || errno == EWOULDBLOCK ||
+                errno == EINTR || errno == ECONNABORTED)
+                return EPOLLIN;
+            check(accepted, "accept");
+        }
+        unique_fd guard(accepted); // Close the socket if the factory throws.
+        proton::handler* h = factory_(addr_);
+        controller_.add_engine(h, opts_, guard.release());
+        return EPOLLIN;
+    }
+
+    std::string addr() { return addr_; }
+
+  private:
+
+    // Create a socket bound to addr and listening, return its descriptor.
+    // Throws proton::error on failure; the socket is closed in that case.
+    int listen(const std::string& addr) {
+        std::string msg = "listen on "+addr;
+        unique_addrinfo ainfo(addr);
+        unique_fd fd(check(::socket(ainfo->ai_family, SOCK_STREAM, 0), msg));
+        int yes = 1;
+        check(::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)), msg);
+        check(::bind(fd, ainfo->ai_addr, ainfo->ai_addrlen), msg);
+        check(::listen(fd, 32), msg);
+        return fd.release();
+    }
+
+    std::string addr_;
+    std::function<proton::handler*(const std::string&)> factory_;
+    epoll_controller& controller_;
+    proton::connection_options opts_;
+};
+
+
+// Create the epoll descriptor and the eventfd used by interrupt().
+// The eventfd starts with a non-zero counter so it is readable as soon as it
+// is added to the epoll set.
+// Initializers are listed in member declaration order, which is the order
+// they actually run in.
+epoll_controller::epoll_controller()
+    : epoll_fd_(check(epoll_create(1), "epoll_create")),
+      interrupt_fd_(check(eventfd(1, 0), "eventfd")),
+      threads_(0), stopping_(false)
+{}
+
+// Stop the controller and wait for the run() threads to exit.
+// Exceptions are swallowed because a destructor must not throw; note that if
+// stop() throws, wait() is skipped.
+epoll_controller::~epoll_controller() {
+    try {
+        stop(proton::error_condition("exception", "controller shut-down"));
+        wait();
+    } catch (...) {}
+}
+
+// Wrap fd in a new pollable_engine using handler h and options opts, arm it
+// for polling and record it in engines_.
+// Throws proton::error if the controller is stopping.
+void epoll_controller::add_engine(proton::handler* h, proton::connection_options opts, int fd) {
+    lock_guard g(lock_);
+    if (stopping_)
+        throw proton::error("controller is stopping");
+    std::unique_ptr<pollable_engine> owner(new pollable_engine(h, opts, *this, fd, epoll_fd_));
+    pollable_engine* const key = owner.get();
+    key->notify();
+    engines_[key] = std::move(owner);
+}
+
+// Remove e from the engines, or failing that from the listeners if it is a
+// pollable_listener, then check whether the controller has become idle.
+void epoll_controller::erase(pollable* e) {
+    lock_guard g(lock_);
+    const bool was_engine = engines_.erase(e) != 0;
+    if (!was_engine) {
+        if (pollable_listener* l = dynamic_cast<pollable_listener*>(e))
+            listeners_.erase(l->addr());
+    }
+    idle_check(g);
+}
+
+// Interrupt the run() threads if stop_on_idle() was requested and there are
+// no engines or listeners left. The unused lock_guard parameter shows that
+// the caller holds lock_.
+void epoll_controller::idle_check(const lock_guard&) {
+    if (stopping_  && engines_.empty() && listeners_.empty())
+        interrupt();
+}
+
+// Open a TCP connection to addr and create an engine for it that uses
+// handler h and the controller options updated with opts.
+// Throws proton::error if the address is bad or the connection fails.
+void epoll_controller::connect(const std::string& addr,
+                               proton::handler& h,
+                               const proton::connection_options& opts)
+{
+    const std::string what = "connect to "+addr;
+    unique_addrinfo target(addr);
+    const int raw = check(::socket(target->ai_family, SOCK_STREAM, 0), what);
+    unique_fd sock(raw);        // Closed automatically if connect() fails.
+    check(::connect(sock, target->ai_addr, target->ai_addrlen), what);
+    add_engine(&h, options().update(opts), sock.release());
+}
+
+// Start listening on addr. factory is called with addr to create a handler
+// for each accepted connection; connections use the controller options
+// updated with opts.
+// Throws proton::error if factory is empty, the controller is stopping, or
+// the listening socket cannot be set up.
+void epoll_controller::listen(const std::string& addr,
+                              std::function<proton::handler*(const std::string&)> factory,
+                              const proton::connection_options& opts)
+{
+    lock_guard g(lock_);
+    if (!factory)
+        throw proton::error("null function to listen on "+addr);
+    if (stopping_)
+        throw proton::error("controller is stopping");
+    // Merge into a copy, as connect() does, so listener-specific options do
+    // not leak into the controller-wide defaults.
+    const proton::connection_options merged =
+        proton::connection_options(options_).update(opts);
+    // Build the listener before touching the map so a failure does not leave
+    // a null entry behind (which would also defeat idle_check()).
+    std::unique_ptr<pollable_listener> created(
+        new pollable_listener(addr, factory, epoll_fd_, *this, merged));
+    auto& l = listeners_[addr];
+    l = std::move(created);
+    l->notify();
+}
+
+// Destroy the listener registered for addr, if any, then check whether the
+// controller has become idle.
+void epoll_controller::stop_listening(const std::string& addr) {
+    lock_guard g(lock_);
+    listeners_.erase(addr);
+    idle_check(g);
+}
+
+// Replace the controller-wide connection options.
+void epoll_controller::options(const proton::connection_options& opts) {
+    lock_guard g(lock_);
+    options_ = opts;
+}
+
+// Return a copy of the controller-wide connection options, taken under lock_.
+proton::connection_options epoll_controller::options() {
+    lock_guard g(lock_);
+    return options_;
+}
+
+// Thread main loop: wait for one epoll event at a time and let the
+// corresponding pollable do its work. Returns when the controller is
+// interrupted (an event with a null data pointer) or after an exception,
+// which stops the controller with an "exception" error condition.
+void epoll_controller::run() {
+    ++threads_;
+    try {
+        epoll_event ev;
+        while(true) {
+            int n = ::epoll_wait(epoll_fd_, &ev, 1, -1);
+            if (n < 0 && errno == EINTR)
+                continue;       // Interrupted by a signal, not an error.
+            check(n, "epoll_wait");
+            if (n == 0)
+                continue;       // No event was returned, ev is not valid.
+            pollable* p = reinterpret_cast<pollable*>(ev.data.ptr);
+            if (!p)
+                break;          // Interrupted
+            if (!p->do_work(ev.events))
+                erase(p);
+        }
+    } catch (const std::exception& e) {
+        // stop() itself can throw; never let that escape the thread function.
+        try {
+            stop(proton::error_condition("exception", e.what()));
+        } catch (...) {}
+    }
+    // Decrement and notify under lock_, the mutex wait() uses with stopped_.
+    // Otherwise the notification could fall between wait()'s predicate check
+    // and its block, and be lost.
+    lock_guard g(lock_);
+    if (--threads_ == 0)
+        stopped_.notify_all();
+}
+
+// Mark the controller as stopping: add_engine() and listen() will now throw,
+// and the run() threads are interrupted once no engines or listeners remain.
+void epoll_controller::stop_on_idle() {
+    lock_guard g(lock_);
+    stopping_ = true;
+    idle_check(g);
+}
+
+// Record err, which wait() uses to close any remaining connections, and
+// interrupt the run() threads immediately.
+// NOTE(review): unlike stop_on_idle() this does not set stopping_ — confirm
+// that is intended.
+void epoll_controller::stop(const proton::error_condition& err) {
+    lock_guard g(lock_);
+    stop_err_ = err;
+    interrupt();
+}
+
+// Block until no thread is inside run(), then close every remaining
+// connection with the stored stop error and destroy all listeners and engines.
+void epoll_controller::wait() {
+    std::unique_lock<std::mutex> l(lock_);
+    while (!(threads_ == 0))
+        stopped_.wait(l);
+    for (auto i = engines_.begin(); i != engines_.end(); ++i)
+        i->second->close(stop_err_);
+    listeners_.clear();
+    engines_.clear();
+}
+
+// Wake every thread blocked in run() and make them return.
+// Safe to call more than once.
+void epoll_controller::interrupt() {
+    // Add an always-readable fd with a null data pointer and no ONESHOT to
+    // interrupt all threads.
+    epoll_event ev = {0};
+    ev.events = EPOLLIN;
+    // EEXIST means an earlier interrupt() already added the fd: that is
+    // success, not an error (stop() may be called again by the destructor).
+    if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fd_, &ev) < 0 && errno != EEXIST)
+        check(-1, "interrupt");
+}
+
+// Register make_epoll_controller() as proton::controller::create().
+proton::io::default_controller instance(make_epoll_controller);
+
+}
+
+// This is the only public function: create a new epoll_controller and return
+// it to the caller as a proton::controller.
+std::unique_ptr<proton::controller> make_epoll_controller() {
+    return std::unique_ptr<proton::controller>(new epoll_controller());
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/options.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/options.hpp b/examples/cpp/options.hpp
index bd477b5..54f6503 100644
--- a/examples/cpp/options.hpp
+++ b/examples/cpp/options.hpp
@@ -25,6 +25,7 @@
 #include <vector>
 #include <stdexcept>
 
+namespace example {
 /** bad_option is thrown for option parsing errors */
 struct bad_option : public std::runtime_error {
     bad_option(const std::string& s) : std::runtime_error(s) {}
@@ -169,5 +170,6 @@ class options {
     opts opts_;
     bool help_;
 };
+}
 
 #endif // OPTIONS_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/recurring_timer.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/recurring_timer.cpp b/examples/cpp/recurring_timer.cpp
index a4841b2..ef1d827 100644
--- a/examples/cpp/recurring_timer.cpp
+++ b/examples/cpp/recurring_timer.cpp
@@ -84,7 +84,7 @@ int main(int argc, char **argv) {
     // Command line options
     double running_time = 5;
     double tick = 0.25;
-    options opts(argc, argv);
+    example::options opts(argc, argv);
     opts.add_value(running_time, 't', "running time", "running time in seconds", "RUNTIME");
     opts.add_value(tick, 'k', "tick time", "tick time as fraction of second", "TICK");
     try {
@@ -92,7 +92,7 @@ int main(int argc, char **argv) {
         recurring recurring_handler(int(running_time * 1000), int(tick * 1000));
         proton::container(recurring_handler).run();
         return 0;
-    } catch (const bad_option& e) {
+    } catch (const example::bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/server.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/server.cpp b/examples/cpp/server.cpp
index 370c33b..f4c75d7 100644
--- a/examples/cpp/server.cpp
+++ b/examples/cpp/server.cpp
@@ -80,7 +80,7 @@ class server : public proton::handler {
 
 int main(int argc, char **argv) {
     std::string address("amqp://0.0.0.0:5672/examples");
-    options opts(argc, argv);
+    example::options opts(argc, argv);
 
     opts.add_value(address, 'a', "address", "listen on URL", "URL");
 
@@ -91,7 +91,7 @@ int main(int argc, char **argv) {
         proton::container(srv).run();
 
         return 0;
-    } catch (const bad_option& e) {
+    } catch (const example::bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/server_direct.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/server_direct.cpp b/examples/cpp/server_direct.cpp
index e092d5f..c4ecf8f 100644
--- a/examples/cpp/server_direct.cpp
+++ b/examples/cpp/server_direct.cpp
@@ -99,7 +99,7 @@ class server : public proton::handler {
 
 int main(int argc, char **argv) {
     std::string address("amqp://127.0.0.1:5672/examples");
-    options opts(argc, argv);
+    example::options opts(argc, argv);
 
     opts.add_value(address, 'a', "address", "listen on URL", "URL");
 
@@ -110,7 +110,7 @@ int main(int argc, char **argv) {
         proton::container(srv).run();
 
         return 0;
-    } catch (const bad_option& e) {
+    } catch (const example::bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/simple_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/simple_recv.cpp b/examples/cpp/simple_recv.cpp
index 091264a..53f87d9 100644
--- a/examples/cpp/simple_recv.cpp
+++ b/examples/cpp/simple_recv.cpp
@@ -70,7 +70,7 @@ int main(int argc, char **argv) {
     std::string address("127.0.0.1:5672/examples");
 
     int message_count = 100;
-    options opts(argc, argv);
+    example::options opts(argc, argv);
 
     opts.add_value(address, 'a', "address", "connect to and receive from URL", "URL");
     opts.add_value(message_count, 'm', "messages", "receive COUNT messages", "COUNT");
@@ -82,7 +82,7 @@ int main(int argc, char **argv) {
         proton::container(recv).run();
 
         return 0;
-    } catch (const bad_option& e) {
+    } catch (const example::bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/simple_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/simple_send.cpp b/examples/cpp/simple_send.cpp
index 70aa4bc..f2a2477 100644
--- a/examples/cpp/simple_send.cpp
+++ b/examples/cpp/simple_send.cpp
@@ -78,7 +78,7 @@ class simple_send : public proton::handler {
 int main(int argc, char **argv) {
     std::string address("127.0.0.1:5672/examples");
     int message_count = 100;
-    options opts(argc, argv);
+    example::options opts(argc, argv);
 
     opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
     opts.add_value(message_count, 'm', "messages", "send COUNT messages", "COUNT");
@@ -90,7 +90,7 @@ int main(int argc, char **argv) {
         proton::container(send).run();
 
         return 0;
-    } catch (const bad_option& e) {
+    } catch (const example::bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/tutorial.dox
----------------------------------------------------------------------
diff --git a/examples/cpp/tutorial.dox b/examples/cpp/tutorial.dox
new file mode 100644
index 0000000..b8383f7
--- /dev/null
+++ b/examples/cpp/tutorial.dox
@@ -0,0 +1,403 @@
+// -*-markdown-*-
+// NOTE: doxygen can include markdown pages directly but there seems to be a bug
+// that shows messed-up line numbers in \skip \until code extracts. This file
+// is markdown wrapped in a doxygen comment - which works. The file is best viewed/edited
+// as markdown.
+
+/**@page tutorial Tutorial
+
+This is a brief tutorial that will walk you through the fundamentals of building
+messaging applications in incremental steps. There are further examples, in
+addition to the ones mentioned in the tutorial.
+
+Proton provides an "event-driven" programming model, where you implement a
+subclass of `proton::handler` and override functions that react to various AMQP
+events (connections opening and closing, messages being delivered and so on.)
+
+The examples below show how to implement handlers for clients and servers and
+how to run them using the `proton::container`, a portable, easy-to-use way to
+build single-threaded clients or servers.
+
+@note Proton can also be embedded or integrated into arbitrary IO frameworks and used
+to build multi-threaded applications. See namespace proton::mt for more.
+
+Some of the examples require an AMQP *broker* that can receive, store and send
+messages. @ref broker.hpp and @ref broker.cpp define a simple example
+broker. Run without arguments it listens on `0.0.0.0:5672`, the standard AMQP
+port on all network interfaces. To use a different port or network interface:
+
+    broker -a <host>:<port>
+
+Instead of the example broker, you can use any AMQP 1.0 compliant broker. You
+must configure your broker to have a queue (or topic) named "examples".
+
+The `helloworld` examples take an optional URL argument. The other examples take
+an option `-a URL`. A URL looks like:
+
+    HOST:PORT/ADDRESS
+
+It usually defaults to `127.0.0.1:5672/examples`, but you can change this if
+your broker is on a different host or port, or you want to use a different queue
+or topic name (the ADDRESS part of the URL). URL details are at `proton::url`
+
+Hello World!
+------------
+
+\dontinclude helloworld.cpp
+
+Tradition dictates that we start with hello world! This example sends a message
+to a broker and then receives the same message back to demonstrate sending and
+receiving. In a realistic system the sender and receiver would normally be in
+different processes. The complete example is @ref helloworld.cpp
+
+We will include the following classes: `proton::container` runs an event loop
+which dispatches events to a `proton::handler`. This allows a *reactive*
+style of programming which is well suited to messaging applications. `proton::url` is a simple parser for the URL format mentioned above.
+
+\skip   proton/container
+\until  proton/url
+
+We will define a class `hello_world` which is a subclass of
+`proton::handler` and over-rides functions to handle the events
+of interest in sending and receiving a message.
+
+\skip class hello_world
+\until {}
+
+`on_start()` is called when the event loop first starts. We handle that by
+establishing a connection and creating a sender and a receiver.
+
+\skip on_start
+\until }
+
+`on_sendable()` is called when a message can be transferred over the associated
+sender link to the remote peer. We create a `proton::message`, set the message
+body to `"Hello World!"` and send the message. Then we close the sender as we only
+want to send one message. Closing the sender will prevent further calls to
+`on_sendable()`.
+
+\skip on_sendable
+\until }
+
+`on_message()` is called when a message is received. We just print the body of
+the message and close the connection, as we only want one message.
+
+\skip on_message
+\until }
+
+The message body is a `proton::value`, see the documentation for more on how to
+extract the message body as type-safe C++ values.
+
+Our `main` function creates an instance of the `hello_world` handler and a
+proton::container using that handler. Calling `proton::container::run` sets
+things in motion and returns when we close the connection as there is nothing
+further to do. It may throw an exception, which will be a subclass of
+`proton::error`. That in turn is a subclass of `std::exception`.
+
+\skip main
+\until }
+\until }
+\until }
+
+Hello World, Direct!
+--------------------
+
+\dontinclude helloworld_direct.cpp
+
+Though often used in conjunction with a broker, AMQP does not *require* this. It
+also allows senders and receivers to communicate directly if desired.
+
+We will modify our example to send a message directly to itself. This is a bit
+contrived but illustrates both sides of the direct send/receive scenario. Full
+code at @ref helloworld_direct.cpp
+
+The first difference is that rather than creating a receiver on the same
+connection as our sender, we listen for incoming connections by invoking the
+`proton::container::listen()` method on the container.
+
+\skip on_start
+\until }
+
+As we only need then to initiate one link, the sender, we can do that by
+passing in a url rather than an existing connection, and the connection
+will also be automatically established for us.
+
+We send the message in response to the `on_sendable()` callback and
+print the message out in response to the `on_message()` callback exactly
+as before.
+
+\skip on_sendable
+\until }
+\until }
+
+However we also handle two new events. We now close the connection from
+the sender's side once the message has been accepted.
+The acceptance of the message is an indication of successful transfer to the
+peer. We are notified of that event through the `on_delivery_accept()`
+callback.
+
+\skip on_delivery_accept
+\until }
+
+Then, once the connection has been closed, of which we are
+notified through the `on_connection_close()` callback, we stop accepting incoming
+connections at which point there is no work to be done and the
+event loop exits, and the run() method will return.
+
+\skip on_connection_close
+\until }
+
+So now we have our example working without a broker involved!
+
+Note that for this example we pick an "unusual" port 8888 since we are talking
+to ourselves rather than a broker.
+
+\skipline url =
+
+Asynchronous Send and Receive
+-----------------------------
+
+Of course, these `HelloWorld!` examples are very artificial, communicating as
+they do over a network connection but within the same process. A more realistic
+example involves communication between separate processes (which could indeed be
+running on completely separate machines).
+
+Let's separate the sender from the receiver, and transfer more than a single
+message between them.
+
+We'll start with a simple sender @ref simple_send.cpp.
+
+\dontinclude simple_send.cpp
+
+As with the previous example, we define the application logic in a class that
+handles events. Because we are transferring more than one message, we need to
+keep track of how many we have sent. We'll use a `sent` member variable for
+that.  The `total` member variable will hold the number of messages we want to
+send.
+
+\skip class simple_send
+\until total
+
+As before, we use the `on_start()` event to establish our sender link over which
+we will transfer messages.
+
+\skip on_start
+\until }
+
+AMQP defines a credit-based flow control mechanism. Flow control allows
+the receiver to control how many messages it is prepared to receive at a
+given time and thus prevents any component being overwhelmed by the
+number of messages it is sent.
+
+In the `on_sendable()` callback, we check that our sender has credit
+before sending messages. We also check that we haven't already sent the
+required number of messages.
+
+\skip on_sendable
+\until }
+\until }
+
+The `proton::sender::send()` call above is asynchronous. When it returns the
+message has not yet actually been transferred across the network to the
+receiver. By handling the `on_delivery_accept()` event, we can get notified when the
+receiver has received and accepted the message. In our example we use this event
+to track the confirmation of the messages we have sent. We only close the
+connection and exit when the receiver has received all the messages we wanted to
+send.
+
+\skip on_delivery_accept
+\until }
+\until }
+
+If we are disconnected after a message is sent and before it has been
+confirmed by the receiver, it is said to be `in doubt`. We don't know
+whether or not it was received. In this example, we will handle that by
+resending any in-doubt messages. This is known as an 'at-least-once'
+guarantee, since each message should eventually be received at least
+once, though a given message may be received more than once (i.e.
+duplicates are possible). In the `on_disconnected()` callback, we reset
+the sent count to reflect only those that have been confirmed. The
+library will automatically try to reconnect for us, and when our sender
+is sendable again, we can restart from the point we know the receiver
+got to.
+
+\skip on_disconnect
+\until }
+
+\dontinclude simple_recv.cpp
+
+Now let's look at the corresponding receiver @ref simple_recv.cpp
+
+This time we'll use an `expected` member variable for the number of messages we expect and
+a `received` variable to count how many we have received so far.
+
+\skip class simple_recv
+\until received
+
+We handle `on_start()` by creating our receiver, much like we
+did for the sender.
+
+\skip on_start
+\until }
+
+We also handle the `on_message()` event for received messages and print the
+message out as in the `Hello World!` examples.  However we add some logic to
+allow the receiver to wait for a given number of messages, then to close the
+connection and exit. We also add some logic to check for and ignore duplicates,
+using a simple sequential id scheme.
+
+\skip on_message
+\until }
+
+Direct Send and Receive
+-----------------------
+
+Sending between these two examples requires an intermediary broker since neither
+accepts incoming connections. AMQP allows us to send messages directly between
+two processes. In that case one or other of the processes needs to accept
+incoming connections. Let's create a modified version of the receiving example
+that does this with @ref direct_recv.cpp
+
+\dontinclude direct_recv.cpp
+
+There are only two differences here. Instead of initiating a link (and
+implicitly a connection), we listen for incoming connections.
+
+
+\skip on_start
+\until }
+
+When we have received all the expected messages, we then stop listening for
+incoming connections by closing the acceptor object.
+
+\skip on_message
+\until }
+\until }
+\until }
+\until }
+
+You can use the @ref simple_send.cpp example to send to this receiver
+directly. (Note: you will need to stop any broker that is listening on the 5672
+port, or else change the port used by specifying a different address to each
+example via the -a command line switch).
+
+We can also modify the sender to allow the original receiver to connect to it,
+in @ref direct_send.cpp. Again that just requires two modifications:
+
+\dontinclude direct_send.cpp
+
+As with the modified receiver, instead of initiating establishment of a
+link, we listen for incoming connections.
+
+\skip on_start
+\until }
+
+When we have received confirmation of all the messages we sent, we can
+close the acceptor in order to exit.
+
+\skip on_delivery_accept
+\until }
+\until }
+
+To try this modified sender, run the original @ref simple_recv.cpp against it.
+
+The symmetry in the underlying AMQP that enables this is quite unique and
+elegant, and in reflecting this the proton API provides a flexible toolkit for
+implementing all sorts of interesting intermediaries (@ref broker.hpp and @ref
+broker.cpp, which provide a simple broker for testing purposes, are an example of this).
+
+Request/Response
+----------------
+
+A common pattern is to send a request message and expect a response message in
+return. AMQP has special support for this pattern. Let's have a look at a simple
+example. We'll start with @ref server.cpp, the program that will process the
+request and send the response. Note that we are still using a broker in this
+example.
+
+Our server will provide a very simple service: it will respond with the
+body of the request converted to uppercase.
+
+\dontinclude server.cpp
+\skip class server
+\until };
+
+The code here is not too different from the simple receiver example.  When we
+receive a request in `on_message` however, we look at the
+`proton::message::reply_to` address and create a sender with that address for
+the response. We'll cache the senders in case we get further requests with the
+same `reply_to`.
+
+Now let's create a simple @ref client.cpp to test this service out.
+
+\dontinclude client.cpp
+
+Our client takes a list of strings to send as requests
+
+\skipline client(
+
+Since we will be sending and receiving, we create a sender and a receiver in
+`on_start`.  Our receiver has a blank address and sets the `dynamic` flag to
+true, which means we expect the remote end (broker or server) to assign a unique
+address for us.
+
+\skip on_start
+\until }
+
+Now a function to send the next request from our list of requests. We set the
+reply_to address to be the dynamically assigned address of our receiver.
+
+\skip send_request
+\until }
+
+We need to use the address assigned by the broker as the `reply_to` address of
+our requests, so we can't send them until our receiver has been set up. To do
+that, we add an `on_link_open()` method to our handler class, and if the link
+associated with the event is the receiver, we use that as the trigger to send our
+first request.
+
+\skip on_link_open
+\until }
+
+When we receive a reply, we send the next request.
+
+\skip on_message
+\until }
+\until }
+\until }
+
+Direct Request/Response
+-----------------------
+
+We can avoid the intermediary process by writing a server that accepts
+connections directly, @ref server_direct.cpp. It involves the following changes
+to our original server:
+
+\dontinclude server_direct.cpp
+
+Our server must generate unique reply-to addresses for links from the
+client that request a dynamic address (previously this was done by the broker.)
+We use a simple counter.
+
+\skip generate_address
+\until }
+
+Next we need to handle incoming requests for links with dynamic addresses from
+the client.  We give the link a unique address and record it in our `senders`
+map.
+
+\skip on_link_open
+\until }
+
+Note we are interested in *sender* links above because we are implementing the
+server. A *receiver* link created on the client corresponds to a *sender* link
+on the server.
+
+Finally when we receive a message we look up its `reply_to` in our senders map and send the reply.
+
+\skip on_message
+\until }
+\until }
+\until }
+
+*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index e1a826f..6b1459e 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -25,6 +25,7 @@ include_directories(
   "${CMAKE_CURRENT_SOURCE_DIR}/src")
 
 set(qpid-proton-cpp-source
+  ${qpid-proton-mt-source}
   src/acceptor.cpp
   src/binary.cpp
   src/byte_array.cpp
@@ -79,11 +80,10 @@ set(qpid-proton-cpp-source
   src/value.cpp
   )
 
-if(MSVC)
-  list(APPEND qpid-proton-cpp-source src/io/windows/socket.cpp)
-else(MSVC)
-  list(APPEND qpid-proton-cpp-source src/io/posix/socket.cpp)
-endif(MSVC)
+if (BUILD_CPP_MT)
+  list(APPEND qpid-proton-cpp-source src/controller.cpp)
+endif()
+
 
 set_source_files_properties (
   ${qpid-proton-cpp-source}
@@ -103,6 +103,9 @@ set_target_properties (
   SOVERSION "${PN_LIB_SOMAJOR}"
   LINK_FLAGS "${CATCH_UNDEFINED} ${LTO}"
   )
+if (BUILD_CPP_MT)
+  set_target_properties(qpid-proton-cpp PROPERTIES CXX_STANDARD 11)
+endif()
 
 ## Install
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/cpp.cmake
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/cpp.cmake b/proton-c/bindings/cpp/cpp.cmake
index 3f4c313..f89cc0d 100644
--- a/proton-c/bindings/cpp/cpp.cmake
+++ b/proton-c/bindings/cpp/cpp.cmake
@@ -31,4 +31,7 @@ if (CMAKE_CXX_COMPILER)
   if (HAS_STD_PTR)
     add_definitions(-DPN_CPP_HAS_STD_PTR=1)
   endif()
+  check_cxx_source_compiles("#if defined(__cplusplus) && __cplusplus >= 201100\nint main(int, char**) { return 0; }\n#endif" HAS_CPP11)
+
+  option(BUILD_CPP_MT "Build C++ multi-thread extensions, requires C++11" ${HAS_CPP11})
 endif()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/docs/mainpage.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/mainpage.md b/proton-c/bindings/cpp/docs/mainpage.md
index 8ad34cb..dffb3ed 100644
--- a/proton-c/bindings/cpp/docs/mainpage.md
+++ b/proton-c/bindings/cpp/docs/mainpage.md
@@ -4,7 +4,7 @@ This is the C++ API for the Proton AMQP protocol engine. It allows you
 to write client and server applications that send and receive AMQP
 messages.
 
-The best way to start is with the \ref tutorial "tutorial".
+The best way to start is with the @ref tutorial
 
 ## An overview of the AMQP model
 
@@ -22,13 +22,13 @@ Sessions are created over a `proton::connection`, which represents the
 network connection. You can create links directly on a connection
 using its default session if you don't need multiple sessions.
 
-`proton::message` represents the message: the body (content), headers,
-annotations, and so on. A `proton::delivery` represents the act of
-transferring a message over a link. The receiver acknowledges the
-delivery by accepting or rejecting it. The delivery is *settled* when
-both ends are done with it.  Different settlement methods give
-different levels of reliability: *at-most-once*, *at-least-once*, and
-*exactly-once*. See "Delivery Guarantees" below for details.
+`proton::message` represents the message: the body (content), properties,
+headers and annotations. A `proton::delivery` represents the act of transferring
+a message over a link. The receiver acknowledges the delivery by accepting or
+rejecting it. The delivery is *settled* when both ends are done with it.
+Different settlement methods give different levels of reliability:
+*at-most-once*, *at-least-once*, and *exactly-once*. See "Delivery Guarantees"
+below for details.
 
 ## Sources and targets
 
@@ -58,78 +58,9 @@ the client's dynamic receiver.
 
 In the case of a broker, a dynamic address usually corresponds to a
 temporary queue, but any AMQP request-response server can use this
-technique. The \ref server_direct.cpp example illustrates how to
+technique. The @ref server_direct.cpp example illustrates how to
 implement a queueless request-response server.
 
-## Anatomy of a Proton application
-
-To send AMQP commands, call methods on classes like
-`proton::connection`, `proton::sender`, `proton::receiver`, or
-`proton::delivery`. To handle incoming commands, implement the
-`proton::handler` interface. The handler receives calls like
-`on_message` (a message is received), `on_link_open` (a link is
-opened), and `on_sendable` (a link is ready to send messages).
-
-Messages are represented by `proton::message`. AMQP defines a type
-encoding that you can use for interoperability, but you can also use
-any encoding you wish and pass binary data as the
-`proton::message::body`. `proton::value` and `proton::scalar` provide
-conversion between AMQP and C++ data types.
-
-<!-- See the example \ref encode_decode.cpp. -->
-
-There are two alternative ways to manage handlers and AMQP objects,
-the `proton::container` and the `proton::connection_engine`. You can
-code your application so that it can be run with either. Say you find
-the `proton::container` easier to get started but later need more
-flexibility.  You can switch to the `proton::connection_engine` with
-little or no change to your handlers.
-
-### %proton::container
-
-`proton::container` is a *reactor* framework that manages multiple
-connections and dispatches events to handlers. You implement
-`proton::handler` with your logic to react to events, and register it
-with the container. The container lets you open multiple connections
-and links, receive incoming connections and links, and send, receive,
-and acknowledge messages.
-
-The reactor handles IO for multiple connections using sockets and
-`poll`. It dispatches events to your handlers in a single thread,
-where you call `proton::container::run`. The container is not
-thread-safe: once it is running you can only operate on Proton objects
-from your handler methods.
-
-### %proton::connection_engine
-
-`proton::connection_engine` dispatches events for a *single
-connection*. The subclass `proton::io::socket::engine` does
-socket-based IO. An application with a single connection is just like
-using `proton::container` except you attach your handler to a
-`proton::io::socket::engine` instead. You can compare examples, such as
-\ref helloworld.cpp and \ref engine/helloworld.cpp.
-
-Now consider multiple connections. `proton::container` is easy to use
-but restricted to a single thread. `proton::connection_engine` is not
-thread-safe either, but *each engine is independent*. You can process
-different connections in different threads, or use a thread pool to
-process a dynamic set of connections.
-
-The engine does not provide built-in polling and listening like the
-`proton::container`, but you can drive engines using any polling or
-threading strategy and any IO library (for example, epoll, kqueue,
-solaris completion ports, IOCP proactor, boost::asio, libevent, etc.)
-This can be important for optimizing performance or supporting diverse
-platforms. The example \ref engine/broker.cpp shows a broker using
-sockets and poll, but you can see how the code could be adapted.
-
-`proton::connection_engine` also does not dictate the IO mechanism,
-but it is an abstract class. `proton::socket::engine` provides
-ready-made socket-based IO, but you can write your own subclass with
-any IO code. Just override the `io_read`, `io_write` and `io_close`
-methods. For example, the proton test suite implements an in-memory
-connection using `std::deque` for test purposes.
-
 ## Delivery guarantees
 
 For *at-most-once*, the sender settles the message as soon as it sends
@@ -152,3 +83,68 @@ informs the sender of all the unsettled deliveries it knows about, and
 from this the sender can deduce which need to be redelivered. The
 sender likewise informs the receiver which deliveries it knows about,
 from which the receiver can deduce which have already been settled.
+
+## Anatomy of a Proton application
+
+To send AMQP commands, call methods on classes like `proton::connection`,
+`proton::sender`, `proton::receiver`, or `proton::delivery`.
+
+To handle incoming commands, subclass the `proton::handler` interface. The
+handler member functions are called when AMQP protocol events occur on a
+connection. For example `proton::handler::on_message` is called when a message
+is received.
+
+Messages are represented by `proton::message`. AMQP defines a type
+encoding that you can use for interoperability, but you can also use
+any encoding you wish and pass binary data as the
+`proton::message::body`. `proton::value` and `proton::scalar` provide
+conversion between AMQP and C++ data types.
+
+There are several ways to manage handlers and AMQP objects, for different types
+of application. All of them use the same `proton::handler` sub-classes so code
+can be re-used if you change your approach.
+
+### %proton::container - easy single-threaded applications
+
+`proton::container` is an easy way to get started with single threaded
+applications. It manages the low-level socket connection and polling for you so
+you can write portable applications using just the @ref proton API. You can use
+proton::connection::connect() and proton::container::listen() to create
+connections. The container polls multiple connections and dispatches protocol events
+to your `proton::handler` sub-classes.
+
+The container has two limitations:
+- it is not thread-safe, all container work must happen in a single thread.
+- it provides portable IO handling for you but cannot be used for different types of IO.
+
+### %proton::mt::controller - multi-threaded applications
+
+The proton::controller is similar to the proton::container but it can process
+connections in multiple threads concurrently. It uses the `proton::handler` like
+the container. If you follow the recommended model you can re-use stateful,
+per-connection, handlers with the controller. See @ref mt_page and the examples
+@ref mt/broker.cpp and @ref mt/epoll\_controller.cpp
+
+Default controller IO implementations are provided but you can also implement
+your own controller using the proton::io::connection_engine, read on...
+
+### %proton::io::connection_engine - integrating with foreign IO
+
+The `proton::io::connection_engine` is different from the other proton APIs. You
+might think of it as more like an SPI (Service Provider Interface).
+
+The engine provides a very low-level way of driving a proton::handler: You feed
+raw byte-sequence fragments of an AMQP-encoded stream to the engine and it
+converts that into calls on a proton::handler. The engine provides you with
+outgoing protocol stream bytes in return.
+
+The engine is deliberately very simple and low level. It does no IO, no
+thread-related locking, and is written in simple C++98 compatible code.
+
+You can use the engine directly to connect your application to any kind of IO
+framework or library, memory-based streams or any other source/sink for byte
+stream data.
+
+You can also use the engine to build a custom implementation of
+proton::mt::controller and proton::mt::work\_queue so portable proton
+applications using the controller can run without modification on your platform.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/docs/mt_page.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/mt_page.md b/proton-c/bindings/cpp/docs/mt_page.md
new file mode 100644
index 0000000..bed53e9
--- /dev/null
+++ b/proton-c/bindings/cpp/docs/mt_page.md
@@ -0,0 +1,21 @@
+# Multi-threaded proton {#mt_page}
+
+Most classes in namespace @ref proton are not thread-safe. Objects associated
+with a single connection *must not* be used concurrently. However objects
+associated with *different* connections *can* be used concurrently in separate
+threads.
+
+The recommended way to use proton multi-threaded is to *serialize* the work for
+each connection but allow different connections to be processed concurrently.
+
+proton::controller allows you to manage connections in a multi-threaded way. You
+supply a proton::handler for each connection. Proton will ensure that the
+`proton::handler::on_*()` functions are never called concurrently so
+per-connection handlers do not need a lock even if they have state.
+
+proton::work_queue allows you to make calls to arbitrary functions or other
+code, serialized in the same way as `proton::handler::on_*()` calls. Typically
+this is used to call your own handler's member functions in the same way as
+proton::handler override functions.
+
+For examples see @ref mt/broker.cpp, mt/simple\_send.cpp and mt/simple\_recv.cpp

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/docs/tutorial.dox
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/tutorial.dox b/proton-c/bindings/cpp/docs/tutorial.dox
deleted file mode 100644
index e40a3e7..0000000
--- a/proton-c/bindings/cpp/docs/tutorial.dox
+++ /dev/null
@@ -1,428 +0,0 @@
-// -*-markdown-*-
-// NOTE: doxygen can include markdown pages directly but there seems to be a bug
-// that shows messed-up line numbers in \skip \until code extracts so this file
-// is markdown wrapped in a C++ comment - which works.
-
-/**\page tutorial Tutorial
-
-This is a brief tutorial that will walk you through the fundamentals of building
-messaging applications in incremental steps. There are further examples, in
-addition the ones mentioned in the tutorial.
-
-Some of the examples require an AMQP *broker* that can receive, store and send
-messages. \ref broker.hpp and \ref broker.cpp define a simple example
-broker. Run without arguments it listens on `0.0.0.0:5672`, the standard AMQP
-port on all network interfaces. To use a different port or network interface:
-
-    broker -a <host>:<port>
-
-Instead of the example broker, you can use any AMQP 1.0 compliant broker. You
-must configure your broker to have a queue (or topic) named "examples".
-
-The `helloworld` examples take an optional URL argument. The other examples take
-an option `-a URL`. A URL looks like:
-
-    HOST:PORT/ADDRESS
-
-It usually defaults to `127.0.0.1:5672/examples`, but you can change this if
-your broker is on a different host or port, or you want to use a different queue
-or topic name (the ADDRESS part of the URL). URL details are at `proton::url`
-
-The first part of the tutorial uses the `proton::container`, later we will
-show some of the same examples implemented using the `proton::connection_engine`.
-Most of the code is the same for either approach.
-
-Hello World!
-------------
-
-\dontinclude helloworld.cpp
-
-Tradition dictates that we start with hello world! This example sends a message
-to a broker and the receives the same message back to demonstrate sending and
-receiving. In a realistic system the sender and receiver would normally be in
-different processes. The complete example is \ref helloworld.cpp
-
-We will include the following classes: `proton::container` runs an event loop
-which dispatches events to a `proton::handler`. This allows a *reactive*
-style of programming which is well suited to messaging applications. `proton::url` is a simple parser for the URL format mentioned above.
-
-\skip   proton/container
-\until  proton/url
-
-We will define a class `hello_world` which is a subclass of
-`proton::handler` and over-rides functions to handle the events
-of interest in sending and receiving a message.
-
-\skip class hello_world
-\until {}
-
-`on_start()` is called when the event loop first starts. We handle that by
-establishing a connection and creating a sender and a receiver.
-
-\skip on_start
-\until }
-
-`on_sendable()` is called when message can be transferred over the associated
-sender link to the remote peer. We create a `proton::message`, set the message
-body to `"Hello World!"` and send the message. Then we close the sender as we only
-want to send one message. Closing the sender will prevent further calls to
-`on_sendable()`.
-
-\skip on_sendable
-\until }
-
-`on_message()` is called when a message is received. We just print the body of
-the message and close the connection, as we only want one message
-
-\skip on_message
-\until }
-
-The message body is a `proton::value`, see the documentation for more on how to
-extract the message body as type-safe C++ values.
-
-Our `main` function creates an instance of the `hello_world` handler and a
-proton::container using that handler. Calling `proton::container::run` sets
-things in motion and returns when we close the connection as there is nothing
-further to do. It may throw an exception, which will be a subclass of
-`proton::error`. That in turn is a subclass of `std::exception`.
-
-\skip main
-\until }
-\until }
-\until }
-
-Hello World, Direct!
---------------------
-
-\dontinclude helloworld_direct.cpp
-
-Though often used in conjunction with a broker, AMQP does not *require* this. It
-also allows senders and receivers to communicate directly if desired.
-
-We will modify our example to send a message directly to itself. This is a bit
-contrived but illustrates both sides of the direct send/receive scenario. Full
-code at \ref helloworld_direct.cpp
-
-The first difference, is that rather than creating a receiver on the same
-connection as our sender, we listen for incoming connections by invoking the
-`proton::container::listen()` method on the container.
-
-\skip on_start
-\until }
-
-As we only need then to initiate one link, the sender, we can do that by
-passing in a url rather than an existing connection, and the connection
-will also be automatically established for us.
-
-We send the message in response to the `on_sendable()` callback and
-print the message out in response to the `on_message()` callback exactly
-as before.
-
-\skip on_sendable
-\until }
-\until }
-
-However we also handle two new events. We now close the connection from
-the senders side once the message has been accepted.
-The acceptance of the message is an indication of successful transfer to the
-peer. We are notified of that event through the `on_delivery_accept()`
-callback.
-
-\skip on_delivery_accept
-\until }
-
-Then, once the connection has been closed, of which we are
-notified through the `on_connection_close()` callback, we stop accepting incoming
-connections at which point there is no work to be done and the
-event loop exits, and the run() method will return.
-
-\skip on_connection_close
-\until }
-
-So now we have our example working without a broker involved!
-
-Note that for this example we pick an "unusual" port 8888 since we are talking
-to ourselves rather than a broker.
-
-\skipline url =
-
-Asynchronous Send and Receive
------------------------------
-
-Of course, these `HelloWorld!` examples are very artificial, communicating as
-they do over a network connection but with the same process. A more realistic
-example involves communication between separate processes (which could indeed be
-running on completely separate machines).
-
-Let's separate the sender from the receiver, and transfer more than a single
-message between them.
-
-We'll start with a simple sender \ref simple_send.cpp.
-
-\dontinclude simple_send.cpp
-
-As with the previous example, we define the application logic in a class that
-handles events. Because we are transferring more than one message, we need to
-keep track of how many we have sent. We'll use a `sent` member variable for
-that.  The `total` member variable will hold the number of messages we want to
-send.
-
-\skip class simple_send
-\until total
-
-As before, we use the `on_start()` event to establish our sender link over which
-we will transfer messages.
-
-\skip on_start
-\until }
-
-AMQP defines a credit-based flow control mechanism. Flow control allows
-the receiver to control how many messages it is prepared to receive at a
-given time and thus prevents any component being overwhelmed by the
-number of messages it is sent.
-
-In the `on_sendable()` callback, we check that our sender has credit
-before sending messages. We also check that we haven't already sent the
-required number of messages.
-
-\skip on_sendable
-\until }
-\until }
-
-The `proton::sender::send()` call above is asynchronous. When it returns the
-message has not yet actually been transferred across the network to the
-receiver. By handling the `on_accepted()` event, we can get notified when the
-receiver has received and accepted the message. In our example we use this event
-to track the confirmation of the messages we have sent. We only close the
-connection and exit when the receiver has received all the messages we wanted to
-send.
-
-\skip on_delivery_accept
-\until }
-\until }
-
-If we are disconnected after a message is sent and before it has been
-confirmed by the receiver, it is said to be `in doubt`. We don't know
-whether or not it was received. In this example, we will handle that by
-resending any in-doubt messages. This is known as an 'at-least-once'
-guarantee, since each message should eventually be received at least
-once, though a given message may be received more than once (i.e.
-duplicates are possible). In the `on_disconnected()` callback, we reset
-the sent count to reflect only those that have been confirmed. The
-library will automatically try to reconnect for us, and when our sender
-is sendable again, we can restart from the point we know the receiver
-got to.
-
-\skip on_disconnect
-\until }
-
-\dontinclude simple_recv.cpp
-
-Now let's look at the corresponding receiver \ref simple_recv.cpp
-
-This time we'll use an `expected` member variable for for the number of messages we expect and
-a `received` variable to count how many we have received so far.
-
-\skip class simple_recv
-\until received
-
-We handle `on_start()` by creating our receiver, much like we
-did for the sender.
-
-\skip on_start
-\until }
-
-We also handle the `on_message()` event for received messages and print the
-message out as in the `Hello World!` examples.  However we add some logic to
-allow the receiver to wait for a given number of messages, then to close the
-connection and exit. We also add some logic to check for and ignore duplicates,
-using a simple sequential id scheme.
-
-\skip on_message
-\until }
-
-Direct Send and Receive
------------------------
-
-Sending between these two examples requires an intermediary broker since neither
-accepts incoming connections. AMQP allows us to send messages directly between
-two processes. In that case one or other of the processes needs to accept
-incoming connections. Let's create a modified version of the receiving example
-that does this with \ref direct_recv.cpp
-
-\dontinclude direct_recv.cpp
-
-There are only two differences here. Instead of initiating a link (and
-implicitly a connection), we listen for incoming connections.
-
-
-\skip on_start
-\until }
-
-When we have received all the expected messages, we then stop listening for
-incoming connections by closing the acceptor object.
-
-\skip on_message
-\until }
-\until }
-\until }
-\until }
-
-You can use the \ref simple_send.cpp example to send to this receiver
-directly. (Note: you will need to stop any broker that is listening on the 5672
-port, or else change the port used by specifying a different address to each
-example via the -a command line switch).
-
-We can also modify the sender to allow the original receiver to connect to it,
-in \ref direct_send.cpp. Again that just requires two modifications:
-
-\dontinclude direct_send.cpp
-
-As with the modified receiver, instead of initiating establishment of a
-link, we listen for incoming connections.
-
-\skip on_start
-\until }
-
-When we have received confirmation of all the messages we sent, we can
-close the acceptor in order to exit.
-
-\skip on_delivery_accept
-\until }
-\until }
-
-To try this modified sender, run the original \ref simple_recv.cpp against it.
-
-The symmetry in the underlying AMQP that enables this is quite unique and
-elegant, and in reflecting this the proton API provides a flexible toolkit for
-implementing all sorts of interesting intermediaries (\ref broker.hpp and \ref
-broker.cpp provide a simple broker for testing purposes is an example of this).
-
-Request/Response
-----------------
-
-A common pattern is to send a request message and expect a response message in
-return. AMQP has special support for this pattern. Let's have a look at a simple
-example. We'll start with \ref server.cpp, the program that will process the
-request and send the response. Note that we are still using a broker in this
-example.
-
-Our server will provide a very simple service: it will respond with the
-body of the request converted to uppercase.
-
-\dontinclude server.cpp
-\skip class server
-\until };
-
-The code here is not too different from the simple receiver example.  When we
-receive a request in `on_message` however, we look at the
-`proton::message::reply_to` address and create a sender with that address for
-the response. We'll cache the senders incase we get further requests with the
-same `reply_to`.
-
-Now let's create a simple \ref client.cpp to test this service out.
-
-\dontinclude client.cpp
-
-Our client takes a list of strings to send as requests
-
-\skipline client(
-
-Since we will be sending and receiving, we create a sender and a receiver in
-`on_start`.  Our receiver has a blank address and sets the `dynamic` flag to
-true, which means we expect the remote end (broker or server) to assign a unique
-address for us.
-
-\skip on_start
-\until }
-
-Now a function to send the next request from our list of requests. We set the
-reply_to address to be the dynamically assigned address of our receiver.
-
-\skip send_request
-\until }
-
-We need to use the address assigned by the broker as the `reply_to` address of
-our requests, so we can't send them until our receiver has been set up. To do
-that, we add an `on_link_open()` method to our handler class, and if the link
-associated with event is the receiver, we use that as the trigger to send our
-first request.
-
-\skip on_link_open
-\until }
-
-When we receive a reply, we send the next request.
-
-\skip on_message
-\until }
-\until }
-\until }
-
-Direct Request/Response
------------------------
-
-We can avoid the intermediary process by writing a server that accepts
-connections directly, \ref server_direct.cpp. It involves the following changes
-to our original server:
-
-\dontinclude server_direct.cpp
-
-Our server must generate a unique reply-to addresses for links from the
-client that request a dynamic address (previously this was done by the broker.)
-We use a simple counter.
-
-\skip generate_address
-\until }
-
-Next we need to handle incoming requests for links with dynamic addresses from
-the client.  We give the link a unique address and record it in our `senders`
-map.
-
-\skip on_link_open
-\until }
-
-Note we are interested in *sender* links above because we are implementing the
-server. A *receiver* link created on the client corresponds to a *sender* link
-on the server.
-
-Finally when we receive a message we look up its `reply_to` in our senders map and send the reply.
-
-\skip on_message
-\until }
-\until }
-\until }
-
-Connection Engine
------------------
-
-The `proton::connection_engine` is an alternative to the container. For simple
-applications with a single connection, its use is about the same as the the
-`proton::container`, but it allows more flexibility for multi-threaded
-applications or applications with unusual IO requirements.
-
-\dontinclude engine/helloworld.cpp
-
-We'll look at the \ref engine/helloworld.cpp example step-by-step to see how it differs
-from the container \ref helloworld.cpp version.
-
-First we include the `proton::io::socket::engine` class, which is a `proton::connection_engine`
-that uses socket IO.
-
-\skipline proton/io.hpp
-
-Our `hello_world` class differs only in the `on_start()` method. Instead of
-calling `container.connect()`, we simply call `proton::connection::open` to open the
-engine's' connection:
-
-\skip on_start
-\until }
-
-Our `main` function only differs in that it creates and runs a `socket::engine`
-instead of a `container`.
-
-\skip main
-\until }
-\until }
-\until }
-
-*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/docs/user.doxygen.in
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/user.doxygen.in b/proton-c/bindings/cpp/docs/user.doxygen.in
index 26b1784..7628151 100644
--- a/proton-c/bindings/cpp/docs/user.doxygen.in
+++ b/proton-c/bindings/cpp/docs/user.doxygen.in
@@ -53,8 +53,9 @@ WARNINGS               = YES
 
 # Configuration options related to the input files
 
-INPUT = @CMAKE_SOURCE_DIR@/proton-c/bindings/cpp/include @CMAKE_SOURCE_DIR@/proton-c/bindings/cpp/docs @CMAKE_SOURCE_DIR@/examples/cpp/README.dox
+INPUT = @CMAKE_SOURCE_DIR@/proton-c/bindings/cpp/include @CMAKE_SOURCE_DIR@/proton-c/bindings/cpp/docs @CMAKE_SOURCE_DIR@/examples/cpp
 FILE_PATTERNS          = *.hpp *.md *.dox
+EXCLUDE_PATTERNS       = @CMAKE_SOURCE_DIR@/examples/*.?pp  # Don't parse example sources, only *.dox
 FULL_PATH_NAMES        = YES
 RECURSIVE              = YES
 STRIP_FROM_PATH        = @CMAKE_SOURCE_DIR@/proton-c/bindings/cpp/include

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/include/proton/connection_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp b/proton-c/bindings/cpp/include/proton/connection_options.hpp
index d2efab9..a4b7246 100644
--- a/proton-c/bindings/cpp/include/proton/connection_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp
@@ -139,14 +139,18 @@ class connection_options {
     PN_CPP_EXTERN connection_options& sasl_config_path(const std::string &);
     /// @endcond
 
-    /// @cond INTERNAL
+    /// Update option values from values set in other.
+    PN_CPP_EXTERN connection_options& update(const connection_options& other);
+
+    /// Copy and update option values from values set in other.
+    PN_CPP_EXTERN connection_options update(const connection_options& other) const;
+
   private:
     void apply(connection&) const;
     proton_handler* handler() const;
     static pn_connection_t *pn_connection(connection &);
     class ssl_client_options &ssl_client_options();
     class ssl_server_options &ssl_server_options();
-    PN_CPP_EXTERN void update(const connection_options& other);
 
     class impl;
     internal::pn_unique_ptr<impl> impl_;
@@ -154,7 +158,6 @@ class connection_options {
     friend class container_impl;
     friend class connector;
     friend class io::connection_engine;
-    /// @endcond
 };
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/include/proton/controller.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/controller.hpp b/proton-c/bindings/cpp/include/proton/controller.hpp
new file mode 100644
index 0000000..6b0784c
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/controller.hpp
@@ -0,0 +1,118 @@
+#ifndef PROTON_MT_HPP
+#define PROTON_MT_HPP
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+#include <proton/handler.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/error_condition.hpp>
+
+#include <functional>
+#include <memory>
+
+namespace proton {
+
+class connection;
+
+/// The controller lets the application initiate and listen for connections, and
+/// start/stop overall IO activity. A single controller can manage many
+/// connections.
+///
+/// A controller associates a proton::handler with a proton::connection (the
+/// AMQP protocol connection) and the corresponding proton::transport
+/// (representing the underlying IO connection).
+///
+/// The first call to a proton::handler is always handler::on_transport_open(),
+/// the last is handler::on_transport_close(). Handlers can be deleted after
+/// handler::on_transport_close().
+///
+class controller {
+  public:
+    /// Create an instance of the default controller implementation.
+    /// NOTE(review): create() takes no container_id argument; set it via
+    /// options(). Presumably a random UUID is generated if unset - confirm.
+    PN_CPP_EXTERN static std::unique_ptr<controller> create();
+
+    /// Get the controller associated with a connection.
+    /// @throw proton::error if this is not a controller-managed connection.
+    PN_CPP_EXTERN static controller& get(const proton::connection&);
+
+    controller(const controller&) = delete; // Not copy-constructible.
+
+    virtual ~controller() {}
+
+    /// Start listening on address.
+    ///
+    /// @param address identifies a listening address.
+    ///
+    /// @param make_handler returns a handler for each accepted connection.
+    /// handler::on_connection_open() is called with the incoming
+    /// proton::connection.  The handler can accept by calling
+    /// connection::open() or reject by calling connection::close().
+    ///
+    /// Calls to make_handler for this address are serialized. Calls for separate
+    /// addresses in separate calls to listen() may be concurrent.
+    ///
+    virtual void listen(
+        const std::string& address,
+        std::function<proton::handler*(const std::string&)> make_handler,
+        const connection_options& = connection_options()) = 0;
+
+    /// Stop listening on address, must match exactly with address string given to listen().
+    virtual void stop_listening(const std::string& address) = 0;
+
+    /// Connect to address.
+    ///
+    /// The handler will get a proton::handler::on_connection_open() call when
+    /// the connection is completed by the other end.
+    virtual void connect(
+        const std::string& address, proton::handler&,
+        const connection_options& = connection_options()) = 0;
+
+    /// Set the default options for all connections, e.g. container_id.
+    virtual void options(const connection_options&) = 0;
+
+    /// Get the default options for all connections, e.g. container_id.
+    virtual connection_options options() = 0;
+
+    /// Run the controller in this thread. Returns when the controller is
+    /// stopped.  Multiple threads can call run().
+    virtual void run() = 0;
+
+    /// Stop the controller: abort open connections, run() will return in all threads.
+    /// Handlers will receive on_transport_error() with the error_condition.
+    virtual void stop(const error_condition& = error_condition()) = 0;
+
+    /// The controller will stop (run() will return) when there are no open
+    /// connections or listeners left.
+    virtual void stop_on_idle() = 0;
+
+    /// Wait till the controller is stopped.
+    virtual void wait() = 0;
+
+  protected:
+    controller() {} // Protected: only constructible from derived classes.
+};
+
+}
+
+
+#endif // PROTON_MT_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/include/proton/error.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/error.hpp b/proton-c/bindings/cpp/include/proton/error.hpp
index dd30867..1b77b3f 100644
--- a/proton-c/bindings/cpp/include/proton/error.hpp
+++ b/proton-c/bindings/cpp/include/proton/error.hpp
@@ -22,14 +22,17 @@
  *
  */
 
-#include "proton/config.hpp"
-#include "proton/export.hpp"
+#include <proton/config.hpp>
+#include <proton/export.hpp>
+#include <proton/value.hpp>
 
 #include <stdexcept>
 #include <string>
 
 namespace proton {
 
+class value;
+
 /// The base proton error.
 ///
 /// All exceptions thrown from functions in the proton namespace are

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/include/proton/handler.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/handler.hpp b/proton-c/bindings/cpp/include/proton/handler.hpp
index ef87ff3..4526036 100644
--- a/proton-c/bindings/cpp/include/proton/handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/handler.hpp
@@ -50,6 +50,18 @@ class connection_engine;
 ///
 /// Subclass and override event-handling member functions.
 ///
+/// Close and error handling: there are several objects that have on_X_close and on_X_error functions.
+/// They are called as follows:
+///
+/// - If X is closed cleanly, with no error status then on_X_close() is called.
+/// - If X is closed with an error then on_X_error() is called, followed by on_X_close().
+///   Note the error condition is also available in on_X_close from X::condition().
+///
+/// By default, if you do not implement on_X_error, it will call
+/// on_unhandled_error().  If you do not implement on_unhandled_error() it will
+/// throw a proton::error exception, which may not be what you want but does
+/// help to identify forgotten error handling quickly.
+///
 /// @see proton::event
 class
 PN_CPP_CLASS_EXTERN handler

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
index acf47fa..ded68de 100644
--- a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
+++ b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
@@ -20,6 +20,7 @@
  * under the License.
  */
 
+#include "proton/config.hpp"
 #include "proton/connection.hpp"
 #include "proton/connection_options.hpp"
 #include "proton/error.hpp"
@@ -38,27 +39,44 @@ struct pn_collector_t;
 namespace proton {
 
 class handler;
+class work_queue;            // Only used for multi-threaded connection_engines.
 
-/// Contains classes to integrate proton into different IO and threading environments.
-namespace io {
+/** @page integration
 
-///@cond INTERNAL
-class connection_engine_context;
-///
+This namespace contains a low-level "Service Provider Interface" that can be
+used to implement the proton API over any native or 3rd party IO library.
+
+The io::connection_engine is the core engine that converts raw AMQP bytes read
+from any IO source into proton::handler event calls, and generates AMQP
+byte-encoded output that can be written to any IO destination.
+
+The integration needs to implement two user-visible interfaces:
+ - proton::controller lets the user initiate or listen for connections.
+ - proton::work_queue lets the user serialize their own work with a connection.
+
+ @see epoll_controller.cpp for an example of an integration.
+
+[TODO controller doesn't belong in the mt namespace, a single-threaded
+integration would need a controller too.]
+
+*/
+namespace io {
 
 /// Pointer to a mutable memory region with a size.
 struct mutable_buffer {
-    char* data;
-    size_t size;
+    char* data;                 ///< Beginning of the buffered data.
+    size_t size;                ///< Number of bytes in the buffer.
 
+    /// Construct a buffer starting at data_ with size_ bytes.
     mutable_buffer(char* data_=0, size_t size_=0) : data(data_), size(size_) {}
 };
 
 /// Pointer to a const memory region with a size.
 struct const_buffer {
-    const char* data;
-    size_t size;
+    const char* data;           ///< Beginning of the buffered data.
+    size_t size;                ///< Number of bytes in the buffer.
 
+    /// Construct a buffer starting at data_ with size_ bytes.
     const_buffer(const char* data_=0, size_t size_=0) : data(data_), size(size_) {}
 };
 
@@ -88,39 +106,16 @@ struct const_buffer {
 /// epoll) or an async-request driven proactor (e.g. windows completion ports,
 /// boost.asio, libuv etc.)
 ///
+/// The engine never throws exceptions.
+///
 class
 PN_CPP_CLASS_EXTERN connection_engine {
   public:
-    // TODO aconway 2016-03-18: this will change
-    class container {
-      public:
-        /// Create a container with id.  Default to random UUID.
-        PN_CPP_EXTERN container(const std::string &id = "");
-        PN_CPP_EXTERN ~container();
-
-        /// Return the container-id
-        PN_CPP_EXTERN std::string id() const;
-
-        /// Make options to configure a new engine, using the default options.
-        ///
-        /// Call this once for each new engine as the options include a generated unique link_prefix.
-        /// You can modify the configuration before creating the engine but you should not
-        /// modify the container_id or link_prefix.
-        PN_CPP_EXTERN connection_options make_options();
-
-        /// Set the default options to be used for connection engines.
-        /// The container will set the container_id and link_prefix when make_options is called.
-        PN_CPP_EXTERN void options(const connection_options&);
-
-      private:
-        class impl;
-        internal::pn_unique_ptr<impl> impl_;
-    };
-
     /// Create a connection engine that dispatches to handler.
+    // TODO aconway 2016-04-06: no options, only via handler.
     PN_CPP_EXTERN connection_engine(handler&, const connection_options& = connection_options());
 
-    PN_CPP_EXTERN virtual ~connection_engine();
+    PN_CPP_EXTERN ~connection_engine();
 
     /// The engine's read buffer. Read data into this buffer then call read_done() when complete.
     /// Returns mutable_buffer(0, 0) if the engine cannot currently read data.
@@ -132,6 +127,7 @@ PN_CPP_CLASS_EXTERN connection_engine {
     PN_CPP_EXTERN void read_done(size_t n);
 
     /// Indicate that the read side of the transport is closed and no more data will be read.
+    /// Note that there may still be events to dispatch() or data to write.
     PN_CPP_EXTERN void read_close();
 
     /// The engine's write buffer. Write data from this buffer then call write_done()
@@ -141,9 +137,11 @@ PN_CPP_CLASS_EXTERN connection_engine {
 
     /// Indicate that the first n bytes of write_buffer() have been written successfully.
     /// This changes the buffer, call write_buffer() to get the updated buffer.
+
     PN_CPP_EXTERN void write_done(size_t n);
 
-    /// Indicate that the write side of the transport has closed and no more data will be written.
+    /// Indicate that the write side of the transport has closed and no more data can be written.
+    /// Note that there may still be events to dispatch() or data to read.
     PN_CPP_EXTERN void write_close();
 
     /// Close the engine with an error that will be passed to handler::on_transport_error().
@@ -154,15 +152,14 @@ PN_CPP_CLASS_EXTERN connection_engine {
     /// Dispatch all available events and call the corresponding \ref handler methods.
     ///
     /// Returns true if the engine is still active, false if it is finished and
-    /// can be destroyed. The engine is finished when either of the following is
-    /// true:
+    /// can be destroyed. The engine is finished when all events are dispatched
+    /// and one of the following is true:
     ///
     /// - both read_close() and write_close() have been called, no more IO is possible.
-    /// - The AMQP connection() is closed AND write_buffer() is empty.
+    /// - The AMQP connection() is closed AND the write_buffer() is empty.
     ///
-    /// May expand the read_buffer() and/or the write_buffer().
+    /// May modify the read_buffer() and/or the write_buffer().
     ///
-    /// @throw any exceptions thrown by the \ref handler.
     PN_CPP_EXTERN bool dispatch();
 
     /// Get the AMQP connection associated with this connection_engine.
@@ -171,6 +168,12 @@ PN_CPP_CLASS_EXTERN connection_engine {
     /// Get the transport associated with this connection_engine.
     PN_CPP_EXTERN proton::transport transport() const;
 
+    /// For controller connections, set the connection's work_queue. Set
+    /// via plain pointer, not std::shared_ptr, so the connection_engine can be
+    /// compiled with C++03.  The work_queue must outlive the engine. The
+    /// std::shared_ptr<work_queue> will be available via work_queue::get(this->connection()).
+    PN_CPP_EXTERN void work_queue(class work_queue*);
+
   private:
     connection_engine(const connection_engine&);
     connection_engine& operator=(const connection_engine&);
@@ -180,6 +183,7 @@ PN_CPP_CLASS_EXTERN connection_engine {
     proton::transport transport_;
     proton::internal::pn_ptr<pn_collector_t> collector_;
 };
+
 }}
 
 #endif // CONNECTION_ENGINE_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/include/proton/io/default_controller.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/default_controller.hpp b/proton-c/bindings/cpp/include/proton/io/default_controller.hpp
new file mode 100644
index 0000000..f876d5f
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/io/default_controller.hpp
@@ -0,0 +1,47 @@
+#ifndef PROTON_IO_DRIVER_HPP
+#define PROTON_IO_DRIVER_HPP
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <functional>
+#include <memory>
+
+namespace proton {
+
+class controller;
+
+namespace io {
+
+/// A proton::controller implementation can create a static instance of default_controller
+/// to register as the default implementation.
+/// If more than one implementation is linked, which one becomes the default
+/// is undefined.
+struct default_controller {
+
+    /// A controller make-function takes no arguments and returns a unique_ptr<controller>
+    typedef std::function<std::unique_ptr<controller>()> make_fn;
+
+    /// Construct a static instance of default_controller to register your controller factory.
+    PN_CPP_EXTERN default_controller(make_fn);
+};
+
+}}
+
+#endif // PROTON_IO_DRIVER_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/include/proton/io/socket.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/socket.hpp b/proton-c/bindings/cpp/include/proton/io/socket.hpp
deleted file mode 100644
index bcbcecf..0000000
--- a/proton-c/bindings/cpp/include/proton/io/socket.hpp
+++ /dev/null
@@ -1,130 +0,0 @@
-#ifndef PROTON_IO_IO_HPP
-#define PROTON_IO_IO_HPP
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <proton/io/connection_engine.hpp>
-#include <proton/url.hpp>
-
-
-namespace proton {
-namespace io {
-namespace socket {
-
-struct
-PN_CPP_CLASS_EXTERN io_error : public proton::error {
-    PN_CPP_EXTERN explicit io_error(const std::string&); ///< Construct with message
-};
-
-/// @name Setup and teardown
-///
-/// Call initialize() before using any functions in the proton::io::socket
-/// namespace.  Call finalize() when you are done.
-///
-/// You can call initialize/finalize more than once as long as they are in
-/// matching pairs. Use \ref guard to call initialize/finalize around a scope.
-///
-/// Note that on POSIX systems these are no-ops, but they are required
-/// for Windows.
-///
-/// @{
-
-/// Initialize the proton::io subsystem.
-PN_CPP_EXTERN void initialize();
-
-/// Finalize the proton::io subsystem.
-PN_CPP_EXTERN void finalize(); // nothrow
-
-/// Use to call io::initialize and io::finalize around a scope.
-struct guard {
-    guard() { initialize(); }
-    ~guard() { finalize(); }
-};
-
-/// @}
-
-/// An IO resource.
-typedef int64_t descriptor;
-
-/// An invalid descriptor.
-PN_CPP_EXTERN extern const descriptor INVALID_DESCRIPTOR;
-
-/// Return a string describing the most recent IO error.
-PN_CPP_EXTERN std::string error_str();
-
-/// Open a TCP connection to the host:port (port can be a service name or number) from a proton::url.
-PN_CPP_EXTERN descriptor connect(const proton::url&);
-
-/// Listening socket.
-class listener {
-  public:
-    /// Listen on host/port. Empty host means listen on all interfaces.
-    /// port can be a service name or number
-    PN_CPP_EXTERN listener(const std::string& host, const std::string& port);
-    PN_CPP_EXTERN ~listener();
-
-    /// Accept a connection. Return the descriptor, set host, port to the remote address.
-    /// port can be a service name or number.
-    PN_CPP_EXTERN descriptor accept(std::string& host, std::string& port);
-
-    /// Accept a connection, does not provide address info.
-    descriptor accept() { std::string dummy; return accept(dummy, dummy); }
-
-    /// Convert to descriptor
-    descriptor socket() const { return socket_; }
-
-  private:
-    guard guard_;
-    listener(const listener&);
-    listener& operator=(const listener&);
-    descriptor socket_;
-};
-
-/// A \ref connection_engine with non-blocking socket IO.
-class engine : public connection_engine {
-  public:
-    /// Wrap an open socket. Does not automatically open the connection.
-    PN_CPP_EXTERN engine(descriptor socket_, handler&, const connection_options& = connection_options());
-
-    /// Create socket engine connected to url, open the connection as a client.
-    PN_CPP_EXTERN engine(const url&, handler&, const connection_options& = connection_options());
-
-    PN_CPP_EXTERN ~engine();
-
-    /// Run the engine until it closes
-    PN_CPP_EXTERN void run();
-
-    /// Non-blocking read from socket to engine
-    PN_CPP_EXTERN void read();
-
-    /// Non-blocking write from engine to socket
-    PN_CPP_EXTERN void write();
-
-    descriptor socket() const { return socket_; }
-
-  private:
-    void init();
-    guard guard_;
-    descriptor socket_;
-};
-
-}}}
-
-#endif  /*!PROTON_IO_IO_HPP*/


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org