You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/08/28 21:57:31 UTC
[1/3] qpid-proton git commit: PROTON-1554: Remove thread-safe
template, simplify returned<>
Repository: qpid-proton
Updated Branches:
refs/heads/master ed756d8f3 -> 298e7dba1
PROTON-1554: Remove thread-safe template, simplify returned<>
Removed the thread-safe template.
Simplified and documented returned<>: It can *only* be converted to proton
object, and only in a single-threaded application.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/f1ee2681
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/f1ee2681
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/f1ee2681
Branch: refs/heads/master
Commit: f1ee268163ce80c2b9dc3dcb7ec00151f5c6b486
Parents: ed756d8
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Aug 24 16:49:08 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Sat Aug 26 13:16:50 2017 -0400
----------------------------------------------------------------------
examples/cpp/broker.cpp | 2 +-
examples/cpp/client.cpp | 1 -
examples/cpp/connection_options.cpp | 1 -
examples/cpp/flow_control.cpp | 1 -
examples/cpp/helloworld.cpp | 1 -
examples/cpp/helloworld_direct.cpp | 1 -
examples/cpp/queue_browser.cpp | 1 -
examples/cpp/scheduled_send.cpp | 2 +-
examples/cpp/scheduled_send_03.cpp | 1 -
examples/cpp/selected_recv.cpp | 1 -
examples/cpp/server.cpp | 1 -
examples/cpp/service_bus.cpp | 2 +-
examples/cpp/simple_recv.cpp | 1 -
examples/cpp/simple_send.cpp | 1 -
examples/cpp/ssl.cpp | 1 -
examples/cpp/ssl_client_cert.cpp | 1 -
proton-c/bindings/cpp/CMakeLists.txt | 2 +-
proton-c/bindings/cpp/docs/headers.dox | 2 -
.../bindings/cpp/include/proton/connection.hpp | 5 +-
.../bindings/cpp/include/proton/container.hpp | 28 +++-
.../bindings/cpp/include/proton/delivery.hpp | 2 +-
proton-c/bindings/cpp/include/proton/fwd.hpp | 2 -
.../cpp/include/proton/internal/object.hpp | 5 +-
.../cpp/include/proton/io/connection_driver.hpp | 1 -
.../bindings/cpp/include/proton/receiver.hpp | 3 +-
.../bindings/cpp/include/proton/returned.hpp | 62 +++++++
proton-c/bindings/cpp/include/proton/sender.hpp | 1 -
.../bindings/cpp/include/proton/session.hpp | 1 -
.../bindings/cpp/include/proton/thread_safe.hpp | 165 -------------------
.../bindings/cpp/include/proton/work_queue.hpp | 6 +-
.../bindings/cpp/src/connection_driver_test.cpp | 2 +-
proton-c/bindings/cpp/src/container.cpp | 1 -
proton-c/bindings/cpp/src/container_test.cpp | 1 -
.../bindings/cpp/src/include/proton_bits.hpp | 6 +
.../cpp/src/proactor_container_impl.cpp | 9 +-
proton-c/bindings/cpp/src/returned.cpp | 41 +++++
proton-c/bindings/cpp/src/thread_safe_test.cpp | 108 ------------
tests/tools/apps/cpp/reactor_send.cpp | 1 -
38 files changed, 155 insertions(+), 318 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/broker.cpp b/examples/cpp/broker.cpp
index 198b449..a236bb1 100644
--- a/examples/cpp/broker.cpp
+++ b/examples/cpp/broker.cpp
@@ -35,9 +35,9 @@
#include <proton/source_options.hpp>
#include <proton/target.hpp>
#include <proton/target_options.hpp>
-#include <proton/thread_safe.hpp>
#include <proton/tracker.hpp>
#include <proton/transport.hpp>
+#include <proton/work_queue.hpp>
#include <deque>
#include <iostream>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/client.cpp b/examples/cpp/client.cpp
index 7139155..81a9a32 100644
--- a/examples/cpp/client.cpp
+++ b/examples/cpp/client.cpp
@@ -28,7 +28,6 @@
#include <proton/messaging_handler.hpp>
#include <proton/receiver_options.hpp>
#include <proton/source_options.hpp>
-#include <proton/thread_safe.hpp>
#include <proton/tracker.hpp>
#include <iostream>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/connection_options.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/connection_options.cpp b/examples/cpp/connection_options.cpp
index f718060..a696c6d 100644
--- a/examples/cpp/connection_options.cpp
+++ b/examples/cpp/connection_options.cpp
@@ -24,7 +24,6 @@
#include <proton/container.hpp>
#include <proton/default_container.hpp>
#include <proton/messaging_handler.hpp>
-#include <proton/thread_safe.hpp>
#include <proton/transport.hpp>
#include <iostream>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/flow_control.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/flow_control.cpp b/examples/cpp/flow_control.cpp
index c0b8739..7b1474e 100644
--- a/examples/cpp/flow_control.cpp
+++ b/examples/cpp/flow_control.cpp
@@ -31,7 +31,6 @@
#include <proton/messaging_handler.hpp>
#include <proton/receiver_options.hpp>
#include <proton/sender.hpp>
-#include <proton/thread_safe.hpp>
#include <proton/tracker.hpp>
#include <iostream>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/helloworld.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/helloworld.cpp b/examples/cpp/helloworld.cpp
index 4aa5cdd..404d822 100644
--- a/examples/cpp/helloworld.cpp
+++ b/examples/cpp/helloworld.cpp
@@ -25,7 +25,6 @@
#include <proton/delivery.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
-#include <proton/thread_safe.hpp>
#include <proton/tracker.hpp>
#include <proton/url.hpp>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/helloworld_direct.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/helloworld_direct.cpp b/examples/cpp/helloworld_direct.cpp
index 9331587..f879edd 100644
--- a/examples/cpp/helloworld_direct.cpp
+++ b/examples/cpp/helloworld_direct.cpp
@@ -26,7 +26,6 @@
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/sender.hpp>
-#include <proton/thread_safe.hpp>
#include <proton/tracker.hpp>
#include <iostream>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/queue_browser.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/queue_browser.cpp b/examples/cpp/queue_browser.cpp
index 583277e..ef158b5 100644
--- a/examples/cpp/queue_browser.cpp
+++ b/examples/cpp/queue_browser.cpp
@@ -27,7 +27,6 @@
#include <proton/messaging_handler.hpp>
#include <proton/receiver_options.hpp>
#include <proton/source_options.hpp>
-#include <proton/thread_safe.hpp>
#include <proton/url.hpp>
#include <iostream>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/scheduled_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/scheduled_send.cpp b/examples/cpp/scheduled_send.cpp
index 2914c44..4c71482 100644
--- a/examples/cpp/scheduled_send.cpp
+++ b/examples/cpp/scheduled_send.cpp
@@ -22,11 +22,11 @@
#include "options.hpp"
#include <proton/container.hpp>
+#include <proton/connection.hpp>
#include <proton/default_container.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/sender.hpp>
-#include <proton/thread_safe.hpp>
#include <proton/tracker.hpp>
#include <proton/work_queue.hpp>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/scheduled_send_03.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/scheduled_send_03.cpp b/examples/cpp/scheduled_send_03.cpp
index 008853c..20972e4 100644
--- a/examples/cpp/scheduled_send_03.cpp
+++ b/examples/cpp/scheduled_send_03.cpp
@@ -29,7 +29,6 @@
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/sender.hpp>
-#include <proton/thread_safe.hpp>
#include <proton/tracker.hpp>
#include <proton/work_queue.hpp>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/selected_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/selected_recv.cpp b/examples/cpp/selected_recv.cpp
index a48ef0e..771fb29 100644
--- a/examples/cpp/selected_recv.cpp
+++ b/examples/cpp/selected_recv.cpp
@@ -26,7 +26,6 @@
#include <proton/messaging_handler.hpp>
#include <proton/receiver_options.hpp>
#include <proton/source_options.hpp>
-#include <proton/thread_safe.hpp>
#include <proton/url.hpp>
#include <iostream>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/server.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/server.cpp b/examples/cpp/server.cpp
index 449ce6e..573b3a0 100644
--- a/examples/cpp/server.cpp
+++ b/examples/cpp/server.cpp
@@ -27,7 +27,6 @@
#include <proton/message.hpp>
#include <proton/message_id.hpp>
#include <proton/messaging_handler.hpp>
-#include <proton/thread_safe.hpp>
#include <proton/tracker.hpp>
#include <proton/url.hpp>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/service_bus.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/service_bus.cpp b/examples/cpp/service_bus.cpp
index 6b57f8d..2c7a682 100644
--- a/examples/cpp/service_bus.cpp
+++ b/examples/cpp/service_bus.cpp
@@ -94,9 +94,9 @@ Done. No more messages.
#include <proton/sender.hpp>
#include <proton/sender_options.hpp>
#include <proton/source_options.hpp>
-#include <proton/thread_safe.hpp>
#include <proton/tracker.hpp>
#include <proton/url.hpp>
+#include <proton/work_queue.hpp>
#include <iostream>
#include <sstream>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/simple_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/simple_recv.cpp b/examples/cpp/simple_recv.cpp
index 145eef9..93b4868 100644
--- a/examples/cpp/simple_recv.cpp
+++ b/examples/cpp/simple_recv.cpp
@@ -30,7 +30,6 @@
#include <proton/message.hpp>
#include <proton/message_id.hpp>
#include <proton/messaging_handler.hpp>
-#include <proton/thread_safe.hpp>
#include <proton/value.hpp>
#include <iostream>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/simple_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/simple_send.cpp b/examples/cpp/simple_send.cpp
index 358bbec..ebc02cb 100644
--- a/examples/cpp/simple_send.cpp
+++ b/examples/cpp/simple_send.cpp
@@ -28,7 +28,6 @@
#include <proton/message.hpp>
#include <proton/message_id.hpp>
#include <proton/messaging_handler.hpp>
-#include <proton/thread_safe.hpp>
#include <proton/tracker.hpp>
#include <proton/types.hpp>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/ssl.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/ssl.cpp b/examples/cpp/ssl.cpp
index 166bd61..e24961f 100644
--- a/examples/cpp/ssl.cpp
+++ b/examples/cpp/ssl.cpp
@@ -30,7 +30,6 @@
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/ssl.hpp>
-#include <proton/thread_safe.hpp>
#include <proton/tracker.hpp>
#include <proton/transport.hpp>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/examples/cpp/ssl_client_cert.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/ssl_client_cert.cpp b/examples/cpp/ssl_client_cert.cpp
index 8ca2dc2..c6c7666 100644
--- a/examples/cpp/ssl_client_cert.cpp
+++ b/examples/cpp/ssl_client_cert.cpp
@@ -28,7 +28,6 @@
#include <proton/messaging_handler.hpp>
#include <proton/sasl.hpp>
#include <proton/ssl.hpp>
-#include <proton/thread_safe.hpp>
#include <proton/tracker.hpp>
#include <proton/transport.hpp>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index 21ff26c..472105a 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -57,6 +57,7 @@ set(qpid-proton-cpp-source
src/receiver.cpp
src/receiver_options.cpp
src/reconnect_timer.cpp
+ src/returned.cpp
src/sasl.cpp
src/scalar_base.cpp
src/sender.cpp
@@ -169,7 +170,6 @@ endmacro(add_cpp_test)
add_cpp_test(codec_test)
add_cpp_test(connection_driver_test)
-add_cpp_test(thread_safe_test)
add_cpp_test(interop_test ${CMAKE_SOURCE_DIR}/tests)
add_cpp_test(message_test)
add_cpp_test(map_test)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/docs/headers.dox
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/headers.dox b/proton-c/bindings/cpp/docs/headers.dox
index 0ff7220..7e9d79c 100644
--- a/proton-c/bindings/cpp/docs/headers.dox
+++ b/proton-c/bindings/cpp/docs/headers.dox
@@ -14,7 +14,6 @@
/// @file proton/duration.hpp Time duration data type
/// @file proton/error_condition.hpp AMQP error condition
/// @file proton/error.hpp Base exception type thrown by proton functions
-/// @file proton/event_loop.hpp
/// @file proton/function.hpp
/// @file proton/fwd.hpp
/// @file proton/link.hpp
@@ -42,7 +41,6 @@
/// @file proton/target.hpp
/// @file proton/target_options.hpp
/// @file proton/terminus.hpp
-/// @file proton/thread_safe.hpp
/// @file proton/timestamp.hpp
/// @file proton/tracker.hpp
/// @file proton/transfer.hpp
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/connection.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection.hpp b/proton-c/bindings/cpp/include/proton/connection.hpp
index ef75a4e..10ea61b 100644
--- a/proton-c/bindings/cpp/include/proton/connection.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection.hpp
@@ -109,10 +109,10 @@ PN_CPP_CLASS_EXTERN connection : public internal::object<pn_connection_t>, publi
PN_CPP_EXTERN receiver open_receiver(const std::string &addr,
const receiver_options &);
- /// @copydoc container::sender_options
+ /// @see proton::container::sender_options()
PN_CPP_EXTERN class sender_options sender_options() const;
- /// @copydoc container::receiver_options
+ /// @see container::receiver_options()
PN_CPP_EXTERN class receiver_options receiver_options() const;
/// Return all sessions on this connection.
@@ -142,7 +142,6 @@ PN_CPP_CLASS_EXTERN connection : public internal::object<pn_connection_t>, publi
/// @cond INTERNAL
friend class internal::factory<connection>;
friend class container;
- friend class proton::thread_safe<connection>;
/// @endcond
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp
index 859d70c..64e52ad 100644
--- a/proton-c/bindings/cpp/include/proton/container.hpp
+++ b/proton-c/bindings/cpp/include/proton/container.hpp
@@ -23,7 +23,7 @@
*/
#include "./fwd.hpp"
-#include "./thread_safe.hpp"
+#include "./returned.hpp"
#include "./types_fwd.hpp"
#include "./internal/config.hpp"
@@ -76,9 +76,14 @@ class PN_CPP_CLASS_EXTERN container {
/// The handler in the composed options is used to call
/// proton::messaging_handler::on_connection_open() when the remote peer's
/// open response is received.
+ ///
+ /// @return A returned<connection>
+ /// @copydetails returned
PN_CPP_EXTERN returned<connection> connect(const std::string& url, const connection_options &);
/// Connect to `url` and send an open request to the remote peer.
+ /// @return A returned<connection>
+ /// @copydetails returned
PN_CPP_EXTERN returned<connection> connect(const std::string& url);
/// Start listening on url.
@@ -135,12 +140,16 @@ class PN_CPP_CLASS_EXTERN container {
PN_CPP_EXTERN void stop();
/// Open a connection and sender for `url`.
+ /// @return A returned<sender>
+ /// @copydetails returned
PN_CPP_EXTERN returned<sender> open_sender(const std::string &url);
/// Open a connection and sender for `url`.
///
/// Supplied sender options will override the container's
/// template options.
+ /// @return A returned<sender>
+ /// @copydetails returned
PN_CPP_EXTERN returned<sender> open_sender(const std::string &url,
const proton::sender_options &o);
@@ -148,6 +157,8 @@ class PN_CPP_CLASS_EXTERN container {
///
/// Supplied connection options will override the
/// container's template options.
+ /// @return A returned<sender>
+ /// @copydetails returned
PN_CPP_EXTERN returned<sender> open_sender(const std::string &url,
const connection_options &c);
@@ -155,11 +166,17 @@ class PN_CPP_CLASS_EXTERN container {
///
/// Supplied sender or connection options will override the
/// container's template options.
+ ///
+ /// @return A returned<sender>
+ /// @copydetails returned
PN_CPP_EXTERN returned<sender> open_sender(const std::string &url,
const proton::sender_options &o,
const connection_options &c);
/// Open a connection and receiver for `url`.
+ ///
+ /// @return A returned<receiver>
+ /// @copydetails returned
PN_CPP_EXTERN returned<receiver> open_receiver(const std::string&url);
@@ -167,6 +184,9 @@ class PN_CPP_CLASS_EXTERN container {
///
/// Supplied receiver options will override the container's
/// template options.
+ ///
+ /// @return A returned<receiver>
+ /// @copydetails returned
PN_CPP_EXTERN returned<receiver> open_receiver(const std::string&url,
const proton::receiver_options &o);
@@ -174,6 +194,9 @@ class PN_CPP_CLASS_EXTERN container {
///
/// Supplied receiver or connection options will override the
/// container's template options.
+ ///
+ /// @return A returned<receiver>
+ /// @copydetails returned
PN_CPP_EXTERN returned<receiver> open_receiver(const std::string&url,
const connection_options &c);
@@ -181,6 +204,9 @@ class PN_CPP_CLASS_EXTERN container {
///
/// Supplied receiver or connection options will override the
/// container's template options.
+ ///
+ /// @return A returned<receiver>
+ /// @copydetails returned
PN_CPP_EXTERN returned<receiver> open_receiver(const std::string&url,
const proton::receiver_options &o,
const connection_options &c);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/delivery.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/delivery.hpp b/proton-c/bindings/cpp/include/proton/delivery.hpp
index 7c89f0c..7a38bca 100644
--- a/proton-c/bindings/cpp/include/proton/delivery.hpp
+++ b/proton-c/bindings/cpp/include/proton/delivery.hpp
@@ -46,7 +46,7 @@ class delivery : public transfer {
// XXX ATM the following don't reflect the differing behaviors we
// get from the different delivery modes. - Deferred
-
+
/// Settle with ACCEPTED state.
PN_CPP_EXTERN void accept();
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/fwd.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/fwd.hpp b/proton-c/bindings/cpp/include/proton/fwd.hpp
index 5ade5fd..efbb91b 100644
--- a/proton-c/bindings/cpp/include/proton/fwd.hpp
+++ b/proton-c/bindings/cpp/include/proton/fwd.hpp
@@ -64,8 +64,6 @@ class connection_driver;
}
template <class T> class returned;
-template <class T> class thread_safe;
-
}
#endif // PROTON_FWD_HPP
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/internal/object.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/internal/object.hpp b/proton-c/bindings/cpp/include/proton/internal/object.hpp
index d492b80..442b09d 100644
--- a/proton-c/bindings/cpp/include/proton/internal/object.hpp
+++ b/proton-c/bindings/cpp/include/proton/internal/object.hpp
@@ -31,7 +31,7 @@
namespace proton {
-template <class T> class thread_safe;
+template <class T> class returned;
namespace internal {
@@ -101,7 +101,8 @@ template <class T> class object : private comparable<object<T> > {
friend bool operator==(const object& a, const object& b) { return a.object_ == b.object_; }
friend bool operator<(const object& a, const object& b) { return a.object_ < b.object_; }
friend std::ostream& operator<<(std::ostream& o, const object& a) { o << a.object_.inspect(); return o; }
- template <class U> friend class proton::thread_safe;
+
+ template <class U> friend class proton::returned;
};
/// Factory class used internally to make wrappers and extract proton objects
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
index 44275bc..f9774ac 100644
--- a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
+++ b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
@@ -179,7 +179,6 @@ PN_CPP_CLASS_EXTERN connection_driver {
PN_CPP_EXTERN bool dispatch();
/// Get the AMQP connection associated with this connection_driver.
- /// The event_loop is availabe via proton::thread_safe<connection>(connection())
PN_CPP_EXTERN proton::connection connection() const;
/// Get the transport associated with this connection_driver.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/receiver.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/receiver.hpp b/proton-c/bindings/cpp/include/proton/receiver.hpp
index f92ac96..b995e6f 100644
--- a/proton-c/bindings/cpp/include/proton/receiver.hpp
+++ b/proton-c/bindings/cpp/include/proton/receiver.hpp
@@ -76,12 +76,11 @@ PN_CPP_CLASS_EXTERN receiver : public link {
/// @cond INTERNAL
friend class internal::factory<receiver>;
friend class receiver_iterator;
- friend class thread_safe<receiver>;
/// @endcond
};
/// @cond INTERNAL
-
+
/// An iterator of receivers.
class receiver_iterator : public internal::iter_base<receiver, receiver_iterator> {
explicit receiver_iterator(receiver r, pn_session_t* s = 0) :
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/returned.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/returned.hpp b/proton-c/bindings/cpp/include/proton/returned.hpp
new file mode 100644
index 0000000..25b5c91
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/returned.hpp
@@ -0,0 +1,62 @@
+#ifndef PROTON_RETURNED_HPP
+#define PROTON_RETURNED_HPP
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "./internal/object.hpp"
+#include "./connection.hpp"
+#include "./receiver.hpp"
+#include "./sender.hpp"
+
+/// @file
+/// Return type for container functions
+
+namespace proton {
+
+namespace internal {
+template <class T> class factory;
+}
+
+/// Return type for container functions
+///
+/// @note returned value is *thread-unsafe*.
+/// A single-threaded application can assign the returned<T> value to a plain T.
+/// A multi-threaded application *must* ignore the returned value, as it may already
+/// be invalid by the time the function returns. Multi-threaded applications
+/// can access the value in @ref messaging_handler functions.
+///
+template <class T>
+class returned
+{
+ public:
+ operator T() const;
+
+ private:
+ typename T::pn_type* ptr_;
+ returned(const T&);
+ returned& operator=(const returned&);
+ template <class U> friend class internal::factory;
+};
+
+} // proton
+
+#endif /*!PROTON_RETURNED_HPP*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/sender.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/sender.hpp b/proton-c/bindings/cpp/include/proton/sender.hpp
index f8c1e66..b01f21c 100644
--- a/proton-c/bindings/cpp/include/proton/sender.hpp
+++ b/proton-c/bindings/cpp/include/proton/sender.hpp
@@ -71,7 +71,6 @@ PN_CPP_CLASS_EXTERN sender : public link {
/// @cond INTERNAL
friend class internal::factory<sender>;
friend class sender_iterator;
- friend class thread_safe<sender>;
/// @endcond
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/session.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/session.hpp b/proton-c/bindings/cpp/include/proton/session.hpp
index 8d4184b..c66aa05 100644
--- a/proton-c/bindings/cpp/include/proton/session.hpp
+++ b/proton-c/bindings/cpp/include/proton/session.hpp
@@ -99,7 +99,6 @@ PN_CPP_CLASS_EXTERN session : public internal::object<pn_session_t>, public endp
/// @cond INTERNAL
friend class internal::factory<session>;
friend class session_iterator;
- friend class thread_safe<session>;
/// @endcond
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/thread_safe.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/thread_safe.hpp b/proton-c/bindings/cpp/include/proton/thread_safe.hpp
deleted file mode 100644
index 0b38883..0000000
--- a/proton-c/bindings/cpp/include/proton/thread_safe.hpp
+++ /dev/null
@@ -1,165 +0,0 @@
-#ifndef PROTON_THREAD_SAFE_HPP
-#define PROTON_THREAD_SAFE_HPP
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "./fwd.hpp"
-#include "./internal/config.hpp"
-#include "./connection.hpp"
-#include "./function.hpp"
-#include "./internal/object.hpp"
-#include "./internal/type_traits.hpp"
-#include "./work_queue.hpp"
-
-#include <functional>
-
-namespace proton {
-
-namespace internal {
-template <class T> struct endpoint_traits;
-template<> struct endpoint_traits<connection> {};
-template<> struct endpoint_traits<session> {};
-template<> struct endpoint_traits<link> {};
-template<> struct endpoint_traits<sender> {};
-template<> struct endpoint_traits<receiver> {};
-}
-
-/// **Experimental** - A thread-safe object wrapper.
-///
-/// The proton::object subclasses (proton::connection, proton::sender etc.) are
-/// reference-counted wrappers for C structs. They are not safe for concurrent use,
-/// not even to copy or assign.
-///
-/// A pointer to thread_safe<> can be used from any thread to get the
-/// proton::event_loop for the object's connection. The object will not be
-/// destroyed until the thread_safe<> is deleted. You can use std::shared_ptr,
-/// std::unique_ptr or any other memory management technique to manage the
-/// thread_safe<>.
-///
-/// Use make_thread_safe(), make_shared_thread_safe(), make_unique_thread_safe() to
-/// create a thread_safe<>
-///
-/// @see @ref mt_page
-template <class T>
-class thread_safe : private internal::pn_ptr_base, private internal::endpoint_traits<T> {
- typedef typename T::pn_type pn_type;
-
- public:
- /// @cond INTERNAL
- static void operator delete(void*) {}
- /// @endcond
-
- ~thread_safe() {
- if (ptr()) {
- if (!!work_queue().impl_) schedule_work(&work_queue(), &decref, (void*)ptr());
- else decref(ptr());
- }
- }
-
- /// Get the work queue for this object.
- class work_queue& work_queue() { return work_queue::get(ptr()); }
-
- /// Get the thread-unsafe proton object wrapped by this thread_safe<T>
- T unsafe() { return T(ptr()); }
-
- private:
- static thread_safe* create(const T& obj) { return new (obj.pn_object()) thread_safe(); }
- static void* operator new(size_t, pn_type* p) { return p; }
- static void operator delete(void*, pn_type*) {}
- thread_safe() { incref(ptr()); }
- pn_type* ptr() { return reinterpret_cast<pn_type*>(this); }
-
-
- // Non-copyable.
- thread_safe(const thread_safe&);
- thread_safe& operator=(const thread_safe&);
-
- /// @cond INTERNAL
- friend class returned<T>;
- /// @endcond
-};
-
-// A return value for functions returning a thread_safe<> object.
-//
-// Temporary return value only, you should release() to get a plain pointer or
-// assign to a smart pointer type.
-template <class T>
-class returned : private internal::endpoint_traits<T>
-{
- public:
- /// Take ownership
- explicit returned(thread_safe<T>* p) : ptr_(p) {}
- /// Create an owned thread_safe<T>
- explicit returned(const T& obj) : ptr_(thread_safe<T>::create(obj)) {}
- /// Transfer ownership.
- /// Use the same "cheat" as std::auto_ptr, calls x.release() even though x is const.
- returned(const returned& x) : ptr_(const_cast<returned&>(x).release()) {}
- /// Delete if still owned.
- ~returned() { if (ptr_) delete ptr_; }
-
- /// Release ownership.
- thread_safe<T>* release() const { thread_safe<T>* p = ptr_; ptr_ = 0; return p; }
-
- /// Get the raw pointer, caller must not delete.
- thread_safe<T>* get() const { return ptr_; }
-
- /// Implicit conversion to target, usable only in a safe context.
- operator T() { return ptr_->unsafe(); }
-
-#if PN_CPP_HAS_SHARED_PTR
- /// Release to a std::shared_ptr
- operator std::shared_ptr<thread_safe<T> >() {
- return std::shared_ptr<thread_safe<T> >(release());
- }
-#endif
-#if PN_CPP_HAS_UNIQUE_PTR
- /// Release to a std::unique_ptr
- operator std::unique_ptr<thread_safe<T> >() {
- return std::unique_ptr<thread_safe<T> >(release());
- }
-#endif
-
- private:
- void operator=(const returned&);
- mutable thread_safe<T>* ptr_;
-};
-
-/// Make a thread-safe wrapper for `obj`.
-template <class T> returned<T> make_thread_safe(const T& obj) { return returned<T>(obj); }
-
-#if PN_CPP_HAS_SHARED_PTR
-/// Create a thread-safe shared_ptr to `obj`.
-template <class T> std::shared_ptr<thread_safe<T> > make_shared_thread_safe(const T& obj) {
- return make_thread_safe(obj);
-}
-#endif
-#if PN_CPP_HAS_UNIQUE_PTR
-/// Create a thread-safe unique_ptr to `obj`.
-template <class T> std::unique_ptr<thread_safe<T> > make_unique_thread_safe(const T& obj) {
- return make_thread_safe(obj);
-}
-
-#endif
-
-} // proton
-
-#endif // PROTON_THREAD_SAFE_HPP
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/include/proton/work_queue.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/work_queue.hpp b/proton-c/bindings/cpp/include/proton/work_queue.hpp
index 844680b..30d8395 100644
--- a/proton-c/bindings/cpp/include/proton/work_queue.hpp
+++ b/proton-c/bindings/cpp/include/proton/work_queue.hpp
@@ -40,10 +40,10 @@ namespace proton {
/// **Experimental** - A work queue for serial execution.
///
/// Event handler functions associated with a single proton::connection are called in sequence.
-/// The connection's @ref work_queue allows you to "inject" extra @ref work from any thread,
+/// The connection's proton::work_queue allows you to "inject" extra @ref work from any thread,
/// and have it executed in the same sequence.
///
-/// You may also create arbitrary @ref work_queue objects backed by a @ref container that allow
+/// You may also create arbitrary proton::work_queue objects backed by a @ref container that allow
/// other objects to have their own serialised work queues that can have work injected safely
/// from other threads. The @ref container ensures that the work is correctly serialised.
///
@@ -113,7 +113,6 @@ class PN_CPP_CLASS_EXTERN work_queue {
/// @cond INTERNAL
friend class container;
friend class io::connection_driver;
- template <class T> friend class thread_safe;
/// @endcond
};
@@ -347,6 +346,7 @@ void schedule_work(WQ wq, duration dn, F f, A a, B b, C c, D d) {
#else
// The C++11 version is *much* simpler and even so more general!
// These definitions encompass everything in the C++03 section
+
template <class WQ, class... Rest>
bool schedule_work(WQ wq, Rest&&... r) {
return wq->add(std::bind(std::forward<Rest>(r)...));
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/src/connection_driver_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection_driver_test.cpp b/proton-c/bindings/cpp/src/connection_driver_test.cpp
index 7fcde46..d174454 100644
--- a/proton-c/bindings/cpp/src/connection_driver_test.cpp
+++ b/proton-c/bindings/cpp/src/connection_driver_test.cpp
@@ -22,6 +22,7 @@
#include "proton_bits.hpp"
#include "proton/container.hpp"
+#include "proton/connection.hpp"
#include "proton/io/connection_driver.hpp"
#include "proton/io/link_namer.hpp"
#include "proton/link.hpp"
@@ -31,7 +32,6 @@
#include "proton/sender.hpp"
#include "proton/sender_options.hpp"
#include "proton/source_options.hpp"
-#include "proton/thread_safe.hpp"
#include "proton/transport.hpp"
#include "proton/types_fwd.hpp"
#include "proton/uuid.hpp"
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/src/container.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container.cpp b/proton-c/bindings/cpp/src/container.cpp
index 35f645c..c82e1a8 100644
--- a/proton-c/bindings/cpp/src/container.cpp
+++ b/proton-c/bindings/cpp/src/container.cpp
@@ -25,7 +25,6 @@
#include "proton/error_condition.hpp"
#include "proton/listen_handler.hpp"
#include "proton/listener.hpp"
-#include "proton/thread_safe.hpp"
#include "proactor_container_impl.hpp"
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/src/container_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_test.cpp b/proton-c/bindings/cpp/src/container_test.cpp
index d210268..498b217 100644
--- a/proton-c/bindings/cpp/src/container_test.cpp
+++ b/proton-c/bindings/cpp/src/container_test.cpp
@@ -26,7 +26,6 @@
#include "proton/messaging_handler.hpp"
#include "proton/listener.hpp"
#include "proton/listen_handler.hpp"
-#include "proton/thread_safe.hpp"
#include <cstdlib>
#include <ctime>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/src/include/proton_bits.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proton_bits.hpp b/proton-c/bindings/cpp/src/include/proton_bits.hpp
index fdd79b5..035ffb7 100644
--- a/proton-c/bindings/cpp/src/include/proton_bits.hpp
+++ b/proton-c/bindings/cpp/src/include/proton_bits.hpp
@@ -124,6 +124,7 @@ class factory {
public:
static T wrap(typename wrapped<T>::type* t) { return t; }
static typename wrapped<T>::type* unwrap(const T& t) { return t.pn_object(); }
+ static returned<T> make_returned(const T& t) { return returned<T>(t); }
};
template <class T> struct context {};
@@ -150,6 +151,11 @@ U make_wrapper(typename internal::wrapped<U>::type* t) { return internal::factor
template <class T>
typename internal::wrapped<T>::type* unwrap(const T& t) { return internal::factory<T>::unwrap(t); }
+template <class T>
+returned<T> make_returned(const T& t) {
+ return internal::factory<T>::make_returned(t);
+}
+
}
#endif // PROTON_BITS_HPP
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/src/proactor_container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
index b900d6f..1389306 100644
--- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp
+++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
@@ -24,7 +24,6 @@
#include "proton/function.hpp"
#include "proton/listener.hpp"
#include "proton/listen_handler.hpp"
-#include "proton/thread_safe.hpp"
#include "proton/url.hpp"
#include "proton/connection.h"
@@ -188,13 +187,13 @@ proton::connection container::impl::connect_common(
return conn;
}
-proton::returned<proton::connection> container::impl::connect(
+returned<proton::connection> container::impl::connect(
const std::string& addr,
const proton::connection_options& user_opts)
{
connection conn = connect_common(addr, user_opts);
GUARD(lock_);
- return make_thread_safe(conn);
+ return make_returned(conn);
}
returned<sender> container::impl::open_sender(const std::string &url, const proton::sender_options &o1, const connection_options &o2) {
@@ -203,7 +202,7 @@ returned<sender> container::impl::open_sender(const std::string &url, const prot
connection conn = connect_common(url, o2);
GUARD(lock_);
- return make_thread_safe(conn.default_session().open_sender(proton::url(url).path(), lopts));
+ return make_returned(conn.default_session().open_sender(proton::url(url).path(), lopts));
}
returned<receiver> container::impl::open_receiver(const std::string &url, const proton::receiver_options &o1, const connection_options &o2) {
@@ -212,7 +211,7 @@ returned<receiver> container::impl::open_receiver(const std::string &url, const
connection conn = connect_common(url, o2);
GUARD(lock_);
- return make_thread_safe(
+ return make_returned(
conn.default_session().open_receiver(proton::url(url).path(), lopts));
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/src/returned.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/returned.cpp b/proton-c/bindings/cpp/src/returned.cpp
new file mode 100644
index 0000000..2e1a4b2
--- /dev/null
+++ b/proton-c/bindings/cpp/src/returned.cpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proton_bits.hpp"
+
+#include <proton/returned.hpp>
+#include <proton/connection.hpp>
+#include <proton/sender.hpp>
+#include <proton/receiver.hpp>
+
+namespace proton {
+
+template <class T> returned<T>::returned(const T& t) : ptr_(unwrap(t)) {}
+
+template <class T> returned<T>::operator T() const {
+ return internal::factory<T>::wrap(ptr_);
+}
+
+// Explicit instantiations for allowed types
+
+template class PN_CPP_CLASS_EXTERN returned<connection>;
+template class PN_CPP_CLASS_EXTERN returned<sender>;
+template class PN_CPP_CLASS_EXTERN returned<receiver>;
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/proton-c/bindings/cpp/src/thread_safe_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/thread_safe_test.cpp b/proton-c/bindings/cpp/src/thread_safe_test.cpp
deleted file mode 100644
index 3a72f7f..0000000
--- a/proton-c/bindings/cpp/src/thread_safe_test.cpp
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/// Test reference counting for object wrappers, threads_safe<> wrappers and returned<> wrappers.
-///
-
-#include "test_bits.hpp"
-#include "proton_bits.hpp"
-
-#include "proton/thread_safe.hpp"
-#include "proton/io/connection_driver.hpp"
-
-#include <proton/connection.h>
-
-namespace {
-
-using namespace std;
-using namespace proton;
-
-void test_new() {
- pn_connection_t* c = 0;
- thread_safe<connection>* p = 0;
- {
- io::connection_driver e;
- c = unwrap(e.connection());
- int r = pn_refcount(c);
- ASSERT(r >= 1); // engine may have internal refs (transport, collector).
- p = make_thread_safe(e.connection()).release();
- ASSERT_EQUAL(r+1, pn_refcount(c));
- delete p;
- ASSERT_EQUAL(r, pn_refcount(c));
- p = make_thread_safe(e.connection()).release();
- }
- ASSERT_EQUAL(1, pn_refcount(c)); // Engine gone, thread_safe keeping c alive.
- delete p;
-
-#if PN_CPP_HAS_SHARED_PTR
- {
- std::shared_ptr<thread_safe<connection> > sp;
- {
- io::connection_driver e;
- c = unwrap(e.connection());
- sp = make_shared_thread_safe(e.connection());
- }
- ASSERT_EQUAL(1, pn_refcount(c)); // Engine gone, sp keeping c alive.
- }
-#endif
-#if PN_CPP_HAS_UNIQUE_PTR
- {
- std::unique_ptr<thread_safe<connection> > up;
- {
- io::connection_driver e;
- c = unwrap(e.connection());
- up = make_unique_thread_safe(e.connection());
- }
- ASSERT_EQUAL(1, pn_refcount(c)); // Engine gone, sp keeping c alive.
- }
-#endif
-}
-
-void test_convert() {
- // Verify refcounts as expected with conversion between proton::object
- // and thread_safe.
- connection c;
- pn_connection_t* pc = 0;
- {
- io::connection_driver eng;
- c = eng.connection();
- pc = unwrap(c); // Unwrap in separate scope to avoid confusion from temp values.
- }
- {
- ASSERT_EQUAL(1, pn_refcount(pc));
- returned<connection> pptr = make_thread_safe(c);
- ASSERT_EQUAL(2, pn_refcount(pc));
- returned<connection> pp2 = pptr;
- ASSERT(!pptr.release()); // Transferred to pp2
- ASSERT_EQUAL(2, pn_refcount(pc));
- connection c2 = pp2; // Transfer and convert to target
- ASSERT_EQUAL(3, pn_refcount(pc)); // c, c2, thread_safe.
- ASSERT(c == c2);
- }
- ASSERT_EQUAL(1, pn_refcount(pc)); // only c is left
-}
-
-}
-
-int main(int, char**) {
- int failed = 0;
- RUN_TEST(failed, test_new());
- RUN_TEST(failed, test_convert());
- return failed;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f1ee2681/tests/tools/apps/cpp/reactor_send.cpp
----------------------------------------------------------------------
diff --git a/tests/tools/apps/cpp/reactor_send.cpp b/tests/tools/apps/cpp/reactor_send.cpp
index 7841a5e..62ac4ce 100644
--- a/tests/tools/apps/cpp/reactor_send.cpp
+++ b/tests/tools/apps/cpp/reactor_send.cpp
@@ -32,7 +32,6 @@
#include "proton/messaging_handler.hpp"
#include "proton/receiver_options.hpp"
#include "proton/sender.hpp"
-#include "proton/thread_safe.hpp"
#include "proton/tracker.hpp"
#include "proton/value.hpp"
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/3] qpid-proton git commit: PROTON-1557: c++ improve multi-threaded
clients
Posted by ac...@apache.org.
PROTON-1557: c++ improve multi-threaded clients
2 clients:
- multithreaded_client.cpp: simple send thread, receive thread, run thread
- multithreaded_client_flow_control: multi-connection, block for flow control
Changes:
- reduced needless diff between examples
- use separate work_queue* to clarify separate thread safety rules from sender
- took work_queue->add() out of lock to emphasize it is thread safe
- use fixed argument list, same arg order
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/298e7dba
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/298e7dba
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/298e7dba
Branch: refs/heads/master
Commit: 298e7dba1b8c09781abca0a4a555b90116703a8a
Parents: 68c8cf4
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Aug 28 11:53:35 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Aug 28 17:54:35 2017 -0400
----------------------------------------------------------------------
examples/cpp/CMakeLists.txt | 3 +-
examples/cpp/example_test.py | 12 +
examples/cpp/mt_queue.hpp | 102 -------
examples/cpp/multithreaded_client.cpp | 185 ++++++++++++
.../cpp/multithreaded_client_flow_control.cpp | 287 +++++++++++++++++++
examples/cpp/send_recv_mt.cpp | 269 -----------------
6 files changed, 486 insertions(+), 372 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/298e7dba/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt
index df9f6a7..d116913 100644
--- a/examples/cpp/CMakeLists.txt
+++ b/examples/cpp/CMakeLists.txt
@@ -63,7 +63,8 @@ if(HAS_CPP11)
# Examples that require C++11
foreach(example
scheduled_send
- send_recv_mt
+ multithreaded_client
+ multithreaded_client_flow_control
)
add_executable(${example} ${example}.cpp)
endforeach()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/298e7dba/examples/cpp/example_test.py
----------------------------------------------------------------------
diff --git a/examples/cpp/example_test.py b/examples/cpp/example_test.py
index 98f1d90..8f9f64c 100644
--- a/examples/cpp/example_test.py
+++ b/examples/cpp/example_test.py
@@ -300,5 +300,17 @@ Hello World!
expect_found = (out.find(expect) >= 0)
self.assertEqual(expect_found, True)
+ def test_multithreaded_client(self):
+ with TestPort() as tp:
+ want = ""
+ got = self.proc(["multithreaded_client", 10, 2, tp.addr]).wait_exit()
+ self.assertMultiLineEqual(want, got)
+
+ def test_multithreaded_client_flow_control(self):
+ with TestPort() as tp:
+ want = ""
+ got = self.proc(["multithreaded_client_flow_control", "10", "2", tp.addr]).wait_exit()
+ self.assertMultiLineEqual(want, got)
+
if __name__ == "__main__":
unittest.main()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/298e7dba/examples/cpp/mt_queue.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt_queue.hpp b/examples/cpp/mt_queue.hpp
deleted file mode 100644
index f053ebe..0000000
--- a/examples/cpp/mt_queue.hpp
+++ /dev/null
@@ -1,102 +0,0 @@
-#ifndef MT_QUEUE_HPP
-#define MT_QUEUE_HPP
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <condition_variable>
-#include <stdexcept>
-#include <mutex>
-#include <queue>
-
-class closed_error : public std::runtime_error {
- public:
- closed_error() : std::runtime_error("closed") {}
-};
-
-// A bounded, thread-safe queue.
-// Objects are moved on and off the queue, not copied. Avoids overhead of copy operations.
-template <class T, size_t CAPACITY> class mt_queue {
- std::queue<T> q_;
- std::mutex lock_;
- std::condition_variable push_;
- std::condition_variable pop_;
- bool closed_;
-
- void do_push(T&& x) {
- q_.push(std::move(x));
- pop_.notify_one();
- }
-
- T do_pop() {
- T x(std::move(q_.front()));
- q_.pop();
- push_.notify_one();
- return x;
- }
-
- bool can_push() { return q_.size() < CAPACITY; }
- bool can_pop() { return q_.size() > 0; }
-
- public:
-
- mt_queue() : closed_(false) {}
-
- void push(T&& x) {
- std::unique_lock<std::mutex> l(lock_);
- while(!can_push())
- push_.wait(l);
- do_push(std::move(x));
- }
-
- T pop() {
- std::unique_lock<std::mutex> l(lock_);
- while(!can_pop())
- pop_.wait(l);
- return do_pop();
- }
-
- bool try_push(T&& x) noexcept {
- std::lock_guard<std::mutex> l(lock_);
- bool ok = can_push();
- if (ok)
- do_push(std::move(x));
- return ok;
- }
-
- bool try_pop(T& x) noexcept {
- std::lock_guard<std::mutex> l(lock_);
- bool ok = can_pop();
- if (ok)
- x = std::move(do_pop());
- return ok;
- }
-
- size_t capacity() noexcept {
- return CAPACITY;
- }
-
- size_t size() noexcept {
- std::lock_guard<std::mutex> l(lock_);
- return q_.size();
- }
-};
-
-
-#endif // MT_QUEUE_HPP
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/298e7dba/examples/cpp/multithreaded_client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/multithreaded_client.cpp b/examples/cpp/multithreaded_client.cpp
new file mode 100644
index 0000000..955655c
--- /dev/null
+++ b/examples/cpp/multithreaded_client.cpp
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//
+// C++11 only
+//
+// A multi-threaded client that calls proton::container::run() in one thread, sends
+// messages in another and receives messages in a third.
+//
+// Note this client does not deal with flow-control. If the sender is faster
+// than the receiver, messages will build up in memory on the sending side.
+// See @ref multithreaded_client_flow_control.cpp for a more complex example with
+// flow control.
+//
+// NOTE: no proper error handling
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver.hpp>
+#include <proton/sender.hpp>
+#include <proton/work_queue.hpp>
+
+#include <condition_variable>
+#include <iostream>
+#include <mutex>
+#include <queue>
+#include <sstream>
+#include <string>
+#include <thread>
+
+// Handler for a single thread-safe sending and receiving connection.
+class client : public proton::messaging_handler {
+ // Invariant
+ const std::string url_;
+ const std::string address_;
+
+ // Only used in proton handler thread
+ proton::sender sender_;
+
+ // Shared by proton and user threads, protected by lock_
+ std::mutex lock_;
+ proton::work_queue *work_queue_;
+ std::condition_variable sender_ready_;
+ std::queue<proton::message> messages_;
+ std::condition_variable messages_ready_;
+
+ public:
+ client(const std::string& url, const std::string& address) : url_(url), address_(address) {}
+
+ // Thread safe
+ void send(const proton::message& msg) {
+ // Use [=] to copy the message, we cannot pass it by reference since it
+ // will be used in another thread.
+ work_queue()->add([=]() { sender_.send(msg); });
+ }
+
+ // Thread safe
+ proton::message receive() {
+ std::unique_lock<std::mutex> l(lock_);
+ while (messages_.empty()) messages_ready_.wait(l);
+ auto msg = std::move(messages_.front());
+ messages_.pop();
+ return msg;
+ }
+
+ // Thread safe
+ void close() {
+ work_queue()->add([=]() { sender_.connection().close(); });
+ }
+
+ private:
+
+ proton::work_queue* work_queue() {
+ // Wait till work_queue_ and sender_ are initialized.
+ std::unique_lock<std::mutex> l(lock_);
+ while (!work_queue_) sender_ready_.wait(l);
+ return work_queue_;
+ }
+
+ // == messaging_handler overrides, only called in proton hander thread
+
+ // Note: this example creates a connection when the container starts.
+ // To create connections after the container has started, use
+ // container::connect().
+ // See @ref multithreaded_client_flow_control.cpp for an example.
+ void on_container_start(proton::container& cont) override {
+ cont.connect(url_);
+ }
+
+ void on_connection_open(proton::connection& conn) override {
+ conn.open_sender(address_);
+ conn.open_receiver(address_);
+ }
+
+ void on_sender_open(proton::sender& s) override {
+ {
+ // sender_ and work_queue_ must be set atomically
+ std::lock_guard<std::mutex> l(lock_);
+ sender_ = s;
+ work_queue_ = &s.work_queue();
+ }
+ sender_ready_.notify_all();
+ }
+
+ void on_message(proton::delivery& dlv, proton::message& msg) override {
+ {
+ std::lock_guard<std::mutex> l(lock_);
+ messages_.push(msg);
+ }
+ messages_ready_.notify_all();
+ }
+
+ void on_error(const proton::error_condition& e) override {
+ std::cerr << "unexpected error: " << e << std::endl;
+ exit(1);
+ }
+};
+
+int main(int argc, const char** argv) {
+ try {
+ if (argc != 4) {
+ std ::cerr <<
+ "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT\n"
+ "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
+ "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
+ "MESSAGE-COUNT: number of messages to send\n";
+ return 1;
+ }
+ const char *url = argv[1];
+ const char *address = argv[2];
+ int n_messages = atoi(argv[3]);
+
+ client cl(url, address);
+ proton::container container(cl);
+ std::thread container_thread([&]() { container.run(); });
+
+ std::thread sender([&]() {
+ for (int i = 0; i < n_messages; ++i) {
+ proton::message msg(std::to_string(i + 1));
+ cl.send(msg);
+ std::cout << "sent: " << msg.body() << std::endl;
+ }
+ });
+
+ int received = 0;
+ std::thread receiver([&]() {
+ for (int i = 0; i < n_messages; ++i) {
+ auto msg = cl.receive();
+ std::cout << "received: " << msg.body() << std::endl;
+ ++received;
+ }
+ });
+
+ sender.join();
+ receiver.join();
+ cl.close();
+ container_thread.join();
+ std::cout << "received " << received << " messages" << std::endl;
+
+ return 0;
+ } catch (const std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ }
+
+ return 1;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/298e7dba/examples/cpp/multithreaded_client_flow_control.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/multithreaded_client_flow_control.cpp b/examples/cpp/multithreaded_client_flow_control.cpp
new file mode 100644
index 0000000..9eec782
--- /dev/null
+++ b/examples/cpp/multithreaded_client_flow_control.cpp
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// C++11 only
+//
+// A multi-threaded client that sends and receives messages from multiple AMQP
+// addresses.
+//
+// Demonstrates how to:
+//
+// - implement proton handlers that interact with user threads safely
+// - block sender threads to respect AMQP flow control
+// - use AMQP flow control to limit message buffering for receivers threads
+//
+// We define sender and receiver classes with simple, thread-safe blocking
+// send() and receive() functions.
+//
+// These classes are also privately proton::message_handler instances. They use
+// the thread-safe proton::work_queue and standard C++ synchronization (std::mutex
+// etc.) to pass messages between user and proton::container threads.
+//
+// NOTE: no proper error handling
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver.hpp>
+#include <proton/receiver_options.hpp>
+#include <proton/sender.hpp>
+#include <proton/work_queue.hpp>
+
+#include <atomic>
+#include <condition_variable>
+#include <iostream>
+#include <mutex>
+#include <queue>
+#include <sstream>
+#include <string>
+#include <thread>
+
+// A thread-safe sending connection that blocks sending threads when there
+// is no AMQP credit to send messages.
+class sender : private proton::messaging_handler {
+ // Only used in proton handler thread
+ proton::sender sender_;
+
+ // Shared by proton and user threads, protected by lock_
+ std::mutex lock_;
+ proton::work_queue *work_queue_;
+ std::condition_variable sender_ready_;
+ int queued_; // Queued messages waiting to be sent
+ int credit_; // AMQP credit - number of messages we can send
+
+ public:
+ sender(proton::container& cont, const std::string& url, const std::string& address)
+ : work_queue_(0), queued_(0), credit_(0)
+ {
+ cont.open_sender(url+"/"+address, proton::connection_options().handler(*this));
+ }
+
+ // Thread safe
+ void send(const proton::message& m) {
+ {
+ std::unique_lock<std::mutex> l(lock_);
+ // Don't queue up more messages than we have credit for
+ while (!work_queue_ || queued_ >= credit_) sender_ready_.wait(l);
+ ++queued_;
+ }
+ work_queue_->add([=]() { this->do_send(m); }); // work_queue_ is thread safe
+ }
+
+ // Thread safe
+ void close() {
+ work_queue()->add([=]() { sender_.connection().close(); });
+ }
+
+ private:
+
+ proton::work_queue* work_queue() {
+ // Wait till work_queue_ and sender_ are initialized.
+ std::unique_lock<std::mutex> l(lock_);
+ while (!work_queue_) sender_ready_.wait(l);
+ return work_queue_;
+ }
+
+ // == messaging_handler overrides, only called in proton hander thread
+
+ void on_sender_open(proton::sender& s) override {
+ // Make sure sender_ and work_queue_ are set atomically
+ std::lock_guard<std::mutex> l(lock_);
+ sender_ = s;
+ work_queue_ = &s.work_queue();
+ }
+
+ void on_sendable(proton::sender& s) override {
+ std::lock_guard<std::mutex> l(lock_);
+ credit_ = s.credit();
+ sender_ready_.notify_all(); // Notify senders we have credit
+ }
+
+ // work_queue work items is are automatically dequeued and called by proton
+ // This function is called because it was queued by send()
+ void do_send(const proton::message& m) {
+ sender_.send(m);
+ std::lock_guard<std::mutex> l(lock_);
+ --queued_; // work item was consumed from the work_queue
+ credit_ = sender_.credit(); // update credit
+ sender_ready_.notify_all(); // Notify senders we have space on queue
+ }
+
+ void on_error(const proton::error_condition& e) override {
+ std::cerr << "unexpected error: " << e << std::endl;
+ exit(1);
+ }
+};
+
+// A thread safe receiving connection that blocks receiving threads when there
+// are no messages available, and maintains a bounded buffer of incoming
+// messages by issuing AMQP credit only when there is space in the buffer.
+class receiver : private proton::messaging_handler {
+ static const size_t MAX_BUFFER = 100; // Max number of buffered messages
+
+ // Used in proton threads only
+ proton::receiver receiver_;
+
+ // Used in proton and user threads, protected by lock_
+ std::mutex lock_;
+ proton::work_queue* work_queue_;
+ std::queue<proton::message> buffer_; // Messages not yet returned by receive()
+ std::condition_variable can_receive_; // Notify receivers of messages
+
+ public:
+
+ // Connect to url
+ receiver(proton::container& cont, const std::string& url, const std::string& address)
+ : work_queue_()
+ {
+ // NOTE:credit_window(0) disables automatic flow control.
+ // We will use flow control to match AMQP credit to buffer capacity.
+ cont.open_receiver(url+"/"+address, proton::receiver_options().credit_window(0),
+ proton::connection_options().handler(*this));
+ }
+
+ // Thread safe receive
+ proton::message receive() {
+ std::unique_lock<std::mutex> l(lock_);
+ // Wait for buffered messages
+ while (!work_queue_ || buffer_.empty())
+ can_receive_.wait(l);
+ proton::message m = std::move(buffer_.front());
+ buffer_.pop();
+ // Add a lambda to the work queue to call receive_done().
+ // This will tell the handler to add more credit.
+ work_queue_->add([=]() { this->receive_done(); });
+ return m;
+ }
+
+ void close() {
+ std::lock_guard<std::mutex> l(lock_);
+ if (work_queue_) work_queue_->add([this]() { this->receiver_.connection().close(); });
+ }
+
+ private:
+ // ==== The following are called by proton threads only.
+
+ void on_receiver_open(proton::receiver& r) override {
+ receiver_ = r;
+ std::lock_guard<std::mutex> l(lock_);
+ work_queue_ = &receiver_.work_queue();
+ receiver_.add_credit(MAX_BUFFER); // Buffer is empty, initial credit is the limit
+ }
+
+ void on_message(proton::delivery &d, proton::message &m) override {
+ // Proton automatically reduces credit by 1 before calling on_message
+ std::lock_guard<std::mutex> l(lock_);
+ buffer_.push(m);
+ can_receive_.notify_all();
+ }
+
+ // called via work_queue
+ void receive_done() {
+ // Add 1 credit, a receiver has taken a message out of the buffer.
+ receiver_.add_credit(1);
+ }
+
+ void on_error(const proton::error_condition& e) override {
+ std::cerr << "unexpected error: " << e << std::endl;
+ exit(1);
+ }
+};
+
+// ==== Example code using the sender and receiver
+
+// Send n messages
+void send_thread(sender& s, int n, bool print) {
+ auto id = std::this_thread::get_id();
+ for (int i = 0; i < n; ++i) {
+ std::ostringstream ss;
+ ss << std::this_thread::get_id() << ":" << i;
+ s.send(proton::message(ss.str()));
+ if (print) std::cout << "received: " << ss.str() << std::endl;
+ }
+ std::cout << id << " sent " << n << std::endl;
+}
+
+// Receive messages till atomic remaining count is 0.
+// remaining is shared among all receiving threads
+void receive_thread(receiver& r, std::atomic_int& remaining, bool print) {
+ auto id = std::this_thread::get_id();
+ int n = 0;
+ while (remaining-- > 0) {
+ auto m = r.receive();
+ ++n;
+ if (print) std::cout << id << "received: " << m.body() << std::endl;
+ }
+ std::cout << id << " received " << n << " messages" << std::endl;
+}
+
+int main(int argc, const char **argv) {
+ try {
+ if (argc != 5) {
+ std::cerr <<
+ "Usage: " << argv[0] << " MESSAGE-COUNT THREAD-COUNT URL\n"
+ "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
+ "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
+ "MESSAGE-COUNT: number of messages to send\n"
+ "THREAD-COUNT: number of sender/receiver thread pairs\n";
+ return 1;
+ }
+
+ const char *url = argv[1];
+ const char *address = argv[2];
+ int n_messages = atoi(argv[3]);
+ int n_threads = atoi(argv[4]);
+
+ // Total messages to be received, multiple receiver threads will decrement this.
+ std::atomic_int remaining(n_messages * n_threads);
+ bool print = remaining < 1000; // Don't print for long runs, dominates run time
+
+ // Run the proton container
+ proton::container container;
+ auto container_thread = std::thread([&]() { container.run(); });
+
+ // A single sender and receiver to be shared by all the threads
+ sender send(container, url, address);
+ receiver recv(container, url, address);
+
+ // Start receiver threads, then sender threads.
+ // Starting receivers first gives all receivers a chance to compete for messages.
+ std::vector<std::thread> threads;
+ for (int i = 0; i < n_threads; ++i)
+ threads.push_back(std::thread([&]() { receive_thread(recv, remaining, print); }));
+ for (int i = 0; i < n_threads; ++i)
+ threads.push_back(std::thread([&]() { send_thread(send, n_messages, print); }));
+
+ // Wait for threads to finish
+ for (auto& t : threads)
+ t.join();
+ send.close();
+ recv.close();
+
+ container_thread.join();
+
+ return 0;
+ } catch (const std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ }
+ return 1;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/298e7dba/examples/cpp/send_recv_mt.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/send_recv_mt.cpp b/examples/cpp/send_recv_mt.cpp
deleted file mode 100644
index addcbaf..0000000
--- a/examples/cpp/send_recv_mt.cpp
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-// C++11 only
-//
-// A multi-threaded client that sends and receives messages from multiple AMQP
-// addresses.
-//
-// Demonstrates how to:
-//
-// - implement proton handlers that interact with user threads safely
-// - block user threads calling send() to respect AMQP flow control
-// - use AMQP flow control to limit message buffering for receivers
-//
-// We define mt_sender and mt_receiver classes with simple, thread-safe blocking
-// send() and receive() functions.
-//
-// These classes are also privately proton::message_handler instances. They use
-// the thread-safe proton::work_queue and standard C++ synchronization (std::mutex
-// etc.) to pass messages between user and proton::container threads.
-//
-// NOTE: no proper error handling
-
-#include <proton/connection.hpp>
-#include <proton/connection_options.hpp>
-#include <proton/container.hpp>
-#include <proton/message.hpp>
-#include <proton/messaging_handler.hpp>
-#include <proton/receiver_options.hpp>
-#include <proton/sender.hpp>
-#include <proton/work_queue.hpp>
-
-#include <atomic>
-#include <condition_variable>
-#include <iostream>
-#include <mutex>
-#include <queue>
-#include <sstream>
-#include <thread>
-
-// Lock to serialize std::cout, std::cerr used from multiple threads.
-std::mutex out_lock;
-#define LOCK(EXPR) do { std::lock_guard<std::mutex> l(out_lock); EXPR; } while(0)
-#define COUT(EXPR) do { LOCK(std::cout << EXPR); } while(0)
-#define CERR(EXPR) do { LOCK(std::cerr << EXPR); } while(0)
-
-// A thread-safe sending connection.
-class mt_sender : private proton::messaging_handler {
- // Only used in proton thread
- proton::sender sender_;
-
- // Shared by proton and user threads, use lock_ to protect.
- std::mutex lock_;
- proton::work_queue* work_queue_; // Messages waiting to be sent
- std::condition_variable can_send_; // Signal sending threads
- int queued_; // Queued messages waiting to be sent
- int credit_; // AMQP credit - number of messages we can send
-
- public:
- // Connect to url
- mt_sender(proton::container& cont, const std::string& url) :
- work_queue_(0), queued_(0), credit_(0)
- {
- // Pass *this as handler.
- cont.open_sender(url, proton::connection_options().handler(*this));
- }
-
- // Thread safe send()
- void send(const proton::message& m) {
- std::unique_lock<std::mutex> l(lock_);
- // Don't queue up more messages than we have credit for
- while (!(work_queue_ && queued_ < credit_))
- can_send_.wait(l);
- ++queued_;
- // Add a lambda function to the work queue.
- // This will call do_send() with a copy of m in the correct proton thread.
- work_queue_->add([=]() { this->do_send(m); });
- }
-
- void close() {
- std::lock_guard<std::mutex> l(lock_);
- if (work_queue_)
- work_queue_->add([this]() { this->sender_.connection().close(); });
- }
-
- private:
- // ==== called by proton threads only
-
- void on_sender_open(proton::sender& s) override {
- sender_ = s;
- std::lock_guard<std::mutex> l(lock_);
- work_queue_ = &s.work_queue();
- }
-
- void on_sendable(proton::sender& s) override {
- std::lock_guard<std::mutex> l(lock_);
- credit_ = s.credit();
- can_send_.notify_all(); // Notify senders we have credit
- }
-
- // work_queue work items is are automatically dequeued and called by proton
- // This function is called because it was queued by send()
- void do_send(const proton::message& m) {
- sender_.send(m);
- std::lock_guard<std::mutex> l(lock_);
- --queued_; // work item was consumed from the work_queue
- credit_ = sender_.credit(); // update credit
- can_send_.notify_all(); // Notify senders we have space on queue
- }
-
- void on_error(const proton::error_condition& e) override {
- CERR("unexpected error: " << e << std::endl);
- exit(1);
- }
-};
-
-// A thread safe receiving connection.
-class mt_receiver : private proton::messaging_handler {
- static const size_t MAX_BUFFER = 100; // Max number of buffered messages
-
- // Used in proton threads only
- proton::receiver receiver_;
-
- // Used in proton and user threads, protected by lock_
- std::mutex lock_;
- proton::work_queue* work_queue_;
- std::queue<proton::message> buffer_; // Messages not yet returned by receive()
- std::condition_variable can_receive_; // Notify receivers of messages
-
- public:
-
- // Connect to url
- mt_receiver(proton::container& cont, const std::string& url) : work_queue_()
- {
- // NOTE:credit_window(0) disables automatic flow control.
- // We will use flow control to match AMQP credit to buffer capacity.
- cont.open_receiver(url, proton::receiver_options().credit_window(0),
- proton::connection_options().handler(*this));
- }
-
- // Thread safe receive
- proton::message receive() {
- std::unique_lock<std::mutex> l(lock_);
- // Wait for buffered messages
- while (!work_queue_ || buffer_.empty())
- can_receive_.wait(l);
- proton::message m = std::move(buffer_.front());
- buffer_.pop();
- // Add a lambda to the work queue to call receive_done().
- // This will tell the handler to add more credit.
- work_queue_->add([=]() { this->receive_done(); });
- return m;
- }
-
- void close() {
- std::lock_guard<std::mutex> l(lock_);
- if (work_queue_)
- work_queue_->add([this]() { this->receiver_.connection().close(); });
- }
-
- private:
- // ==== The following are called by proton threads only.
-
- void on_receiver_open(proton::receiver& r) override {
- receiver_ = r;
- std::lock_guard<std::mutex> l(lock_);
- work_queue_ = &receiver_.work_queue();
- receiver_.add_credit(MAX_BUFFER); // Buffer is empty, initial credit is the limit
- }
-
- void on_message(proton::delivery &d, proton::message &m) override {
- // Proton automatically reduces credit by 1 before calling on_message
- std::lock_guard<std::mutex> l(lock_);
- buffer_.push(m);
- can_receive_.notify_all();
- }
-
- // called via work_queue
- void receive_done() {
- // Add 1 credit, a receiver has taken a message out of the buffer.
- receiver_.add_credit(1);
- }
-
- void on_error(const proton::error_condition& e) override {
- CERR("unexpected error: " << e << std::endl);
- exit(1);
- }
-};
-
-// ==== Example code using the mt_sender and mt_receiver
-
-// Send n messages
-void send_thread(mt_sender& s, int n) {
- for (int i = 0; i < n; ++i) {
- std::ostringstream o;
- o << std::this_thread::get_id() << ":" << i;
- s.send(proton::message(o.str()));
- }
- COUT(std::this_thread::get_id() << " sent " << n << std::endl);
-}
-
-// Receive messages till atomic remaining count is 0.
-// remaining is shared among all receiving threads
-void receive_thread(mt_receiver& r, std::atomic_int& remaining, bool print) {
- auto id = std::this_thread::get_id();
- int n = 0;
- while (remaining-- > 0) {
- auto m = r.receive();
- ++n;
- if (print)
- COUT(id << " received \"" << m.body() << '"' << std::endl);
- }
- COUT(id << " received " << n << " messages" << std::endl);
-}
-
-int main(int argc, const char **argv) {
- try {
- int n_threads = argc > 1 ? atoi(argv[1]) : 2;
- int n_messages = argc > 2 ? atoi(argv[2]) : 10;
- const char *url = argc > 3 ? argv[3] : "amqp://127.0.0.1/examples";
- std::atomic_int remaining(n_messages * n_threads); // Total messages to be received
- bool print = (remaining <= 30); // Print messages for short runs only
-
- // Run the proton container
- proton::container container;
- auto container_thread = std::thread([&]() { container.run(); });
-
- // A single sender and receiver to be shared by all the threads
- mt_sender sender(container, url);
- mt_receiver receiver(container, url);
-
- // Start receiver threads, then sender threads.
- // Starting receivers first gives all receivers a chance to compete for messages.
- std::vector<std::thread> threads;
- for (int i = 0; i < n_threads; ++i)
- threads.push_back(std::thread([&]() { receive_thread(receiver, remaining, print); }));
- for (int i = 0; i < n_threads; ++i)
- threads.push_back(std::thread([&]() { send_thread(sender, n_messages); }));
-
- // Wait for threads to finish
- for (auto& n_messages_threads : threads)
- n_messages_threads.join();
- sender.close();
- receiver.close();
-
- container_thread.join();
-
- return 0;
- } catch (const std::exception& e) {
- std::cerr << e.what() << std::endl;
- }
- return 1;
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/3] qpid-proton git commit: PROTON-1553: c++ wake() events
Posted by ac...@apache.org.
PROTON-1553: c++ wake() events
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/68c8cf43
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/68c8cf43
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/68c8cf43
Branch: refs/heads/master
Commit: 68c8cf43e51bbe71d7042688fbcda52de1ed5cce
Parents: f1ee268
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Aug 24 15:34:35 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Sat Aug 26 13:20:59 2017 -0400
----------------------------------------------------------------------
examples/cpp/mt_queue.hpp | 102 +++++++++++++++++++
.../bindings/cpp/include/proton/connection.hpp | 20 ++++
.../cpp/include/proton/internal/object.hpp | 4 +-
.../cpp/include/proton/messaging_handler.hpp | 15 ++-
proton-c/bindings/cpp/src/connection.cpp | 5 +
proton-c/bindings/cpp/src/handler.cpp | 2 +
proton-c/bindings/cpp/src/messaging_adapter.cpp | 7 ++
.../cpp/src/proactor_container_impl.cpp | 6 +-
8 files changed, 153 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/examples/cpp/mt_queue.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt_queue.hpp b/examples/cpp/mt_queue.hpp
new file mode 100644
index 0000000..f053ebe
--- /dev/null
+++ b/examples/cpp/mt_queue.hpp
@@ -0,0 +1,102 @@
+#ifndef MT_QUEUE_HPP
+#define MT_QUEUE_HPP
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <condition_variable>
+#include <stdexcept>
+#include <mutex>
+#include <queue>
+
+class closed_error : public std::runtime_error {
+ public:
+ closed_error() : std::runtime_error("closed") {}
+};
+
+// A bounded, thread-safe queue.
+// Objects are moved on and off the queue, not copied. Avoids overhead of copy operations.
+template <class T, size_t CAPACITY> class mt_queue {
+ std::queue<T> q_;
+ std::mutex lock_;
+ std::condition_variable push_;
+ std::condition_variable pop_;
+ bool closed_;
+
+ void do_push(T&& x) {
+ q_.push(std::move(x));
+ pop_.notify_one();
+ }
+
+ T do_pop() {
+ T x(std::move(q_.front()));
+ q_.pop();
+ push_.notify_one();
+ return x;
+ }
+
+ bool can_push() { return q_.size() < CAPACITY; }
+ bool can_pop() { return q_.size() > 0; }
+
+ public:
+
+ mt_queue() : closed_(false) {}
+
+ void push(T&& x) {
+ std::unique_lock<std::mutex> l(lock_);
+ while(!can_push())
+ push_.wait(l);
+ do_push(std::move(x));
+ }
+
+ T pop() {
+ std::unique_lock<std::mutex> l(lock_);
+ while(!can_pop())
+ pop_.wait(l);
+ return do_pop();
+ }
+
+ bool try_push(T&& x) noexcept {
+ std::lock_guard<std::mutex> l(lock_);
+ bool ok = can_push();
+ if (ok)
+ do_push(std::move(x));
+ return ok;
+ }
+
+ bool try_pop(T& x) noexcept {
+ std::lock_guard<std::mutex> l(lock_);
+ bool ok = can_pop();
+ if (ok)
+ x = std::move(do_pop());
+ return ok;
+ }
+
+ size_t capacity() noexcept {
+ return CAPACITY;
+ }
+
+ size_t size() noexcept {
+ std::lock_guard<std::mutex> l(lock_);
+ return q_.size();
+ }
+};
+
+
+#endif // MT_QUEUE_HPP
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/proton-c/bindings/cpp/include/proton/connection.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection.hpp b/proton-c/bindings/cpp/include/proton/connection.hpp
index 10ea61b..58e2afc 100644
--- a/proton-c/bindings/cpp/include/proton/connection.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection.hpp
@@ -139,6 +139,26 @@ PN_CPP_CLASS_EXTERN connection : public internal::object<pn_connection_t>, publi
/// @see @ref connection_options::idle_timeout
PN_CPP_EXTERN uint32_t idle_timeout() const;
+ /// **Experimental** - trigger thread-safe call to messaging_handler::on_wake()
+ ///
+ /// *Thread safe*: this is the *only* @ref connection function that can be
+ /// called from outside the handler thread.
+ ///
+ /// messaging_handler::on_wake() will be called on the handler as soon as
+ /// possible after the call to wake(), possibly in a different thread.
+ ///
+ /// @note
+ /// * Multiple calls to wake() may be coalesced into a single call to on_wake()
+ /// that occurs after all of them.
+ /// * Spurious on_wake() calls can occur even if the application does not call
+ /// on_wake()
+ ///
+ /// wake() is the primitive building-block for thread-safe applications.
+ /// With C++11 or greater, @ref work_queue provides an easier way execute
+ /// code safely in the handler thread.
+ ///
+ PN_CPP_EXTERN void wake() const;
+
/// @cond INTERNAL
friend class internal::factory<connection>;
friend class container;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/proton-c/bindings/cpp/include/proton/internal/object.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/internal/object.hpp b/proton-c/bindings/cpp/include/proton/internal/object.hpp
index 442b09d..388c65c 100644
--- a/proton-c/bindings/cpp/include/proton/internal/object.hpp
+++ b/proton-c/bindings/cpp/include/proton/internal/object.hpp
@@ -85,9 +85,9 @@ template <class T> pn_ptr<T> take_ownership(T* p) { return pn_ptr<T>::take_owner
/// Base class for proton object types.
template <class T> class object : private comparable<object<T> > {
public:
- bool operator!() const { return !object_; }
+ bool operator!() const { return !object_.get(); }
#if PN_CPP_HAS_EXPLICIT_CONVERSIONS
- explicit operator bool() const { return object_; }
+ explicit operator bool() const { return object_.get(); }
#endif
protected:
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/messaging_handler.hpp b/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
index a5e2bdd..0792dba 100644
--- a/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
@@ -146,11 +146,24 @@ PN_CPP_CLASS_EXTERN messaging_handler {
/// **Experimental** - The receiving peer has requested a drain of
/// remaining credit.
PN_CPP_EXTERN virtual void on_sender_drain_start(sender &s);
-
+
/// **Experimental** - The credit outstanding at the time of the
/// call to receiver::drain has been consumed or returned.
PN_CPP_EXTERN virtual void on_receiver_drain_finish(receiver &r);
+ /// *Experimental** - a wakeup event that can be triggered from another thread.
+ ///
+ /// @see connection::wake()
+ ///
+ /// The on_wake() event carries no information about why it was called.
+ /// Recommended use is that the messaging_handler have shared, thread-safe
+ /// members that it examines to decide how/if to respond to the wake.
+ ///
+ /// @note on_wake() can be called internally by proton without any
+ /// application calls to connection::wake()
+ ///
+ PN_CPP_EXTERN virtual void on_wake(connection&);
+
/// Fallback error handling.
PN_CPP_EXTERN virtual void on_error(const error_condition &c);
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/proton-c/bindings/cpp/src/connection.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection.cpp b/proton-c/bindings/cpp/src/connection.cpp
index a37d3b5..1d66c41 100644
--- a/proton-c/bindings/cpp/src/connection.cpp
+++ b/proton-c/bindings/cpp/src/connection.cpp
@@ -40,6 +40,7 @@
#include <proton/session.h>
#include <proton/transport.h>
#include <proton/object.h>
+#include <proton/proactor.h>
namespace proton {
@@ -174,4 +175,8 @@ uint32_t connection::idle_timeout() const {
return pn_transport_get_remote_idle_timeout(pn_connection_transport(pn_object()));
}
+void connection::wake() const {
+ pn_connection_wake(pn_object());
+}
+
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/proton-c/bindings/cpp/src/handler.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/handler.cpp b/proton-c/bindings/cpp/src/handler.cpp
index 84d9e8a..8f7fbe9 100644
--- a/proton-c/bindings/cpp/src/handler.cpp
+++ b/proton-c/bindings/cpp/src/handler.cpp
@@ -85,7 +85,9 @@ void messaging_handler::on_tracker_settle(tracker &) {}
void messaging_handler::on_delivery_settle(delivery &) {}
void messaging_handler::on_sender_drain_start(sender &) {}
void messaging_handler::on_receiver_drain_finish(receiver &) {}
+void messaging_handler::on_wake(connection&) {}
void messaging_handler::on_error(const error_condition& c) { throw proton::error(c.what()); }
+
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/proton-c/bindings/cpp/src/messaging_adapter.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/messaging_adapter.cpp b/proton-c/bindings/cpp/src/messaging_adapter.cpp
index cb1b776..2357cfa 100644
--- a/proton-c/bindings/cpp/src/messaging_adapter.cpp
+++ b/proton-c/bindings/cpp/src/messaging_adapter.cpp
@@ -296,6 +296,11 @@ void on_transport_closed(messaging_handler& handler, pn_event_t* event) {
handler.on_transport_close(t);
}
+void on_wake(messaging_handler& handler, pn_event_t* event) {
+ connection c(make_wrapper(pn_event_connection(event)));
+ handler.on_wake(c);
+}
+
}
void messaging_adapter::dispatch(messaging_handler& handler, pn_event_t* event)
@@ -321,6 +326,8 @@ void messaging_adapter::dispatch(messaging_handler& handler, pn_event_t* event)
case PN_TRANSPORT_CLOSED: on_transport_closed(handler, event); break;
+ case PN_CONNECTION_WAKE: on_wake(handler, event); break;
+
// Ignore everything else
default: break;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/proton-c/bindings/cpp/src/proactor_container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
index 1389306..b262f8c 100644
--- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp
+++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
@@ -381,10 +381,6 @@ bool container::impl::handle(pn_event_t* event) {
}
return false;
}
- // If the event was just connection wake then there isn't anything more to do
- case PN_CONNECTION_WAKE:
- return false;
-
// Connection driver will bind a new transport to the connection at this point
case PN_CONNECTION_INIT:
return false;
@@ -443,7 +439,7 @@ void container::impl::thread() {
finished = handle(e);
if (finished) break;
}
- } catch (proton::error& e) {
+ } catch (std::exception& e) {
// If we caught an exception then shutdown the (other threads of the) container
disconnect_error_ = error_condition("exception", e.what());
if (!stopping_) stop(disconnect_error_);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org