You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/08/28 21:57:31 UTC

[1/3] qpid-proton git commit: PROTON-1554: Remove thread-safe template, simplify returned<>

Repository: qpid-proton
Updated Branches:
  refs/heads/master ed756d8f3 -> 298e7dba1


PROTON-1554: Remove thread-safe template, simplify returned<>

Removed the thread-safe template.

Simplified and documented returned<>: it can *only* be converted to a proton
object, and only in a single-threaded application.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/f1ee2681
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/f1ee2681
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/f1ee2681

Branch: refs/heads/master
Commit: f1ee268163ce80c2b9dc3dcb7ec00151f5c6b486
Parents: ed756d8
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Aug 24 16:49:08 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Sat Aug 26 13:16:50 2017 -0400

----------------------------------------------------------------------
 examples/cpp/broker.cpp                         |   2 +-
 examples/cpp/client.cpp                         |   1 -
 examples/cpp/connection_options.cpp             |   1 -
 examples/cpp/flow_control.cpp                   |   1 -
 examples/cpp/helloworld.cpp                     |   1 -
 examples/cpp/helloworld_direct.cpp              |   1 -
 examples/cpp/queue_browser.cpp                  |   1 -
 examples/cpp/scheduled_send.cpp                 |   2 +-
 examples/cpp/scheduled_send_03.cpp              |   1 -
 examples/cpp/selected_recv.cpp                  |   1 -
 examples/cpp/server.cpp                         |   1 -
 examples/cpp/service_bus.cpp                    |   2 +-
 examples/cpp/simple_recv.cpp                    |   1 -
 examples/cpp/simple_send.cpp                    |   1 -
 examples/cpp/ssl.cpp                            |   1 -
 examples/cpp/ssl_client_cert.cpp                |   1 -
 proton-c/bindings/cpp/CMakeLists.txt            |   2 +-
 proton-c/bindings/cpp/docs/headers.dox          |   2 -
 .../bindings/cpp/include/proton/connection.hpp  |   5 +-
 .../bindings/cpp/include/proton/container.hpp   |  28 +++-
 .../bindings/cpp/include/proton/delivery.hpp    |   2 +-
 proton-c/bindings/cpp/include/proton/fwd.hpp    |   2 -
 .../cpp/include/proton/internal/object.hpp      |   5 +-
 .../cpp/include/proton/io/connection_driver.hpp |   1 -
 .../bindings/cpp/include/proton/receiver.hpp    |   3 +-
 .../bindings/cpp/include/proton/returned.hpp    |  62 +++++++
 proton-c/bindings/cpp/include/proton/sender.hpp |   1 -
 .../bindings/cpp/include/proton/session.hpp     |   1 -
 .../bindings/cpp/include/proton/thread_safe.hpp | 165 -------------------
 .../bindings/cpp/include/proton/work_queue.hpp  |   6 +-
 .../bindings/cpp/src/connection_driver_test.cpp |   2 +-
 proton-c/bindings/cpp/src/container.cpp         |   1 -
 proton-c/bindings/cpp/src/container_test.cpp    |   1 -
 .../bindings/cpp/src/include/proton_bits.hpp    |   6 +
 .../cpp/src/proactor_container_impl.cpp         |   9 +-
 proton-c/bindings/cpp/src/returned.cpp          |  41 +++++
 proton-c/bindings/cpp/src/thread_safe_test.cpp  | 108 ------------
 tests/tools/apps/cpp/reactor_send.cpp           |   1 -
 38 files changed, 155 insertions(+), 318 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/broker.cpp b/examples/cpp/broker.cpp
index 198b449..a236bb1 100644
--- a/examples/cpp/broker.cpp
+++ b/examples/cpp/broker.cpp
@@ -35,9 +35,9 @@
 #include <proton/source_options.hpp>
 #include <proton/target.hpp>
 #include <proton/target_options.hpp>
-#include <proton/thread_safe.hpp>
 #include <proton/tracker.hpp>
 #include <proton/transport.hpp>
+#include <proton/work_queue.hpp>
 
 #include <deque>
 #include <iostream>

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/client.cpp b/examples/cpp/client.cpp
index 7139155..81a9a32 100644
--- a/examples/cpp/client.cpp
+++ b/examples/cpp/client.cpp
@@ -28,7 +28,6 @@
 #include <proton/messaging_handler.hpp>
 #include <proton/receiver_options.hpp>
 #include <proton/source_options.hpp>
-#include <proton/thread_safe.hpp>
 #include <proton/tracker.hpp>
 
 #include <iostream>

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/connection_options.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/connection_options.cpp b/examples/cpp/connection_options.cpp
index f718060..a696c6d 100644
--- a/examples/cpp/connection_options.cpp
+++ b/examples/cpp/connection_options.cpp
@@ -24,7 +24,6 @@
 #include <proton/container.hpp>
 #include <proton/default_container.hpp>
 #include <proton/messaging_handler.hpp>
-#include <proton/thread_safe.hpp>
 #include <proton/transport.hpp>
 
 #include <iostream>

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/flow_control.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/flow_control.cpp b/examples/cpp/flow_control.cpp
index c0b8739..7b1474e 100644
--- a/examples/cpp/flow_control.cpp
+++ b/examples/cpp/flow_control.cpp
@@ -31,7 +31,6 @@
 #include <proton/messaging_handler.hpp>
 #include <proton/receiver_options.hpp>
 #include <proton/sender.hpp>
-#include <proton/thread_safe.hpp>
 #include <proton/tracker.hpp>
 
 #include <iostream>

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/helloworld.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/helloworld.cpp b/examples/cpp/helloworld.cpp
index 4aa5cdd..404d822 100644
--- a/examples/cpp/helloworld.cpp
+++ b/examples/cpp/helloworld.cpp
@@ -25,7 +25,6 @@
 #include <proton/delivery.hpp>
 #include <proton/message.hpp>
 #include <proton/messaging_handler.hpp>
-#include <proton/thread_safe.hpp>
 #include <proton/tracker.hpp>
 #include <proton/url.hpp>
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/helloworld_direct.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/helloworld_direct.cpp b/examples/cpp/helloworld_direct.cpp
index 9331587..f879edd 100644
--- a/examples/cpp/helloworld_direct.cpp
+++ b/examples/cpp/helloworld_direct.cpp
@@ -26,7 +26,6 @@
 #include <proton/message.hpp>
 #include <proton/messaging_handler.hpp>
 #include <proton/sender.hpp>
-#include <proton/thread_safe.hpp>
 #include <proton/tracker.hpp>
 
 #include <iostream>

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/queue_browser.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/queue_browser.cpp b/examples/cpp/queue_browser.cpp
index 583277e..ef158b5 100644
--- a/examples/cpp/queue_browser.cpp
+++ b/examples/cpp/queue_browser.cpp
@@ -27,7 +27,6 @@
 #include <proton/messaging_handler.hpp>
 #include <proton/receiver_options.hpp>
 #include <proton/source_options.hpp>
-#include <proton/thread_safe.hpp>
 #include <proton/url.hpp>
 
 #include <iostream>

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/scheduled_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/scheduled_send.cpp b/examples/cpp/scheduled_send.cpp
index 2914c44..4c71482 100644
--- a/examples/cpp/scheduled_send.cpp
+++ b/examples/cpp/scheduled_send.cpp
@@ -22,11 +22,11 @@
 #include "options.hpp"
 
 #include <proton/container.hpp>
+#include <proton/connection.hpp>
 #include <proton/default_container.hpp>
 #include <proton/message.hpp>
 #include <proton/messaging_handler.hpp>
 #include <proton/sender.hpp>
-#include <proton/thread_safe.hpp>
 #include <proton/tracker.hpp>
 #include <proton/work_queue.hpp>
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/scheduled_send_03.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/scheduled_send_03.cpp b/examples/cpp/scheduled_send_03.cpp
index 008853c..20972e4 100644
--- a/examples/cpp/scheduled_send_03.cpp
+++ b/examples/cpp/scheduled_send_03.cpp
@@ -29,7 +29,6 @@
 #include <proton/message.hpp>
 #include <proton/messaging_handler.hpp>
 #include <proton/sender.hpp>
-#include <proton/thread_safe.hpp>
 #include <proton/tracker.hpp>
 #include <proton/work_queue.hpp>
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/selected_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/selected_recv.cpp b/examples/cpp/selected_recv.cpp
index a48ef0e..771fb29 100644
--- a/examples/cpp/selected_recv.cpp
+++ b/examples/cpp/selected_recv.cpp
@@ -26,7 +26,6 @@
 #include <proton/messaging_handler.hpp>
 #include <proton/receiver_options.hpp>
 #include <proton/source_options.hpp>
-#include <proton/thread_safe.hpp>
 #include <proton/url.hpp>
 
 #include <iostream>

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/server.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/server.cpp b/examples/cpp/server.cpp
index 449ce6e..573b3a0 100644
--- a/examples/cpp/server.cpp
+++ b/examples/cpp/server.cpp
@@ -27,7 +27,6 @@
 #include <proton/message.hpp>
 #include <proton/message_id.hpp>
 #include <proton/messaging_handler.hpp>
-#include <proton/thread_safe.hpp>
 #include <proton/tracker.hpp>
 #include <proton/url.hpp>
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/service_bus.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/service_bus.cpp b/examples/cpp/service_bus.cpp
index 6b57f8d..2c7a682 100644
--- a/examples/cpp/service_bus.cpp
+++ b/examples/cpp/service_bus.cpp
@@ -94,9 +94,9 @@ Done. No more messages.
 #include <proton/sender.hpp>
 #include <proton/sender_options.hpp>
 #include <proton/source_options.hpp>
-#include <proton/thread_safe.hpp>
 #include <proton/tracker.hpp>
 #include <proton/url.hpp>
+#include <proton/work_queue.hpp>
 
 #include <iostream>
 #include <sstream>

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/simple_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/simple_recv.cpp b/examples/cpp/simple_recv.cpp
index 145eef9..93b4868 100644
--- a/examples/cpp/simple_recv.cpp
+++ b/examples/cpp/simple_recv.cpp
@@ -30,7 +30,6 @@
 #include <proton/message.hpp>
 #include <proton/message_id.hpp>
 #include <proton/messaging_handler.hpp>
-#include <proton/thread_safe.hpp>
 #include <proton/value.hpp>
 
 #include <iostream>

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/simple_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/simple_send.cpp b/examples/cpp/simple_send.cpp
index 358bbec..ebc02cb 100644
--- a/examples/cpp/simple_send.cpp
+++ b/examples/cpp/simple_send.cpp
@@ -28,7 +28,6 @@
 #include <proton/message.hpp>
 #include <proton/message_id.hpp>
 #include <proton/messaging_handler.hpp>
-#include <proton/thread_safe.hpp>
 #include <proton/tracker.hpp>
 #include <proton/types.hpp>
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/ssl.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/ssl.cpp b/examples/cpp/ssl.cpp
index 166bd61..e24961f 100644
--- a/examples/cpp/ssl.cpp
+++ b/examples/cpp/ssl.cpp
@@ -30,7 +30,6 @@
 #include <proton/message.hpp>
 #include <proton/messaging_handler.hpp>
 #include <proton/ssl.hpp>
-#include <proton/thread_safe.hpp>
 #include <proton/tracker.hpp>
 #include <proton/transport.hpp>
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/ssl_client_cert.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/ssl_client_cert.cpp b/examples/cpp/ssl_client_cert.cpp
index 8ca2dc2..c6c7666 100644
--- a/examples/cpp/ssl_client_cert.cpp
+++ b/examples/cpp/ssl_client_cert.cpp
@@ -28,7 +28,6 @@
 #include <proton/messaging_handler.hpp>
 #include <proton/sasl.hpp>
 #include <proton/ssl.hpp>
-#include <proton/thread_safe.hpp>
 #include <proton/tracker.hpp>
 #include <proton/transport.hpp>
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index 21ff26c..472105a 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -57,6 +57,7 @@ set(qpid-proton-cpp-source
   src/receiver.cpp
   src/receiver_options.cpp
   src/reconnect_timer.cpp
+  src/returned.cpp
   src/sasl.cpp
   src/scalar_base.cpp
   src/sender.cpp
@@ -169,7 +170,6 @@ endmacro(add_cpp_test)
 
 add_cpp_test(codec_test)
 add_cpp_test(connection_driver_test)
-add_cpp_test(thread_safe_test)
 add_cpp_test(interop_test ${CMAKE_SOURCE_DIR}/tests)
 add_cpp_test(message_test)
 add_cpp_test(map_test)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/docs/headers.dox
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/headers.dox b/proton-c/bindings/cpp/docs/headers.dox
index 0ff7220..7e9d79c 100644
--- a/proton-c/bindings/cpp/docs/headers.dox
+++ b/proton-c/bindings/cpp/docs/headers.dox
@@ -14,7 +14,6 @@
 /// @file proton/duration.hpp Time duration data type
 /// @file proton/error_condition.hpp AMQP error condition
 /// @file proton/error.hpp Base exception type thrown by proton functions
-/// @file proton/event_loop.hpp
 /// @file proton/function.hpp
 /// @file proton/fwd.hpp
 /// @file proton/link.hpp
@@ -42,7 +41,6 @@
 /// @file proton/target.hpp
 /// @file proton/target_options.hpp
 /// @file proton/terminus.hpp
-/// @file proton/thread_safe.hpp
 /// @file proton/timestamp.hpp
 /// @file proton/tracker.hpp
 /// @file proton/transfer.hpp

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/connection.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection.hpp b/proton-c/bindings/cpp/include/proton/connection.hpp
index ef75a4e..10ea61b 100644
--- a/proton-c/bindings/cpp/include/proton/connection.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection.hpp
@@ -109,10 +109,10 @@ PN_CPP_CLASS_EXTERN connection : public internal::object<pn_connection_t>, publi
     PN_CPP_EXTERN receiver open_receiver(const std::string &addr,
                                          const receiver_options &);
 
-    /// @copydoc container::sender_options
+    /// @see proton::container::sender_options()
     PN_CPP_EXTERN class sender_options sender_options() const;
 
-    /// @copydoc container::receiver_options
+    /// @see container::receiver_options()
     PN_CPP_EXTERN class receiver_options receiver_options() const;
 
     /// Return all sessions on this connection.
@@ -142,7 +142,6 @@ PN_CPP_CLASS_EXTERN connection : public internal::object<pn_connection_t>, publi
     /// @cond INTERNAL
   friend class internal::factory<connection>;
   friend class container;
-  friend class proton::thread_safe<connection>;
     /// @endcond
 };
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp
index 859d70c..64e52ad 100644
--- a/proton-c/bindings/cpp/include/proton/container.hpp
+++ b/proton-c/bindings/cpp/include/proton/container.hpp
@@ -23,7 +23,7 @@
  */
 
 #include "./fwd.hpp"
-#include "./thread_safe.hpp"
+#include "./returned.hpp"
 #include "./types_fwd.hpp"
 
 #include "./internal/config.hpp"
@@ -76,9 +76,14 @@ class PN_CPP_CLASS_EXTERN container {
     /// The handler in the composed options is used to call
     /// proton::messaging_handler::on_connection_open() when the remote peer's
     /// open response is received.
+    ///
+    /// @return A returned<connection>
+    /// @copydetails returned
     PN_CPP_EXTERN returned<connection> connect(const std::string& url, const connection_options &);
 
     /// Connect to `url` and send an open request to the remote peer.
+    /// @return A returned<connection>
+    /// @copydetails returned
     PN_CPP_EXTERN returned<connection> connect(const std::string& url);
 
     /// Start listening on url.
@@ -135,12 +140,16 @@ class PN_CPP_CLASS_EXTERN container {
     PN_CPP_EXTERN void stop();
 
     /// Open a connection and sender for `url`.
+    /// @return A returned<sender>
+    /// @copydetails returned
     PN_CPP_EXTERN returned<sender> open_sender(const std::string &url);
 
     /// Open a connection and sender for `url`.
     ///
     /// Supplied sender options will override the container's
     /// template options.
+    /// @return A returned<sender>
+    /// @copydetails returned
     PN_CPP_EXTERN returned<sender> open_sender(const std::string &url,
                                          const proton::sender_options &o);
 
@@ -148,6 +157,8 @@ class PN_CPP_CLASS_EXTERN container {
     ///
     /// Supplied connection options will override the
     /// container's template options.
+    /// @return A returned<sender>
+    /// @copydetails returned
     PN_CPP_EXTERN returned<sender> open_sender(const std::string &url,
                                          const connection_options &c);
 
@@ -155,11 +166,17 @@ class PN_CPP_CLASS_EXTERN container {
     ///
     /// Supplied sender or connection options will override the
     /// container's template options.
+    ///
+    /// @return A returned<sender>
+    /// @copydetails returned
     PN_CPP_EXTERN returned<sender> open_sender(const std::string &url,
                                          const proton::sender_options &o,
                                          const connection_options &c);
 
     /// Open a connection and receiver for `url`.
+    ///
+    /// @return A returned<receiver>
+    /// @copydetails returned
     PN_CPP_EXTERN returned<receiver> open_receiver(const std::string&url);
 
 
@@ -167,6 +184,9 @@ class PN_CPP_CLASS_EXTERN container {
     ///
     /// Supplied receiver options will override the container's
     /// template options.
+    ///
+    /// @return A returned<receiver>
+    /// @copydetails returned
     PN_CPP_EXTERN returned<receiver> open_receiver(const std::string&url,
                                              const proton::receiver_options &o);
 
@@ -174,6 +194,9 @@ class PN_CPP_CLASS_EXTERN container {
     ///
     /// Supplied receiver or connection options will override the
     /// container's template options.
+    ///
+    /// @return A returned<receiver>
+    /// @copydetails returned
     PN_CPP_EXTERN returned<receiver> open_receiver(const std::string&url,
                                              const connection_options &c);
 
@@ -181,6 +204,9 @@ class PN_CPP_CLASS_EXTERN container {
     ///
     /// Supplied receiver or connection options will override the
     /// container's template options.
+    ///
+    /// @return A returned<receiver>
+    /// @copydetails returned
     PN_CPP_EXTERN returned<receiver> open_receiver(const std::string&url,
                                              const proton::receiver_options &o,
                                              const connection_options &c);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/delivery.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/delivery.hpp b/proton-c/bindings/cpp/include/proton/delivery.hpp
index 7c89f0c..7a38bca 100644
--- a/proton-c/bindings/cpp/include/proton/delivery.hpp
+++ b/proton-c/bindings/cpp/include/proton/delivery.hpp
@@ -46,7 +46,7 @@ class delivery : public transfer {
 
     // XXX ATM the following don't reflect the differing behaviors we
     // get from the different delivery modes. - Deferred
-    
+
     /// Settle with ACCEPTED state.
     PN_CPP_EXTERN void accept();
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/fwd.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/fwd.hpp b/proton-c/bindings/cpp/include/proton/fwd.hpp
index 5ade5fd..efbb91b 100644
--- a/proton-c/bindings/cpp/include/proton/fwd.hpp
+++ b/proton-c/bindings/cpp/include/proton/fwd.hpp
@@ -64,8 +64,6 @@ class connection_driver;
 }
 
 template <class T> class returned;
-template <class T> class thread_safe;
-
 }
 
 #endif // PROTON_FWD_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/internal/object.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/internal/object.hpp b/proton-c/bindings/cpp/include/proton/internal/object.hpp
index d492b80..442b09d 100644
--- a/proton-c/bindings/cpp/include/proton/internal/object.hpp
+++ b/proton-c/bindings/cpp/include/proton/internal/object.hpp
@@ -31,7 +31,7 @@
 
 namespace proton {
 
-template <class T> class thread_safe;
+template <class T> class returned;
 
 namespace internal {
 
@@ -101,7 +101,8 @@ template <class T> class object : private comparable<object<T> > {
     friend bool operator==(const object& a, const object& b) { return a.object_ == b.object_; }
     friend bool operator<(const object& a, const object& b) { return a.object_ < b.object_; }
     friend std::ostream& operator<<(std::ostream& o, const object& a) { o << a.object_.inspect(); return o; }
-    template <class U> friend class proton::thread_safe;
+
+  template <class U> friend class proton::returned;
 };
 
 /// Factory class used internally to make wrappers and extract proton objects

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
index 44275bc..f9774ac 100644
--- a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
+++ b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
@@ -179,7 +179,6 @@ PN_CPP_CLASS_EXTERN connection_driver {
     PN_CPP_EXTERN bool dispatch();
 
     /// Get the AMQP connection associated with this connection_driver.
-    /// The event_loop is availabe via proton::thread_safe<connection>(connection())
     PN_CPP_EXTERN proton::connection connection() const;
 
     /// Get the transport associated with this connection_driver.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/receiver.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/receiver.hpp b/proton-c/bindings/cpp/include/proton/receiver.hpp
index f92ac96..b995e6f 100644
--- a/proton-c/bindings/cpp/include/proton/receiver.hpp
+++ b/proton-c/bindings/cpp/include/proton/receiver.hpp
@@ -76,12 +76,11 @@ PN_CPP_CLASS_EXTERN receiver : public link {
     /// @cond INTERNAL
   friend class internal::factory<receiver>;
   friend class receiver_iterator;
-  friend class thread_safe<receiver>;
     /// @endcond
 };
 
 /// @cond INTERNAL
-    
+
 /// An iterator of receivers.
 class receiver_iterator : public internal::iter_base<receiver, receiver_iterator> {
     explicit receiver_iterator(receiver r, pn_session_t* s = 0) :

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/returned.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/returned.hpp b/proton-c/bindings/cpp/include/proton/returned.hpp
new file mode 100644
index 0000000..25b5c91
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/returned.hpp
@@ -0,0 +1,62 @@
+#ifndef PROTON_RETURNED_HPP
+#define PROTON_RETURNED_HPP
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "./internal/object.hpp"
+#include "./connection.hpp"
+#include "./receiver.hpp"
+#include "./sender.hpp"
+
+/// @file
+/// Return type for container functions
+
+namespace proton {
+
+namespace internal {
+template <class T> class factory;
+}
+
+/// Return type for container functions
+///
+/// @note returned value is *thread-unsafe*.
+/// A single-threaded application can assign the returned<T> value to a plain T.
+/// A multi-threaded application *must* ignore the returned value, as it may already
+/// be invalid by the time the function returns. Multi-threaded applications
+/// can access the value in @ref messaging_handler functions.
+///
+template <class T>
+class returned
+{
+  public:
+    operator T() const;
+
+  private:
+    typename T::pn_type* ptr_;
+    returned(const T&);
+    returned& operator=(const returned&);
+    template <class U> friend class internal::factory;
+};
+
+} // proton
+
+#endif  /*!PROTON_RETURNED_HPP*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/sender.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/sender.hpp b/proton-c/bindings/cpp/include/proton/sender.hpp
index f8c1e66..b01f21c 100644
--- a/proton-c/bindings/cpp/include/proton/sender.hpp
+++ b/proton-c/bindings/cpp/include/proton/sender.hpp
@@ -71,7 +71,6 @@ PN_CPP_CLASS_EXTERN sender : public link {
     /// @cond INTERNAL
   friend class internal::factory<sender>;
   friend class sender_iterator;
-  friend class thread_safe<sender>;
     /// @endcond
 };
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/session.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/session.hpp b/proton-c/bindings/cpp/include/proton/session.hpp
index 8d4184b..c66aa05 100644
--- a/proton-c/bindings/cpp/include/proton/session.hpp
+++ b/proton-c/bindings/cpp/include/proton/session.hpp
@@ -99,7 +99,6 @@ PN_CPP_CLASS_EXTERN session : public internal::object<pn_session_t>, public endp
     /// @cond INTERNAL
   friend class internal::factory<session>;
   friend class session_iterator;
-  friend class thread_safe<session>;
     /// @endcond
 };
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/thread_safe.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/thread_safe.hpp b/proton-c/bindings/cpp/include/proton/thread_safe.hpp
deleted file mode 100644
index 0b38883..0000000
--- a/proton-c/bindings/cpp/include/proton/thread_safe.hpp
+++ /dev/null
@@ -1,165 +0,0 @@
-#ifndef PROTON_THREAD_SAFE_HPP
-#define PROTON_THREAD_SAFE_HPP
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "./fwd.hpp"
-#include "./internal/config.hpp"
-#include "./connection.hpp"
-#include "./function.hpp"
-#include "./internal/object.hpp"
-#include "./internal/type_traits.hpp"
-#include "./work_queue.hpp"
-
-#include <functional>
-
-namespace proton {
-
-namespace internal {
-template <class T> struct endpoint_traits;
-template<> struct endpoint_traits<connection> {};
-template<> struct endpoint_traits<session> {};
-template<> struct endpoint_traits<link> {};
-template<> struct endpoint_traits<sender> {};
-template<> struct endpoint_traits<receiver> {};
-}
-
-/// **Experimental** - A thread-safe object wrapper.
-///
-/// The proton::object subclasses (proton::connection, proton::sender etc.) are
-/// reference-counted wrappers for C structs. They are not safe for concurrent use,
-/// not even to copy or assign.
-///
-/// A pointer to thread_safe<> can be used from any thread to get the
-/// proton::event_loop for the object's connection. The object will not be
-/// destroyed until the thread_safe<> is deleted. You can use std::shared_ptr,
-/// std::unique_ptr or any other memory management technique to manage the
-/// thread_safe<>.
-///
-/// Use make_thread_safe(), make_shared_thread_safe(), make_unique_thread_safe() to
-/// create a thread_safe<>
-///
-/// @see @ref mt_page
-template <class T>
-class thread_safe : private internal::pn_ptr_base, private internal::endpoint_traits<T> {
-    typedef typename T::pn_type pn_type;
-
-  public:
-    /// @cond INTERNAL
-    static void operator delete(void*) {}
-    /// @endcond
-
-    ~thread_safe() {
-        if (ptr()) {
-            if (!!work_queue().impl_) schedule_work(&work_queue(), &decref, (void*)ptr());
-            else decref(ptr());
-        }
-    }
-
-    /// Get the work queue for this object.
-    class work_queue& work_queue() { return work_queue::get(ptr()); }
-
-    /// Get the thread-unsafe proton object wrapped by this thread_safe<T>
-    T unsafe() { return T(ptr()); }
-
-  private:
-    static thread_safe* create(const T& obj) { return new (obj.pn_object()) thread_safe(); }
-    static void* operator new(size_t, pn_type* p) { return p; }
-    static void operator delete(void*, pn_type*) {}
-    thread_safe() { incref(ptr()); }
-    pn_type* ptr() { return reinterpret_cast<pn_type*>(this); }
-
-
-    // Non-copyable.
-    thread_safe(const thread_safe&);
-    thread_safe& operator=(const thread_safe&);
-
-    /// @cond INTERNAL
-  friend class returned<T>;
-    /// @endcond
-};
-
-// A return value for functions returning a thread_safe<> object.
-//
-// Temporary return value only, you should release() to get a plain pointer or
-// assign to a smart pointer type.
-template <class T>
-class returned : private internal::endpoint_traits<T>
-{
-  public:
-    /// Take ownership
-    explicit returned(thread_safe<T>* p) : ptr_(p) {}
-    /// Create an owned thread_safe<T>
-    explicit returned(const T& obj) : ptr_(thread_safe<T>::create(obj)) {}
-    /// Transfer ownership.
-    /// Use the same "cheat" as std::auto_ptr, calls x.release() even though x is const.
-    returned(const returned& x) : ptr_(const_cast<returned&>(x).release()) {}
-    /// Delete if still owned.
-    ~returned() { if (ptr_) delete ptr_; }
-
-    /// Release ownership.
-    thread_safe<T>* release() const { thread_safe<T>* p = ptr_; ptr_ = 0; return p; }
-
-    /// Get the raw pointer, caller must not delete.
-    thread_safe<T>* get() const { return ptr_; }
-
-    /// Implicit conversion to target, usable only in a safe context.
-    operator T() { return ptr_->unsafe(); }
-
-#if PN_CPP_HAS_SHARED_PTR
-    /// Release to a std::shared_ptr
-    operator std::shared_ptr<thread_safe<T> >() {
-        return std::shared_ptr<thread_safe<T> >(release());
-    }
-#endif
-#if PN_CPP_HAS_UNIQUE_PTR
-    /// Release to a std::unique_ptr
-    operator std::unique_ptr<thread_safe<T> >() {
-        return std::unique_ptr<thread_safe<T> >(release());
-    }
-#endif
-
-  private:
-    void operator=(const returned&);
-    mutable thread_safe<T>* ptr_;
-};
-
-/// Make a thread-safe wrapper for `obj`.
-template <class T> returned<T> make_thread_safe(const T& obj) { return returned<T>(obj); }
-
-#if PN_CPP_HAS_SHARED_PTR
-/// Create a thread-safe shared_ptr to `obj`.
-template <class T> std::shared_ptr<thread_safe<T> > make_shared_thread_safe(const T& obj) {
-    return make_thread_safe(obj);
-}
-#endif
-#if PN_CPP_HAS_UNIQUE_PTR
-/// Create a thread-safe unique_ptr to `obj`.
-template <class T> std::unique_ptr<thread_safe<T> > make_unique_thread_safe(const T& obj) {
-    return make_thread_safe(obj);
-}
-
-#endif
-
-} // proton
-
-#endif // PROTON_THREAD_SAFE_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/work_queue.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/work_queue.hpp b/proton-c/bindings/cpp/include/proton/work_queue.hpp
index 844680b..30d8395 100644
--- a/proton-c/bindings/cpp/include/proton/work_queue.hpp
+++ b/proton-c/bindings/cpp/include/proton/work_queue.hpp
@@ -40,10 +40,10 @@ namespace proton {
 /// **Experimental** - A work queue for serial execution.
 ///
 /// Event handler functions associated with a single proton::connection are called in sequence.
-/// The connection's @ref work_queue allows you to "inject" extra @ref work from any thread,
+/// The connection's proton::work_queue allows you to "inject" extra @ref work from any thread,
 /// and have it executed in the same sequence.
 ///
-/// You may also create arbitrary @ref work_queue objects backed by a @ref container that allow
+/// You may also create arbitrary proton::work_queue objects backed by a @ref container that allow
 /// other objects to have their own serialised work queues that can have work injected safely
 /// from other threads. The @ref container ensures that the work is correctly serialised.
 ///
@@ -113,7 +113,6 @@ class PN_CPP_CLASS_EXTERN work_queue {
     /// @cond INTERNAL
   friend class container;
   friend class io::connection_driver;
-  template <class T> friend class thread_safe;
     /// @endcond
 };
 
@@ -347,6 +346,7 @@ void schedule_work(WQ wq, duration dn, F f, A a, B b, C c, D d) {
 #else
 // The C++11 version is *much* simpler and even so more general!
 // These definitions encompass everything in the C++03 section
+
 template <class WQ, class... Rest>
 bool schedule_work(WQ wq, Rest&&... r) {
     return wq->add(std::bind(std::forward<Rest>(r)...));

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/src/connection_driver_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection_driver_test.cpp b/proton-c/bindings/cpp/src/connection_driver_test.cpp
index 7fcde46..d174454 100644
--- a/proton-c/bindings/cpp/src/connection_driver_test.cpp
+++ b/proton-c/bindings/cpp/src/connection_driver_test.cpp
@@ -22,6 +22,7 @@
 #include "proton_bits.hpp"
 
 #include "proton/container.hpp"
+#include "proton/connection.hpp"
 #include "proton/io/connection_driver.hpp"
 #include "proton/io/link_namer.hpp"
 #include "proton/link.hpp"
@@ -31,7 +32,6 @@
 #include "proton/sender.hpp"
 #include "proton/sender_options.hpp"
 #include "proton/source_options.hpp"
-#include "proton/thread_safe.hpp"
 #include "proton/transport.hpp"
 #include "proton/types_fwd.hpp"
 #include "proton/uuid.hpp"

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/src/container.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container.cpp b/proton-c/bindings/cpp/src/container.cpp
index 35f645c..c82e1a8 100644
--- a/proton-c/bindings/cpp/src/container.cpp
+++ b/proton-c/bindings/cpp/src/container.cpp
@@ -25,7 +25,6 @@
 #include "proton/error_condition.hpp"
 #include "proton/listen_handler.hpp"
 #include "proton/listener.hpp"
-#include "proton/thread_safe.hpp"
 
 #include "proactor_container_impl.hpp"
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/src/container_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_test.cpp b/proton-c/bindings/cpp/src/container_test.cpp
index d210268..498b217 100644
--- a/proton-c/bindings/cpp/src/container_test.cpp
+++ b/proton-c/bindings/cpp/src/container_test.cpp
@@ -26,7 +26,6 @@
 #include "proton/messaging_handler.hpp"
 #include "proton/listener.hpp"
 #include "proton/listen_handler.hpp"
-#include "proton/thread_safe.hpp"
 
 #include <cstdlib>
 #include <ctime>

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/src/include/proton_bits.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proton_bits.hpp b/proton-c/bindings/cpp/src/include/proton_bits.hpp
index fdd79b5..035ffb7 100644
--- a/proton-c/bindings/cpp/src/include/proton_bits.hpp
+++ b/proton-c/bindings/cpp/src/include/proton_bits.hpp
@@ -124,6 +124,7 @@ class factory {
 public:
     static T wrap(typename wrapped<T>::type* t) { return t; }
     static typename wrapped<T>::type* unwrap(const T& t) { return t.pn_object(); }
+    static returned<T> make_returned(const T& t) { return returned<T>(t); }
 };
 
 template <class T> struct context {};
@@ -150,6 +151,11 @@ U make_wrapper(typename internal::wrapped<U>::type* t) { return internal::factor
 template <class T>
 typename internal::wrapped<T>::type* unwrap(const T& t) { return internal::factory<T>::unwrap(t); }
 
+template <class T>
+returned<T> make_returned(const T& t) {
+    return internal::factory<T>::make_returned(t);
+}
+
 }
 
 #endif // PROTON_BITS_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/src/proactor_container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
index b900d6f..1389306 100644
--- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp
+++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
@@ -24,7 +24,6 @@
 #include "proton/function.hpp"
 #include "proton/listener.hpp"
 #include "proton/listen_handler.hpp"
-#include "proton/thread_safe.hpp"
 #include "proton/url.hpp"
 
 #include "proton/connection.h"
@@ -188,13 +187,13 @@ proton::connection container::impl::connect_common(
     return conn;
 }
 
-proton::returned<proton::connection> container::impl::connect(
+returned<proton::connection> container::impl::connect(
     const std::string& addr,
     const proton::connection_options& user_opts)
 {
     connection conn = connect_common(addr, user_opts);
     GUARD(lock_);
-    return make_thread_safe(conn);
+    return make_returned(conn);
 }
 
 returned<sender> container::impl::open_sender(const std::string &url, const proton::sender_options &o1, const connection_options &o2) {
@@ -203,7 +202,7 @@ returned<sender> container::impl::open_sender(const std::string &url, const prot
     connection conn = connect_common(url, o2);
 
     GUARD(lock_);
-    return make_thread_safe(conn.default_session().open_sender(proton::url(url).path(), lopts));
+    return make_returned(conn.default_session().open_sender(proton::url(url).path(), lopts));
 }
 
 returned<receiver> container::impl::open_receiver(const std::string &url, const proton::receiver_options &o1, const connection_options &o2) {
@@ -212,7 +211,7 @@ returned<receiver> container::impl::open_receiver(const std::string &url, const
     connection conn = connect_common(url, o2);
 
     GUARD(lock_);
-    return make_thread_safe(
+    return make_returned(
         conn.default_session().open_receiver(proton::url(url).path(), lopts));
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/src/returned.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/returned.cpp b/proton-c/bindings/cpp/src/returned.cpp
new file mode 100644
index 0000000..2e1a4b2
--- /dev/null
+++ b/proton-c/bindings/cpp/src/returned.cpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proton_bits.hpp"
+
+#include <proton/returned.hpp>
+#include <proton/connection.hpp>
+#include <proton/sender.hpp>
+#include <proton/receiver.hpp>
+
+namespace proton {
+
+template <class T> returned<T>::returned(const T& t) : ptr_(unwrap(t)) {}
+
+template <class T> returned<T>::operator T() const {
+    return internal::factory<T>::wrap(ptr_);
+}
+
+// Explicit instantiations for allowed types
+
+template class PN_CPP_CLASS_EXTERN returned<connection>;
+template class PN_CPP_CLASS_EXTERN returned<sender>;
+template class PN_CPP_CLASS_EXTERN returned<receiver>;
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/src/thread_safe_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/thread_safe_test.cpp b/proton-c/bindings/cpp/src/thread_safe_test.cpp
deleted file mode 100644
index 3a72f7f..0000000
--- a/proton-c/bindings/cpp/src/thread_safe_test.cpp
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/// Test reference counting for object wrappers, threads_safe<> wrappers and returned<> wrappers.
-///
-
-#include "test_bits.hpp"
-#include "proton_bits.hpp"
-
-#include "proton/thread_safe.hpp"
-#include "proton/io/connection_driver.hpp"
-
-#include <proton/connection.h>
-
-namespace {
-
-using namespace std;
-using namespace proton;
-
-void test_new() {
-    pn_connection_t* c = 0;
-    thread_safe<connection>* p = 0;
-    {
-        io::connection_driver e;
-        c = unwrap(e.connection());
-        int r = pn_refcount(c);
-        ASSERT(r >= 1); // engine may have internal refs (transport, collector).
-        p = make_thread_safe(e.connection()).release();
-        ASSERT_EQUAL(r+1, pn_refcount(c));
-        delete p;
-        ASSERT_EQUAL(r, pn_refcount(c));
-        p = make_thread_safe(e.connection()).release();
-    }
-    ASSERT_EQUAL(1, pn_refcount(c)); // Engine gone, thread_safe keeping c alive.
-    delete p;
-
-#if PN_CPP_HAS_SHARED_PTR
-    {
-        std::shared_ptr<thread_safe<connection> > sp;
-        {
-            io::connection_driver e;
-            c = unwrap(e.connection());
-            sp = make_shared_thread_safe(e.connection());
-        }
-        ASSERT_EQUAL(1, pn_refcount(c)); // Engine gone, sp keeping c alive.
-    }
-#endif
-#if PN_CPP_HAS_UNIQUE_PTR
-    {
-        std::unique_ptr<thread_safe<connection> > up;
-        {
-            io::connection_driver e;
-            c = unwrap(e.connection());
-            up = make_unique_thread_safe(e.connection());
-        }
-        ASSERT_EQUAL(1, pn_refcount(c)); // Engine gone, sp keeping c alive.
-    }
-#endif
-}
-
-void test_convert() {
-    // Verify refcounts as expected with conversion between proton::object
-    // and thread_safe.
-    connection c;
-    pn_connection_t* pc = 0;
-    {
-        io::connection_driver eng;
-        c = eng.connection();
-        pc = unwrap(c);         // Unwrap in separate scope to avoid confusion from temp values.
-    }
-    {
-        ASSERT_EQUAL(1, pn_refcount(pc));
-        returned<connection> pptr = make_thread_safe(c);
-        ASSERT_EQUAL(2, pn_refcount(pc));
-        returned<connection> pp2 = pptr;
-        ASSERT(!pptr.release()); // Transferred to pp2
-        ASSERT_EQUAL(2, pn_refcount(pc));
-        connection c2 = pp2;        // Transfer and convert to target
-        ASSERT_EQUAL(3, pn_refcount(pc)); // c, c2, thread_safe.
-        ASSERT(c == c2);
-    }
-    ASSERT_EQUAL(1, pn_refcount(pc)); // only c is left
-}
-
-}
-
-int main(int, char**) {
-    int failed = 0;
-    RUN_TEST(failed, test_new());
-    RUN_TEST(failed, test_convert());
-    return failed;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/tests/tools/apps/cpp/reactor_send.cpp
----------------------------------------------------------------------
diff --git a/tests/tools/apps/cpp/reactor_send.cpp b/tests/tools/apps/cpp/reactor_send.cpp
index 7841a5e..62ac4ce 100644
--- a/tests/tools/apps/cpp/reactor_send.cpp
+++ b/tests/tools/apps/cpp/reactor_send.cpp
@@ -32,7 +32,6 @@
 #include "proton/messaging_handler.hpp"
 #include "proton/receiver_options.hpp"
 #include "proton/sender.hpp"
-#include "proton/thread_safe.hpp"
 #include "proton/tracker.hpp"
 #include "proton/value.hpp"
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] qpid-proton git commit: PROTON-1557: c++ improve multi-threaded clients

Posted by ac...@apache.org.
PROTON-1557: c++ improve multi-threaded clients

2 clients:
- multithreaded_client.cpp: simple send thread, receive thread, run thread
- multithreaded_client_flow_control: multi-connection, block for flow control

Changes:
- reduced needless diff between examples
- use separate work_queue* to clarify separate thread safety rules from sender
- took work_queue->add() out of lock to emphasize it is thread safe
- use fixed argument list, same arg order


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/298e7dba
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/298e7dba
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/298e7dba

Branch: refs/heads/master
Commit: 298e7dba1b8c09781abca0a4a555b90116703a8a
Parents: 68c8cf4
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Aug 28 11:53:35 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Aug 28 17:54:35 2017 -0400

----------------------------------------------------------------------
 examples/cpp/CMakeLists.txt                     |   3 +-
 examples/cpp/example_test.py                    |  12 +
 examples/cpp/mt_queue.hpp                       | 102 -------
 examples/cpp/multithreaded_client.cpp           | 185 ++++++++++++
 .../cpp/multithreaded_client_flow_control.cpp   | 287 +++++++++++++++++++
 examples/cpp/send_recv_mt.cpp                   | 269 -----------------
 6 files changed, 486 insertions(+), 372 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/298e7dba/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt
index df9f6a7..d116913 100644
--- a/examples/cpp/CMakeLists.txt
+++ b/examples/cpp/CMakeLists.txt
@@ -63,7 +63,8 @@ if(HAS_CPP11)
   # Examples that require C++11
   foreach(example
       scheduled_send
-      send_recv_mt
+      multithreaded_client
+      multithreaded_client_flow_control
       )
     add_executable(${example} ${example}.cpp)
   endforeach()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/298e7dba/examples/cpp/example_test.py
----------------------------------------------------------------------
diff --git a/examples/cpp/example_test.py b/examples/cpp/example_test.py
index 98f1d90..8f9f64c 100644
--- a/examples/cpp/example_test.py
+++ b/examples/cpp/example_test.py
@@ -300,5 +300,17 @@ Hello World!
             expect_found = (out.find(expect) >= 0)
             self.assertEqual(expect_found, True)
 
+    def test_multithreaded_client(self):
+        with TestPort() as tp:
+            want = ""
+            got = self.proc(["multithreaded_client", 10, 2, tp.addr]).wait_exit()
+            self.assertMultiLineEqual(want, got)
+
+    def test_multithreaded_client_flow_control(self):
+        with TestPort() as tp:
+            want = ""
+            got = self.proc(["multithreaded_client_flow_control", "10", "2", tp.addr]).wait_exit()
+            self.assertMultiLineEqual(want, got)
+
 if __name__ == "__main__":
     unittest.main()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/298e7dba/examples/cpp/mt_queue.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt_queue.hpp b/examples/cpp/mt_queue.hpp
deleted file mode 100644
index f053ebe..0000000
--- a/examples/cpp/mt_queue.hpp
+++ /dev/null
@@ -1,102 +0,0 @@
-#ifndef MT_QUEUE_HPP
-#define MT_QUEUE_HPP
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <condition_variable>
-#include <stdexcept>
-#include <mutex>
-#include <queue>
-
-class closed_error : public std::runtime_error {
-  public:
-    closed_error() : std::runtime_error("closed") {}
-};
-
-// A bounded, thread-safe queue.
-// Objects are moved on and off the queue, not copied. Avoids overhead of copy operations.
-template <class T, size_t CAPACITY> class mt_queue {
-    std::queue<T> q_;
-    std::mutex lock_;
-    std::condition_variable push_;
-    std::condition_variable pop_;
-    bool closed_;
-
-    void do_push(T&& x) {
-        q_.push(std::move(x));
-        pop_.notify_one();
-    }
-
-    T do_pop() {
-        T x(std::move(q_.front()));
-        q_.pop();
-        push_.notify_one();
-        return x;
-    }
-
-    bool can_push() { return q_.size() < CAPACITY; }
-    bool can_pop() { return q_.size() > 0; }
-
-  public:
-
-    mt_queue() : closed_(false) {}
-
-    void push(T&& x) {
-        std::unique_lock<std::mutex> l(lock_);
-        while(!can_push())
-            push_.wait(l);
-        do_push(std::move(x));
-    }
-
-    T pop() {
-        std::unique_lock<std::mutex> l(lock_);
-        while(!can_pop())
-            pop_.wait(l);
-        return do_pop();
-    }
-
-    bool try_push(T&& x) noexcept {
-        std::lock_guard<std::mutex> l(lock_);
-        bool ok = can_push();
-        if (ok)
-            do_push(std::move(x));
-        return ok;
-    }
-
-    bool try_pop(T& x) noexcept {
-        std::lock_guard<std::mutex> l(lock_);
-        bool ok = can_pop();
-        if (ok)
-            x = std::move(do_pop());
-        return ok;
-    }
-
-    size_t capacity() noexcept {
-        return CAPACITY;
-    }
-
-    size_t size() noexcept {
-        std::lock_guard<std::mutex> l(lock_);
-        return q_.size();
-    }
-};
-
-
-#endif // MT_QUEUE_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/298e7dba/examples/cpp/multithreaded_client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/multithreaded_client.cpp b/examples/cpp/multithreaded_client.cpp
new file mode 100644
index 0000000..955655c
--- /dev/null
+++ b/examples/cpp/multithreaded_client.cpp
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//
+// C++11 only
+//
+// A multi-threaded client that calls proton::container::run() in one thread, sends
+// messages in another and receives messages in a third.
+//
+// Note this client does not deal with flow-control. If the sender is faster
+// than the receiver, messages will build up in memory on the sending side.
+// See @ref multithreaded_client_flow_control.cpp for a more complex example with
+// flow control.
+//
+// NOTE: no proper error handling
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver.hpp>
+#include <proton/sender.hpp>
+#include <proton/work_queue.hpp>
+
+#include <condition_variable>
+#include <iostream>
+#include <mutex>
+#include <queue>
+#include <sstream>
+#include <string>
+#include <thread>
+
+// Handler for a single thread-safe sending and receiving connection.
+class client : public proton::messaging_handler {
+    // Invariant
+    const std::string url_;
+    const std::string address_;
+
+    // Only used in proton handler thread
+    proton::sender sender_;
+
+    // Shared by proton and user threads, protected by lock_
+    std::mutex lock_;
+    proton::work_queue *work_queue_; // null until on_sender_open() has run
+    std::condition_variable sender_ready_;
+    std::queue<proton::message> messages_;
+    std::condition_variable messages_ready_;
+
+  public:
+    client(const std::string& url, const std::string& address) : url_(url), address_(address), work_queue_(nullptr) {}
+
+    // Thread safe
+    void send(const proton::message& msg) {
+        // Use [=] to copy the message, we cannot pass it by reference since it
+        // will be used in another thread.
+        work_queue()->add([=]() { sender_.send(msg); });
+    }
+
+    // Thread safe
+    proton::message receive() {
+        std::unique_lock<std::mutex> l(lock_);
+        while (messages_.empty()) messages_ready_.wait(l);
+        auto msg = std::move(messages_.front());
+        messages_.pop();
+        return msg;
+    }
+
+    // Thread safe
+    void close() {
+        work_queue()->add([=]() { sender_.connection().close(); });
+    }
+
+  private:
+
+    proton::work_queue* work_queue() {
+        // Wait till work_queue_ and sender_ are initialized.
+        // work_queue_ starts out null, so this blocks until on_sender_open().
+        std::unique_lock<std::mutex> l(lock_);
+        while (!work_queue_) sender_ready_.wait(l);
+        return work_queue_;
+    }
+    // == messaging_handler overrides, only called in proton handler thread
+
+    // Note: this example creates a connection when the container starts.
+    // To create connections after the container has started, use
+    // container::connect().
+    // See @ref multithreaded_client_flow_control.cpp for an example.
+    void on_container_start(proton::container& cont) override {
+        cont.connect(url_);
+    }
+
+    void on_connection_open(proton::connection& conn) override {
+        conn.open_sender(address_);
+        conn.open_receiver(address_);
+    }
+
+    void on_sender_open(proton::sender& s) override {
+        {
+            // sender_ and work_queue_ must be set atomically
+            std::lock_guard<std::mutex> l(lock_);
+            sender_ = s;
+            work_queue_ = &s.work_queue();
+        }
+        sender_ready_.notify_all();
+    }
+
+    void on_message(proton::delivery& dlv, proton::message& msg) override {
+        {
+            std::lock_guard<std::mutex> l(lock_);
+            messages_.push(msg);
+        }
+        messages_ready_.notify_all();
+    }
+
+    void on_error(const proton::error_condition& e) override {
+        std::cerr << "unexpected error: " << e << std::endl;
+        exit(1);
+    }
+};
+
+int main(int argc, const char** argv) {
+    try {
+        if (argc != 4) {
+            std::cerr <<
+                "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT\n"
+                "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
+                "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
+                "MESSAGE-COUNT: number of messages to send\n";
+            return 1;
+        }
+        const char *url = argv[1];
+        const char *address = argv[2];
+        int n_messages = std::stoi(argv[3]); // throws on a non-numeric count; reported by the catch below
+        // Create the handler and run its container in a dedicated thread.
+        client cl(url, address);
+        proton::container container(cl);
+        std::thread container_thread([&]() { container.run(); });
+        // Send n_messages numbered messages from a second thread.
+        std::thread sender([&]() {
+                for (int i = 0; i < n_messages; ++i) {
+                    proton::message msg(std::to_string(i + 1));
+                    cl.send(msg);
+                    std::cout << "sent: " << msg.body() << std::endl;
+                }
+            });
+        // Receive the same number of messages in a third thread.
+        int received = 0;
+        std::thread receiver([&]() {
+                for (int i = 0; i < n_messages; ++i) {
+                    auto msg = cl.receive();
+                    std::cout << "received: " << msg.body() << std::endl;
+                    ++received;
+                }
+            });
+        // Wait for both workers before closing the connection and joining the container thread.
+        sender.join();
+        receiver.join();
+        cl.close();
+        container_thread.join();
+        std::cout << "received " << received << " messages" << std::endl;
+
+        return 0;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/298e7dba/examples/cpp/multithreaded_client_flow_control.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/multithreaded_client_flow_control.cpp b/examples/cpp/multithreaded_client_flow_control.cpp
new file mode 100644
index 0000000..9eec782
--- /dev/null
+++ b/examples/cpp/multithreaded_client_flow_control.cpp
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// C++11 only
+//
+// A multi-threaded client that sends and receives messages from multiple AMQP
+// addresses.
+//
+// Demonstrates how to:
+//
+// - implement proton handlers that interact with user threads safely
+// - block sender threads to respect AMQP flow control
+// - use AMQP flow control to limit message buffering for receiver threads
+//
+// We define sender and receiver classes with simple, thread-safe blocking
+// send() and receive() functions.
+//
+// These classes are also privately proton::messaging_handler instances. They use
+// the thread-safe proton::work_queue and standard C++ synchronization (std::mutex
+// etc.) to pass messages between user and proton::container threads.
+//
+// NOTE: no proper error handling
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver.hpp>
+#include <proton/receiver_options.hpp>
+#include <proton/sender.hpp>
+#include <proton/work_queue.hpp>
+
+#include <atomic>
+#include <condition_variable>
+#include <iostream>
+#include <mutex>
+#include <queue>
+#include <sstream>
+#include <string>
+#include <thread>
+
+// A thread-safe sending connection that blocks sending threads when there
+// is no AMQP credit to send messages.
+class sender : private proton::messaging_handler {
+    // Only used in proton handler thread
+    proton::sender sender_;
+
+    // Shared by proton and user threads, protected by lock_
+    std::mutex lock_;
+    proton::work_queue *work_queue_;   // Null until on_sender_open() has run
+    std::condition_variable sender_ready_; // Signalled on link open and on credit/queue changes
+    int queued_;                       // Queued messages waiting to be sent
+    int credit_;                       // AMQP credit - number of messages we can send
+
+  public:
+    sender(proton::container& cont, const std::string& url, const std::string& address)
+        : work_queue_(0), queued_(0), credit_(0)
+    {
+        cont.open_sender(url+"/"+address, proton::connection_options().handler(*this));
+    }
+
+    // Thread safe: blocks until the link is open and there is unused credit
+    void send(const proton::message& m) {
+        {
+            std::unique_lock<std::mutex> l(lock_);
+            // Don't queue up more messages than we have credit for
+            while (!work_queue_ || queued_ >= credit_) sender_ready_.wait(l);
+            ++queued_;
+        }
+        work_queue_->add([=]() { this->do_send(m); }); // work_queue_ is thread safe
+    }
+
+    // Thread safe: blocks until the link is open, then queues a connection close
+    void close() {
+        work_queue()->add([=]() { sender_.connection().close(); });
+    }
+
+  private:
+    // Thread safe: wait till work_queue_ and sender_ are initialized by on_sender_open().
+    proton::work_queue* work_queue() {
+        std::unique_lock<std::mutex> l(lock_);
+        while (!work_queue_) sender_ready_.wait(l);
+        return work_queue_;
+    }
+
+    // == messaging_handler overrides, only called in proton handler thread
+
+    void on_sender_open(proton::sender& s) override {
+        // Make sure sender_ and work_queue_ are set atomically
+        std::lock_guard<std::mutex> l(lock_);
+        sender_ = s;
+        work_queue_ = &s.work_queue();
+        sender_ready_.notify_all(); // Wake threads blocked in work_queue(), e.g. an early close()
+    }
+
+    void on_sendable(proton::sender& s) override {
+        std::lock_guard<std::mutex> l(lock_);
+        credit_ = s.credit();
+        sender_ready_.notify_all(); // Notify senders we have credit
+    }
+
+    // work_queue work items are automatically dequeued and called by proton
+    // This function is called because it was queued by send()
+    void do_send(const proton::message& m) {
+        sender_.send(m);
+        std::lock_guard<std::mutex> l(lock_);
+        --queued_;                    // work item was consumed from the work_queue
+        credit_ = sender_.credit();   // update credit
+        sender_ready_.notify_all();       // Notify senders we have space on queue
+    }
+
+    void on_error(const proton::error_condition& e) override {
+        std::cerr << "unexpected error: " << e << std::endl;
+        exit(1);
+    }
+};
+
+// A thread safe receiving connection that blocks receiving threads when there
+// are no messages available, and maintains a bounded buffer of incoming
+// messages by issuing AMQP credit only when there is space in the buffer.
+class receiver : private proton::messaging_handler {
+    static const size_t MAX_BUFFER = 100; // Max number of buffered messages (initial credit)
+
+    // Used in proton threads only
+    proton::receiver receiver_;
+
+    // Used in proton and user threads, protected by lock_
+    std::mutex lock_;
+    proton::work_queue* work_queue_;     // Null until on_receiver_open() has run
+    std::queue<proton::message> buffer_; // Messages not yet returned by receive()
+    std::condition_variable can_receive_; // Notify receivers of messages
+
+  public:
+
+    // Open a receiver on url/address, with *this as the connection's handler
+    receiver(proton::container& cont, const std::string& url, const std::string& address)
+        : work_queue_()
+    {
+        // NOTE: credit_window(0) disables automatic flow control.
+        // We will use flow control to match AMQP credit to buffer capacity.
+        cont.open_receiver(url+"/"+address, proton::receiver_options().credit_window(0),
+                           proton::connection_options().handler(*this));
+    }
+
+    // Thread safe receive: blocks until a buffered message is available, then returns it
+    proton::message receive() {
+        std::unique_lock<std::mutex> l(lock_);
+        // Wait until the receiver is open and at least one message is buffered
+        while (!work_queue_ || buffer_.empty())
+            can_receive_.wait(l);
+        proton::message m = std::move(buffer_.front());
+        buffer_.pop();
+        // Add a lambda to the work queue to call receive_done().
+        // This will tell the handler to add more credit.
+        work_queue_->add([=]() { this->receive_done(); });
+        return m;
+    }
+
+    void close() { // Queues a connection close; does nothing if the receiver has not opened yet
+        std::lock_guard<std::mutex> l(lock_);
+        if (work_queue_) work_queue_->add([this]() { this->receiver_.connection().close(); });
+    }
+
+  private:
+    // ==== The following are called by proton threads only.
+
+    void on_receiver_open(proton::receiver& r) override {
+        receiver_ = r;
+        std::lock_guard<std::mutex> l(lock_);
+        work_queue_ = &receiver_.work_queue();
+        receiver_.add_credit(MAX_BUFFER); // Buffer is empty, initial credit is the limit
+    }
+
+    void on_message(proton::delivery &d, proton::message &m) override {
+        // Proton automatically reduces credit by 1 before calling on_message
+        std::lock_guard<std::mutex> l(lock_);
+        buffer_.push(m);           // Buffer a copy of m for a later receive()
+        can_receive_.notify_all(); // Wake threads blocked in receive()
+    }
+
+    // called via work_queue, queued by receive()
+    void receive_done() {
+        // Add 1 credit, a receiver has taken a message out of the buffer.
+        receiver_.add_credit(1);
+    }
+
+    void on_error(const proton::error_condition& e) override {
+        std::cerr << "unexpected error: " << e << std::endl;
+        exit(1);
+    }
+};
+
+// ==== Example code using the sender and receiver
+
+// Send n messages with body "<thread-id>:<index>"; echo each one to stdout when print is set
+void send_thread(sender& s, int n, bool print) {
+    auto id = std::this_thread::get_id();
+    for (int i = 0; i < n; ++i) {
+        std::ostringstream ss;
+        ss << id << ":" << i;
+        s.send(proton::message(ss.str()));
+        if (print) std::cout << "sent: " << ss.str() << std::endl;
+    }
+    std::cout << id << " sent " << n << std::endl;
+}
+
+// Receive messages till atomic remaining count is 0; remaining is shared among all
+// receiving threads, each of which decrements it once per message before calling receive()
+void receive_thread(receiver& r, std::atomic_int& remaining, bool print) {
+    auto id = std::this_thread::get_id();
+    int n = 0;
+    while (remaining-- > 0) {
+        auto m = r.receive();
+        ++n;
+        if (print) std::cout << id << " received: " << m.body() << std::endl;
+    }
+    std::cout << id << " received " << n << " messages" << std::endl;
+}
+
+int main(int argc, const char **argv) { // Returns 0 on success, 1 on usage error or exception
+    try {
+        if (argc != 5) {
+            std::cerr <<
+                "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT THREAD-COUNT\n"
+                "CONNECTION-URL: connection address, e.g. 'amqp://127.0.0.1'\n"
+                "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
+                "MESSAGE-COUNT: number of messages sent by each sender thread\n"
+                "THREAD-COUNT: number of sender/receiver thread pairs\n";
+            return 1;
+        }
+
+        const char *url = argv[1];
+        const char *address = argv[2];
+        int n_messages = atoi(argv[3]);
+        int n_threads = atoi(argv[4]);
+
+        // Total messages to be received, multiple receiver threads will decrement this.
+        std::atomic_int remaining(n_messages * n_threads);
+        bool print = remaining < 1000; // Don't print for long runs, dominates run time
+
+        // Run the proton container in its own thread
+        proton::container container;
+        auto container_thread = std::thread([&]() { container.run(); });
+
+        // A single sender and receiver to be shared by all the threads
+        sender send(container, url, address);
+        receiver recv(container, url, address);
+
+        // Start receiver threads, then sender threads.
+        // Starting receivers first gives all receivers a chance to compete for messages.
+        std::vector<std::thread> threads;
+        for (int i = 0; i < n_threads; ++i)
+            threads.push_back(std::thread([&]() { receive_thread(recv, remaining, print); }));
+        for (int i = 0; i < n_threads; ++i)
+            threads.push_back(std::thread([&]() { send_thread(send, n_messages, print); }));
+
+        // Wait for all user threads, then close both connections before joining the container thread
+        for (auto& t : threads)
+            t.join();
+        send.close();
+        recv.close();
+
+        container_thread.join();
+
+        return 0;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/298e7dba/examples/cpp/send_recv_mt.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/send_recv_mt.cpp b/examples/cpp/send_recv_mt.cpp
deleted file mode 100644
index addcbaf..0000000
--- a/examples/cpp/send_recv_mt.cpp
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-// C++11 only
-//
-// A multi-threaded client that sends and receives messages from multiple AMQP
-// addresses.
-//
-// Demonstrates how to:
-//
-// - implement proton handlers that interact with user threads safely
-// - block user threads calling send() to respect AMQP flow control
-// - use AMQP flow control to limit message buffering for receivers
-//
-// We define mt_sender and mt_receiver classes with simple, thread-safe blocking
-// send() and receive() functions.
-//
-// These classes are also privately proton::message_handler instances. They use
-// the thread-safe proton::work_queue and standard C++ synchronization (std::mutex
-// etc.) to pass messages between user and proton::container threads.
-//
-// NOTE: no proper error handling
-
-#include <proton/connection.hpp>
-#include <proton/connection_options.hpp>
-#include <proton/container.hpp>
-#include <proton/message.hpp>
-#include <proton/messaging_handler.hpp>
-#include <proton/receiver_options.hpp>
-#include <proton/sender.hpp>
-#include <proton/work_queue.hpp>
-
-#include <atomic>
-#include <condition_variable>
-#include <iostream>
-#include <mutex>
-#include <queue>
-#include <sstream>
-#include <thread>
-
-// Lock to serialize std::cout, std::cerr used from multiple threads.
-std::mutex out_lock;
-#define LOCK(EXPR) do { std::lock_guard<std::mutex> l(out_lock); EXPR; } while(0)
-#define COUT(EXPR) do { LOCK(std::cout << EXPR); } while(0)
-#define CERR(EXPR) do { LOCK(std::cerr << EXPR); } while(0)
-
-// A thread-safe sending connection.
-class mt_sender : private proton::messaging_handler {
-    // Only used in proton thread
-    proton::sender sender_;
-
-    // Shared by proton and user threads, use lock_ to protect.
-    std::mutex lock_;
-    proton::work_queue* work_queue_;   // Messages waiting to be sent
-    std::condition_variable can_send_; // Signal sending threads
-    int queued_;                       // Queued messages waiting to be sent
-    int credit_;                       // AMQP credit - number of messages we can send
-
-  public:
-    // Connect to url
-    mt_sender(proton::container& cont, const std::string& url) :
-        work_queue_(0), queued_(0), credit_(0)
-    {
-        // Pass *this as handler.
-        cont.open_sender(url, proton::connection_options().handler(*this));
-    }
-
-    // Thread safe send()
-    void send(const proton::message& m) {
-        std::unique_lock<std::mutex> l(lock_);
-        // Don't queue up more messages than we have credit for
-        while (!(work_queue_ && queued_ < credit_))
-            can_send_.wait(l);
-        ++queued_;
-        // Add a lambda function to the work queue.
-        // This will call do_send() with a copy of m in the correct proton thread.
-        work_queue_->add([=]() { this->do_send(m); });
-    }
-
-    void close() {
-        std::lock_guard<std::mutex> l(lock_);
-        if (work_queue_)
-            work_queue_->add([this]() { this->sender_.connection().close(); });
-    }
-
-  private:
-    // ==== called by proton threads only
-
-    void on_sender_open(proton::sender& s) override {
-        sender_ = s;
-        std::lock_guard<std::mutex> l(lock_);
-        work_queue_ = &s.work_queue();
-    }
-
-    void on_sendable(proton::sender& s) override {
-        std::lock_guard<std::mutex> l(lock_);
-        credit_ = s.credit();
-        can_send_.notify_all(); // Notify senders we have credit
-    }
-
-    // work_queue work items is are automatically dequeued and called by proton
-    // This function is called because it was queued by send()
-    void do_send(const proton::message& m) {
-        sender_.send(m);
-        std::lock_guard<std::mutex> l(lock_);
-        --queued_;                    // work item was consumed from the work_queue
-        credit_ = sender_.credit();   // update credit
-        can_send_.notify_all();       // Notify senders we have space on queue
-    }
-
-    void on_error(const proton::error_condition& e) override {
-        CERR("unexpected error: " << e << std::endl);
-        exit(1);
-    }
-};
-
-// A thread safe receiving connection.
-class mt_receiver : private proton::messaging_handler {
-    static const size_t MAX_BUFFER = 100; // Max number of buffered messages
-
-    // Used in proton threads only
-    proton::receiver receiver_;
-
-    // Used in proton and user threads, protected by lock_
-    std::mutex lock_;
-    proton::work_queue* work_queue_;
-    std::queue<proton::message> buffer_; // Messages not yet returned by receive()
-    std::condition_variable can_receive_; // Notify receivers of messages
-
-  public:
-
-    // Connect to url
-    mt_receiver(proton::container& cont, const std::string& url) : work_queue_()
-    {
-        // NOTE:credit_window(0) disables automatic flow control.
-        // We will use flow control to match AMQP credit to buffer capacity.
-        cont.open_receiver(url, proton::receiver_options().credit_window(0),
-                           proton::connection_options().handler(*this));
-    }
-
-    // Thread safe receive
-    proton::message receive() {
-        std::unique_lock<std::mutex> l(lock_);
-        // Wait for buffered messages
-        while (!work_queue_ || buffer_.empty())
-            can_receive_.wait(l);
-        proton::message m = std::move(buffer_.front());
-        buffer_.pop();
-        // Add a lambda to the work queue to call receive_done().
-        // This will tell the handler to add more credit.
-        work_queue_->add([=]() { this->receive_done(); });
-        return m;
-    }
-
-    void close() {
-        std::lock_guard<std::mutex> l(lock_);
-        if (work_queue_)
-            work_queue_->add([this]() { this->receiver_.connection().close(); });
-    }
-
-  private:
-    // ==== The following are called by proton threads only.
-
-    void on_receiver_open(proton::receiver& r) override {
-        receiver_ = r;
-        std::lock_guard<std::mutex> l(lock_);
-        work_queue_ = &receiver_.work_queue();
-        receiver_.add_credit(MAX_BUFFER); // Buffer is empty, initial credit is the limit
-    }
-
-    void on_message(proton::delivery &d, proton::message &m) override {
-        // Proton automatically reduces credit by 1 before calling on_message
-        std::lock_guard<std::mutex> l(lock_);
-        buffer_.push(m);
-        can_receive_.notify_all();
-    }
-
-    // called via work_queue
-    void receive_done() {
-        // Add 1 credit, a receiver has taken a message out of the buffer.
-        receiver_.add_credit(1);
-    }
-
-    void on_error(const proton::error_condition& e) override {
-        CERR("unexpected error: " << e << std::endl);
-        exit(1);
-    }
-};
-
-// ==== Example code using the mt_sender and mt_receiver
-
-// Send n messages
-void send_thread(mt_sender& s, int n) {
-    for (int i = 0; i < n; ++i) {
-        std::ostringstream o;
-        o << std::this_thread::get_id() << ":" << i;
-        s.send(proton::message(o.str()));
-    }
-    COUT(std::this_thread::get_id() << " sent " << n << std::endl);
-}
-
-// Receive messages till atomic remaining count is 0.
-// remaining is shared among all receiving threads
-void receive_thread(mt_receiver& r, std::atomic_int& remaining, bool print) {
-    auto id = std::this_thread::get_id();
-    int n = 0;
-    while (remaining-- > 0) {
-        auto m = r.receive();
-        ++n;
-        if (print)
-            COUT(id << " received \"" << m.body() << '"' << std::endl);
-    }
-    COUT(id << " received " << n << " messages" << std::endl);
-}
-
-int main(int argc, const char **argv) {
-    try {
-        int n_threads = argc > 1 ? atoi(argv[1]) : 2;
-        int n_messages = argc > 2 ? atoi(argv[2]) : 10;
-        const char *url =  argc > 3 ? argv[3] : "amqp://127.0.0.1/examples";
-        std::atomic_int remaining(n_messages * n_threads); // Total messages to be received
-        bool print = (remaining <= 30); // Print messages for short runs only
-
-        // Run the proton container
-        proton::container container;
-        auto container_thread = std::thread([&]() { container.run(); });
-
-        // A single sender and receiver to be shared by all the threads
-        mt_sender sender(container, url);
-        mt_receiver receiver(container, url);
-
-        // Start receiver threads, then sender threads.
-        // Starting receivers first gives all receivers a chance to compete for messages.
-        std::vector<std::thread> threads;
-        for (int i = 0; i < n_threads; ++i)
-            threads.push_back(std::thread([&]() { receive_thread(receiver, remaining, print); }));
-        for (int i = 0; i < n_threads; ++i)
-            threads.push_back(std::thread([&]() { send_thread(sender, n_messages); }));
-
-        // Wait for threads to finish
-        for (auto& n_messages_threads : threads)
-            n_messages_threads.join();
-        sender.close();
-        receiver.close();
-
-        container_thread.join();
-
-        return 0;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-    return 1;
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] qpid-proton git commit: PROTON-1553: c++ wake() events

Posted by ac...@apache.org.
PROTON-1553: c++  wake() events


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/68c8cf43
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/68c8cf43
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/68c8cf43

Branch: refs/heads/master
Commit: 68c8cf43e51bbe71d7042688fbcda52de1ed5cce
Parents: f1ee268
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Aug 24 15:34:35 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Sat Aug 26 13:20:59 2017 -0400

----------------------------------------------------------------------
 examples/cpp/mt_queue.hpp                       | 102 +++++++++++++++++++
 .../bindings/cpp/include/proton/connection.hpp  |  20 ++++
 .../cpp/include/proton/internal/object.hpp      |   4 +-
 .../cpp/include/proton/messaging_handler.hpp    |  15 ++-
 proton-c/bindings/cpp/src/connection.cpp        |   5 +
 proton-c/bindings/cpp/src/handler.cpp           |   2 +
 proton-c/bindings/cpp/src/messaging_adapter.cpp |   7 ++
 .../cpp/src/proactor_container_impl.cpp         |   6 +-
 8 files changed, 153 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/examples/cpp/mt_queue.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt_queue.hpp b/examples/cpp/mt_queue.hpp
new file mode 100644
index 0000000..f053ebe
--- /dev/null
+++ b/examples/cpp/mt_queue.hpp
@@ -0,0 +1,102 @@
+#ifndef MT_QUEUE_HPP
+#define MT_QUEUE_HPP
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <condition_variable>
+#include <stdexcept>
+#include <mutex>
+#include <queue>
+
+class closed_error : public std::runtime_error { // Exception whose what() text is "closed"
+  public:
+    closed_error() : runtime_error("closed") {}
+};
+
+// A bounded, thread-safe FIFO holding at most CAPACITY elements.
+// Elements are moved in and out rather than copied, avoiding copy overhead.
+template <class T, size_t CAPACITY> class mt_queue {
+    std::queue<T> items_;
+    std::mutex guard_;
+    std::condition_variable not_full_;  // Signalled after an element is removed
+    std::condition_variable not_empty_; // Signalled after an element is added
+    bool closed_;                       // Initialized to false; not otherwise used in this class
+
+    void enqueue(T&& item) {            // Callers hold guard_
+        items_.push(std::move(item));
+        not_empty_.notify_one();
+    }
+
+    T dequeue() {                       // Callers hold guard_ and have checked has_items()
+        T head(std::move(items_.front()));
+        items_.pop();
+        not_full_.notify_one();
+        return head;
+    }
+
+    bool has_room() { return items_.size() < CAPACITY; }
+    bool has_items() { return !items_.empty(); }
+
+  public:
+
+    mt_queue() : closed_(false) {}
+
+    // Block until there is room, then move x onto the back of the queue.
+    void push(T&& x) {
+        std::unique_lock<std::mutex> held(guard_);
+        not_full_.wait(held, [this]() { return has_room(); });
+        enqueue(std::move(x));
+    }
+
+    // Block until the queue is non-empty, then return the front element.
+    T pop() {
+        std::unique_lock<std::mutex> held(guard_);
+        not_empty_.wait(held, [this]() { return has_items(); });
+        return dequeue();
+    }
+
+    bool try_push(T&& x) noexcept {     // Non-blocking: false (x untouched) when full
+        std::lock_guard<std::mutex> held(guard_);
+        if (!has_room())
+            return false;
+        enqueue(std::move(x));
+        return true;
+    }
+
+    bool try_pop(T& x) noexcept {       // Non-blocking: false (x untouched) when empty
+        std::lock_guard<std::mutex> held(guard_);
+        if (!has_items())
+            return false;
+        x = dequeue();
+        return true;
+    }
+
+    size_t capacity() noexcept {
+        return CAPACITY;
+    }
+
+    size_t size() noexcept {
+        std::lock_guard<std::mutex> held(guard_);
+        return items_.size();
+    }
+};
+
+
+#endif // MT_QUEUE_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/proton-c/bindings/cpp/include/proton/connection.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection.hpp b/proton-c/bindings/cpp/include/proton/connection.hpp
index 10ea61b..58e2afc 100644
--- a/proton-c/bindings/cpp/include/proton/connection.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection.hpp
@@ -139,6 +139,26 @@ PN_CPP_CLASS_EXTERN connection : public internal::object<pn_connection_t>, publi
     /// @see @ref connection_options::idle_timeout
     PN_CPP_EXTERN uint32_t idle_timeout() const;
 
+    /// **Experimental** - trigger thread-safe call to messaging_handler::on_wake()
+    ///
+    /// *Thread safe*: this is the *only* @ref connection function that can be
+    /// called from outside the handler thread.
+    ///
+    /// messaging_handler::on_wake() will be called on the handler as soon as
+    /// possible after the call to wake(), possibly in a different thread.
+    ///
+    /// @note
+    /// * Multiple calls to wake() may be coalesced into a single call to on_wake()
+    ///   that occurs after all of them.
+    /// * Spurious on_wake() calls can occur even if the application does not call
+    ///   wake()
+    ///
+    /// wake() is the primitive building-block for thread-safe applications.
+    /// With C++11 or greater, @ref work_queue provides an easier way to execute
+    /// code safely in the handler thread.
+    ///
+    PN_CPP_EXTERN void wake() const;
+
     /// @cond INTERNAL
   friend class internal::factory<connection>;
   friend class container;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/proton-c/bindings/cpp/include/proton/internal/object.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/internal/object.hpp b/proton-c/bindings/cpp/include/proton/internal/object.hpp
index 442b09d..388c65c 100644
--- a/proton-c/bindings/cpp/include/proton/internal/object.hpp
+++ b/proton-c/bindings/cpp/include/proton/internal/object.hpp
@@ -85,9 +85,9 @@ template <class T> pn_ptr<T> take_ownership(T* p) { return pn_ptr<T>::take_owner
 /// Base class for proton object types.
 template <class T> class object : private comparable<object<T> > {
   public:
-    bool operator!() const { return !object_; }
+    bool operator!() const { return !object_.get(); }
 #if PN_CPP_HAS_EXPLICIT_CONVERSIONS
-    explicit operator bool() const { return object_; }
+    explicit operator bool() const { return object_.get(); }
 #endif
 
   protected:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/messaging_handler.hpp b/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
index a5e2bdd..0792dba 100644
--- a/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
@@ -146,11 +146,24 @@ PN_CPP_CLASS_EXTERN messaging_handler {
     /// **Experimental** - The receiving peer has requested a drain of
     /// remaining credit.
     PN_CPP_EXTERN virtual void on_sender_drain_start(sender &s);
-    
+
     /// **Experimental** - The credit outstanding at the time of the
     /// call to receiver::drain has been consumed or returned.
     PN_CPP_EXTERN virtual void on_receiver_drain_finish(receiver &r);
 
+    /// **Experimental** - a wakeup event that can be triggered from another thread.
+    ///
+    /// @see connection::wake()
+    ///
+    /// The on_wake() event carries no information about why it was called.
+    /// Recommended use is that the messaging_handler have shared, thread-safe
+    /// members that it examines to decide how/if to respond to the wake.
+    ///
+    /// @note on_wake() can be called internally by proton without any
+    /// application calls to connection::wake()
+    ///
+    PN_CPP_EXTERN virtual void on_wake(connection&);
+
     /// Fallback error handling.
     PN_CPP_EXTERN virtual void on_error(const error_condition &c);
 };

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/proton-c/bindings/cpp/src/connection.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection.cpp b/proton-c/bindings/cpp/src/connection.cpp
index a37d3b5..1d66c41 100644
--- a/proton-c/bindings/cpp/src/connection.cpp
+++ b/proton-c/bindings/cpp/src/connection.cpp
@@ -40,6 +40,7 @@
 #include <proton/session.h>
 #include <proton/transport.h>
 #include <proton/object.h>
+#include <proton/proactor.h>
 
 namespace proton {
 
@@ -174,4 +175,8 @@ uint32_t connection::idle_timeout() const {
     return pn_transport_get_remote_idle_timeout(pn_connection_transport(pn_object()));
 }
 
+void connection::wake() const {
+    pn_connection_wake(pn_object()); // Delegate to the C-level wake call on the underlying pn object
+}
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/proton-c/bindings/cpp/src/handler.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/handler.cpp b/proton-c/bindings/cpp/src/handler.cpp
index 84d9e8a..8f7fbe9 100644
--- a/proton-c/bindings/cpp/src/handler.cpp
+++ b/proton-c/bindings/cpp/src/handler.cpp
@@ -85,7 +85,9 @@ void messaging_handler::on_tracker_settle(tracker &) {}
 void messaging_handler::on_delivery_settle(delivery &) {}
 void messaging_handler::on_sender_drain_start(sender &) {}
 void messaging_handler::on_receiver_drain_finish(receiver &) {}
+void messaging_handler::on_wake(connection&) {}
 
 void messaging_handler::on_error(const error_condition& c) { throw proton::error(c.what()); }
 
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/proton-c/bindings/cpp/src/messaging_adapter.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/messaging_adapter.cpp b/proton-c/bindings/cpp/src/messaging_adapter.cpp
index cb1b776..2357cfa 100644
--- a/proton-c/bindings/cpp/src/messaging_adapter.cpp
+++ b/proton-c/bindings/cpp/src/messaging_adapter.cpp
@@ -296,6 +296,11 @@ void on_transport_closed(messaging_handler& handler, pn_event_t* event) {
     handler.on_transport_close(t);
 }
 
+void on_wake(messaging_handler& handler, pn_event_t* event) {
+    connection c(make_wrapper(pn_event_connection(event)));
+    handler.on_wake(c);
+}
+
 }
 
 void messaging_adapter::dispatch(messaging_handler& handler, pn_event_t* event)
@@ -321,6 +326,8 @@ void messaging_adapter::dispatch(messaging_handler& handler, pn_event_t* event)
 
       case PN_TRANSPORT_CLOSED: on_transport_closed(handler, event); break;
 
+      case PN_CONNECTION_WAKE: on_wake(handler, event); break;
+
       // Ignore everything else
       default: break;
     }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/proton-c/bindings/cpp/src/proactor_container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
index 1389306..b262f8c 100644
--- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp
+++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
@@ -381,10 +381,6 @@ bool container::impl::handle(pn_event_t* event) {
         }
         return false;
     }
-    // If the event was just connection wake then there isn't anything more to do
-    case PN_CONNECTION_WAKE:
-        return false;
-
     // Connection driver will bind a new transport to the connection at this point
     case PN_CONNECTION_INIT:
         return false;
@@ -443,7 +439,7 @@ void container::impl::thread() {
           finished = handle(e);
           if (finished) break;
         }
-      } catch (proton::error& e) {
+      } catch (std::exception& e) {
         // If we caught an exception then shutdown the (other threads of the) container
         disconnect_error_ = error_condition("exception", e.what());
         if (!stopping_) stop(disconnect_error_);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org