You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/08/10 23:42:45 UTC
[1/2] qpid-proton git commit: PROTON-865: Give proton::message value
semantics.
Repository: qpid-proton
Updated Branches:
refs/heads/cjansen-cpp-client db11dee5a -> d5ec4da7d
PROTON-865: Give proton::message value semantics.
Use copy (assignment or copy constructor) to duplicate a message, or swap to transfer message data without a copy.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/422c1026
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/422c1026
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/422c1026
Branch: refs/heads/cjansen-cpp-client
Commit: 422c1026fa15f822bfeb727e5c5813c6c361e8b3
Parents: db11dee
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Aug 10 11:47:31 2015 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Aug 10 17:42:14 2015 -0400
----------------------------------------------------------------------
proton-c/bindings/cpp/README.md | 2 +
proton-c/bindings/cpp/include/proton/data.hpp | 2 +-
proton-c/bindings/cpp/include/proton/event.hpp | 9 +-
.../bindings/cpp/include/proton/message.hpp | 45 ++++++---
.../cpp/include/proton/messaging_event.hpp | 6 +-
.../include/proton/sync_request_response.hpp | 3 +-
proton-c/bindings/cpp/include/proton/values.hpp | 2 +-
proton-c/bindings/cpp/src/event.cpp | 8 +-
proton-c/bindings/cpp/src/message.cpp | 97 +++++++++-----------
proton-c/bindings/cpp/src/messaging_adapter.cpp | 18 +---
proton-c/bindings/cpp/src/messaging_event.cpp | 21 +----
.../bindings/cpp/src/sync_request_response.cpp | 20 ++--
12 files changed, 105 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/422c1026/proton-c/bindings/cpp/README.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/README.md b/proton-c/bindings/cpp/README.md
index 28eabbe..2762c8e 100644
--- a/proton-c/bindings/cpp/README.md
+++ b/proton-c/bindings/cpp/README.md
@@ -27,6 +27,8 @@ Tests
Features
- SASL/SSL support with interop tests.
- Reconnection
+- Browsing
+- Selectors
- Finish blocking API & examples.
- Described types, full support and tests.
- Durable subscriptions & demos (see python changes)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/422c1026/proton-c/bindings/cpp/include/proton/data.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/data.hpp b/proton-c/bindings/cpp/include/proton/data.hpp
index 638315a..83b3bff 100644
--- a/proton-c/bindings/cpp/include/proton/data.hpp
+++ b/proton-c/bindings/cpp/include/proton/data.hpp
@@ -26,7 +26,7 @@ struct pn_data_t;
namespace proton {
-/** Base for classes that hold AMQP data, not for direct use. @see value, encoder, decoder. */
+/** Base for classes that hold AMQP data, not for direct use. @see value, values, encoder, decoder. */
class data {
public:
PN_CPP_EXTERN explicit data();
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/422c1026/proton-c/bindings/cpp/include/proton/event.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/event.hpp b/proton-c/bindings/cpp/include/proton/event.hpp
index 22866e1..a9cbedd 100644
--- a/proton-c/bindings/cpp/include/proton/event.hpp
+++ b/proton-c/bindings/cpp/include/proton/event.hpp
@@ -27,7 +27,6 @@
#include "proton/message.hpp"
#include <vector>
-
namespace proton {
class handler;
@@ -54,10 +53,10 @@ class event {
virtual PN_CPP_EXTERN class link link();
/// Get delivery @throws error if no delivery.
virtual PN_CPP_EXTERN class delivery delivery();
- /// Get message @throws error if no message.
- virtual PN_CPP_EXTERN class message message();
- /// Get message @throws error if no message.
- virtual PN_CPP_EXTERN void message(class message&);
+ /** Get message @throws error if no message.
+ * Reference is valid only until the end of event dispatch. Copy or swap the message to keep it.
+ */
+ virtual PN_CPP_EXTERN class message &message();
protected:
PN_CPP_EXTERN event();
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/422c1026/proton-c/bindings/cpp/include/proton/message.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/message.hpp b/proton-c/bindings/cpp/include/proton/message.hpp
index 727d8a5..ddbb306 100644
--- a/proton-c/bindings/cpp/include/proton/message.hpp
+++ b/proton-c/bindings/cpp/include/proton/message.hpp
@@ -22,29 +22,43 @@
*
*/
#include "proton/export.hpp"
-#include "proton/proton_handle.hpp"
#include "proton/value.hpp"
#include "proton/message.hpp"
#include <string>
struct pn_message_t;
-struct pn_data_t;
namespace proton {
-// TODO aconway 2015-08-07: make this a value-semantics class, hide pn_message_t.
+class link;
+class delivery;
-/// An AMQP message.
-class message : public proton_handle<pn_message_t>
+/** An AMQP message.
+ *
+ * This class has value semantics: the copy constructor and assignment make a
+ * copy of the underlying message data. If you want to transfer the message data
+ * without a copy use the swap member function or std::swap.
+ */
+
+class message
{
public:
PN_CPP_EXTERN message();
+ /** Takes ownership of the pn_message_t, calls pn_message_free on destruction. */
PN_CPP_EXTERN message(pn_message_t *);
+ /// Makes a copy of the other message.
PN_CPP_EXTERN message(const message&);
+ /// Makes a copy of the other message.
PN_CPP_EXTERN message& operator=(const message&);
+
PN_CPP_EXTERN ~message();
- PN_CPP_EXTERN pn_message_t *pn_message() const;
+ PN_CPP_EXTERN void swap(message&);
+
+ /// Access the underlying pn_message_t, note it will be freed by ~message.
+ PN_CPP_EXTERN pn_message_t *pn_message();
+ /// Forget the underlying pn_message_t, the message is cleared. Caller must call pn_message_free.
+ PN_CPP_EXTERN pn_message_t *pn_message_forget();
/** Clear the message content */
PN_CPP_EXTERN void clear();
@@ -93,7 +107,7 @@ class message : public proton_handle<pn_message_t>
PN_CPP_EXTERN std::string reply_to_group_id() const;
///@}
- /** Set the body to a proton::value. */
+ /** Set the body to a proton::value, copies the value. */
PN_CPP_EXTERN void body(const value&);
/** Set the body to any type T that can be converted to a proton::value */
@@ -102,10 +116,14 @@ class message : public proton_handle<pn_message_t>
/** Set the body to a sequence of values, each value is encoded as an AMQP section. */
PN_CPP_EXTERN void body(const values&);
- /** Get the body values, there may be more than one. */
+ /** Get the body values, there may be more than one.
+ * Note the reference will be invalidated by destroying the message or calling swap.
+ */
PN_CPP_EXTERN const values& body() const;
- /** Get a reference to the body, can be modified in-place. */
+ /** Get a reference to the body values, can be modified in-place.
+ * Note the reference will be invalidated by destroying the message or calling swap.
+ */
PN_CPP_EXTERN values& body();
// TODO aconway 2015-06-17: consistent and flexible treatment of buffers.
@@ -114,15 +132,18 @@ class message : public proton_handle<pn_message_t>
// buffers. Introduce a buffer type with begin/end pointers?
/** Encode the message into string data */
- PN_CPP_EXTERN void encode(std::string &data);
+ PN_CPP_EXTERN void encode(std::string &data) const;
/** Return encoded message as a string */
- PN_CPP_EXTERN std::string encode();
+ PN_CPP_EXTERN std::string encode() const;
/** Decode from string data into the message. */
PN_CPP_EXTERN void decode(const std::string &data);
+ /// Decode the message from link corresponding to delivery.
+ PN_CPP_EXTERN void decode(proton::link link, proton::delivery);
+
private:
+ pn_message_t *impl_;
mutable values body_;
- friend class proton_impl_ref<message>;
};
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/422c1026/proton-c/bindings/cpp/include/proton/messaging_event.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/messaging_event.hpp b/proton-c/bindings/cpp/include/proton/messaging_event.hpp
index e27003c..9eb4905 100644
--- a/proton-c/bindings/cpp/include/proton/messaging_event.hpp
+++ b/proton-c/bindings/cpp/include/proton/messaging_event.hpp
@@ -88,15 +88,15 @@ class messaging_event : public proton_event
virtual PN_CPP_EXTERN class sender sender();
virtual PN_CPP_EXTERN class receiver receiver();
virtual PN_CPP_EXTERN class link link();
- virtual PN_CPP_EXTERN class message message();
+ virtual PN_CPP_EXTERN class message& message();
virtual PN_CPP_EXTERN class delivery delivery();
- virtual PN_CPP_EXTERN void message(class message &);
PN_CPP_EXTERN event_type type() const;
private:
+ friend class messaging_adapter;
event_type type_;
proton_event *parent_event_;
- class message *message_;
+ class message message_;
messaging_event operator=(const messaging_event&);
messaging_event(const messaging_event&);
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/422c1026/proton-c/bindings/cpp/include/proton/sync_request_response.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/sync_request_response.hpp b/proton-c/bindings/cpp/include/proton/sync_request_response.hpp
index 604af4b..5557448 100644
--- a/proton-c/bindings/cpp/include/proton/sync_request_response.hpp
+++ b/proton-c/bindings/cpp/include/proton/sync_request_response.hpp
@@ -27,6 +27,7 @@
#include "proton/blocking_sender.hpp"
#include "proton/wait_condition.hpp"
#include <string>
+#include <memory>
struct pn_message_t;
struct pn_data_t;
@@ -49,7 +50,7 @@ class sync_request_response : public messaging_handler
std::string address_;
blocking_sender sender_;
blocking_receiver receiver_;
- message response_;
+ std::auto_ptr<message> response_;
amqp_ulong correlation_id_;
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/422c1026/proton-c/bindings/cpp/include/proton/values.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/values.hpp b/proton-c/bindings/cpp/include/proton/values.hpp
index dc6fc57..1f7c650 100644
--- a/proton-c/bindings/cpp/include/proton/values.hpp
+++ b/proton-c/bindings/cpp/include/proton/values.hpp
@@ -34,7 +34,7 @@ class values : public encoder, public decoder {
PN_CPP_EXTERN values();
PN_CPP_EXTERN values(const values&);
- /** Does not take ownership, just a view on the data */
+ /** Does not take ownership, just operates on the pn_data_t object. */
PN_CPP_EXTERN values(pn_data_t*);
PN_CPP_EXTERN ~values();
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/422c1026/proton-c/bindings/cpp/src/event.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/event.cpp b/proton-c/bindings/cpp/src/event.cpp
index 2affed1..b1eaa77 100644
--- a/proton-c/bindings/cpp/src/event.cpp
+++ b/proton-c/bindings/cpp/src/event.cpp
@@ -61,14 +61,8 @@ class delivery event::delivery() {
throw error(MSG("No link context for event"));
}
-class message event::message() {
+class message &event::message() {
throw error(MSG("No message associated with event"));
}
-void event::message(class message &) {
- throw error(MSG("Operation not supported for this type of event"));
-}
-
-
-
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/422c1026/proton-c/bindings/cpp/src/message.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/message.cpp b/proton-c/bindings/cpp/src/message.cpp
index e1bd535..8aae044 100644
--- a/proton-c/bindings/cpp/src/message.cpp
+++ b/proton-c/bindings/cpp/src/message.cpp
@@ -21,47 +21,43 @@
#include "proton/message.hpp"
#include "proton/error.hpp"
+#include "proton/link.hpp"
+#include "proton/delivery.hpp"
#include "proton/message.h"
+#include "proton/link.h"
+#include "proton/delivery.h"
#include "msg.hpp"
#include "proton_bits.hpp"
#include "proton_impl_ref.hpp"
#include <cstring>
+#include <assert.h>
namespace proton {
-template class proton_handle<pn_message_t>;
+message::message() : impl_(::pn_message()), body_(0) { assert(impl_); }
-typedef proton_impl_ref<message> PI;
+message::message(pn_message_t *p) : impl_(p), body_(0) { assert(impl_); }
-message::message() : body_(0) {
- PI::ctor(*this, 0);
-}
-message::message(pn_message_t *p) : body_(0) {
- PI::ctor(*this, p);
-}
-message::message(const message& m) : proton_handle<pn_message_t>(), body_(0) {
- PI::copy(*this, m);
+message::message(const message& m) : impl_(::pn_message()), body_(0) { *this = m; }
+
+message& message::operator=(const message& m) {
+ // TODO aconway 2015-08-10: need more efficient pn_message_copy function
+ std::string data;
+ m.encode(data);
+ decode(data);
+ return *this;
}
-// TODO aconway 2015-06-17: message should be a value not a handle
-// Needs to own pn_message_t and do appropriate _copy and _free operations.
+message::~message() { if (impl_) pn_message_free(impl_); }
-message& message::operator=(const message& m) {
- return PI::assign(*this, m);
+void message::swap(message& m) {
+ if (this != &m) std::swap(impl_, m.impl_);
}
-message::~message() { PI::dtor(*this); }
void message::clear() { pn_message_clear(impl_); }
namespace {
-void confirm(pn_message_t * const& p) {
- if (p) return;
- const_cast<pn_message_t*&>(p) = pn_message(); // Correct refcount of 1
- if (!p)
- throw error(MSG("No memory"));
-}
-
void check(int err) {
if (err) throw error(error_str(err));
}
@@ -84,153 +80,124 @@ value get_value(pn_data_t* d) {
} // namespace
void message::id(const value& id) {
- confirm(impl_);
set_value(pn_message_id(impl_), id);
}
value message::id() const {
- confirm(impl_);
return get_value(pn_message_id(impl_));
}
void message::user(const std::string &id) {
- confirm(impl_);
check(pn_message_set_user_id(impl_, pn_bytes(id)));
}
std::string message::user() const {
- confirm(impl_);
return str(pn_message_get_user_id(impl_));
}
void message::address(const std::string &addr) {
- confirm(impl_);
check(pn_message_set_address(impl_, addr.c_str()));
}
std::string message::address() const {
- confirm(impl_);
const char* addr = pn_message_get_address(impl_);
return addr ? std::string(addr) : std::string();
}
void message::subject(const std::string &s) {
- confirm(impl_);
check(pn_message_set_subject(impl_, s.c_str()));
}
std::string message::subject() const {
- confirm(impl_);
const char* s = pn_message_get_subject(impl_);
return s ? std::string(s) : std::string();
}
void message::reply_to(const std::string &s) {
- confirm(impl_);
check(pn_message_set_reply_to(impl_, s.c_str()));
}
std::string message::reply_to() const {
- confirm(impl_);
const char* s = pn_message_get_reply_to(impl_);
return s ? std::string(s) : std::string();
}
void message::correlation_id(const value& id) {
- confirm(impl_);
set_value(pn_message_correlation_id(impl_), id);
}
value message::correlation_id() const {
- confirm(impl_);
return get_value(pn_message_correlation_id(impl_));
}
void message::content_type(const std::string &s) {
- confirm(impl_);
check(pn_message_set_content_type(impl_, s.c_str()));
}
std::string message::content_type() const {
- confirm(impl_);
const char* s = pn_message_get_content_type(impl_);
return s ? std::string(s) : std::string();
}
void message::content_encoding(const std::string &s) {
- confirm(impl_);
check(pn_message_set_content_encoding(impl_, s.c_str()));
}
std::string message::content_encoding() const {
- confirm(impl_);
const char* s = pn_message_get_content_encoding(impl_);
return s ? std::string(s) : std::string();
}
void message::expiry(amqp_timestamp t) {
- confirm(impl_);
pn_message_set_expiry_time(impl_, t.milliseconds);
}
amqp_timestamp message::expiry() const {
- confirm(impl_);
return amqp_timestamp(pn_message_get_expiry_time(impl_));
}
void message::creation_time(amqp_timestamp t) {
- confirm(impl_);
pn_message_set_creation_time(impl_, t);
}
amqp_timestamp message::creation_time() const {
- confirm(impl_);
return pn_message_get_creation_time(impl_);
}
void message::group_id(const std::string &s) {
- confirm(impl_);
check(pn_message_set_group_id(impl_, s.c_str()));
}
std::string message::group_id() const {
- confirm(impl_);
const char* s = pn_message_get_group_id(impl_);
return s ? std::string(s) : std::string();
}
void message::reply_to_group_id(const std::string &s) {
- confirm(impl_);
check(pn_message_set_reply_to_group_id(impl_, s.c_str()));
}
std::string message::reply_to_group_id() const {
- confirm(impl_);
const char* s = pn_message_get_reply_to_group_id(impl_);
return s ? std::string(s) : std::string();
}
void message::body(const value& v) {
- confirm(impl_);
set_value(pn_message_body(impl_), v);
}
void message::body(const values& v) {
- confirm(impl_);
pn_data_copy(pn_message_body(impl_), v.data_);
}
const values& message::body() const {
- confirm(impl_);
body_.view(pn_message_body(impl_));
return body_;
}
values& message::body() {
- confirm(impl_);
body_.view(pn_message_body(impl_));
return body_;
}
-void message::encode(std::string &s) {
- confirm(impl_);
+void message::encode(std::string &s) const {
size_t sz = s.capacity();
if (sz < 512) sz = 512;
while (true) {
@@ -247,13 +214,33 @@ void message::encode(std::string &s) {
}
}
+std::string message::encode() const {
+ std::string data;
+ encode(data);
+ return data;
+}
+
void message::decode(const std::string &s) {
- confirm(impl_);
check(pn_message_decode(impl_, s.data(), s.size()));
}
-pn_message_t *message::pn_message() const {
- return impl_;
+pn_message_t *message::pn_message() { return impl_; }
+
+pn_message_t *message::pn_message_forget() {
+ pn_message_t *result = impl_;
+ impl_ = 0;
+ return result;
+}
+
+void message::decode(proton::link link, proton::delivery delivery) {
+ std::string buf;
+ buf.resize(pn_delivery_pending(delivery.pn_delivery()));
+ ssize_t n = pn_link_recv(link.pn_link(), (char *) buf.data(), buf.size());
+ if (n != (ssize_t) buf.size()) throw error(MSG("link read failure"));
+ clear();
+ decode(buf);
+ pn_link_advance(link.pn_link());
}
}
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/422c1026/proton-c/bindings/cpp/src/messaging_adapter.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/messaging_adapter.cpp b/proton-c/bindings/cpp/src/messaging_adapter.cpp
index 0bb574f..7a6ef77 100644
--- a/proton-c/bindings/cpp/src/messaging_adapter.cpp
+++ b/proton-c/bindings/cpp/src/messaging_adapter.cpp
@@ -61,21 +61,6 @@ void messaging_adapter::on_link_flow(event &e) {
}
}
-namespace {
-message receive_message(pn_link_t *lnk, pn_delivery_t *dlv) {
- std::string buf;
- size_t sz = pn_delivery_pending(dlv);
- buf.resize(sz);
- ssize_t n = pn_link_recv(lnk, (char *) buf.data(), sz);
- if (n != (ssize_t) sz)
- throw error(MSG("link read failure"));
- message m;
- m. decode(buf);
- pn_link_advance(lnk);
- return m;
-}
-} // namespace
-
void messaging_adapter::on_delivery(event &e) {
proton_event *pe = dynamic_cast<proton_event*>(&e);
if (pe) {
@@ -87,8 +72,7 @@ void messaging_adapter::on_delivery(event &e) {
if (!pn_delivery_partial(dlv) && pn_delivery_readable(dlv)) {
// generate on_message
messaging_event mevent(messaging_event::MESSAGE, *pe);
- message m(receive_message(lnk, dlv));
- mevent.message(m);
+ mevent.message_.decode(link(lnk), delivery(dlv));
if (pn_link_state(lnk) & PN_LOCAL_CLOSED) {
if (auto_accept_) {
pn_delivery_update(dlv, PN_RELEASED);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/422c1026/proton-c/bindings/cpp/src/messaging_event.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/messaging_event.cpp b/proton-c/bindings/cpp/src/messaging_event.cpp
index 5062d76..a629f47 100644
--- a/proton-c/bindings/cpp/src/messaging_event.cpp
+++ b/proton-c/bindings/cpp/src/messaging_event.cpp
@@ -34,19 +34,17 @@
namespace proton {
messaging_event::messaging_event(pn_event_t *ce, pn_event_type_t t, class container &c) :
- proton_event(ce, t, c), type_(messaging_event::PROTON), parent_event_(0), message_(0)
+ proton_event(ce, t, c), type_(messaging_event::PROTON), parent_event_(0)
{}
messaging_event::messaging_event(event_type t, proton_event &p) :
- proton_event(NULL, PN_EVENT_NONE, p.container()), type_(t), parent_event_(&p), message_(0)
+ proton_event(NULL, PN_EVENT_NONE, p.container()), type_(t), parent_event_(&p)
{
if (type_ == messaging_event::PROTON)
throw error(MSG("invalid messaging event type"));
}
-messaging_event::~messaging_event() {
- delete message_;
-}
+messaging_event::~messaging_event() {}
messaging_event::event_type messaging_event::type() const { return type_; }
@@ -90,19 +88,10 @@ delivery messaging_event::delivery() {
throw error(MSG("No delivery context for event"));
}
-message messaging_event::message() {
- if (parent_event_) {
- pn_message_t *m = event_context(parent_event_->pn_event());
- if (m)
- return proton::message(m);
- }
- throw error(MSG("No message context for event"));
-}
-
-void messaging_event::message(class message &m) {
+message &messaging_event::message() {
if (type_ != messaging_event::MESSAGE || !parent_event_)
throw error(MSG("event type does not provide message"));
- event_context(parent_event_->pn_event(), m.pn_message());
+ return message_;
}
void messaging_event::dispatch(handler &h) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/422c1026/proton-c/bindings/cpp/src/sync_request_response.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/sync_request_response.cpp b/proton-c/bindings/cpp/src/sync_request_response.cpp
index eebd62d..3da4ceb 100644
--- a/proton-c/bindings/cpp/src/sync_request_response.cpp
+++ b/proton-c/bindings/cpp/src/sync_request_response.cpp
@@ -27,12 +27,12 @@ namespace proton {
namespace {
amqp_ulong global_correlation_id = 0;
-message null_message;
struct response_received : public wait_condition {
- response_received(message &m, amqp_ulong id) : message_(m), id_(id) {}
- bool achieved() { return message_ && message_.correlation_id() == id_; }
- message &message_;
+ response_received(std::auto_ptr<message>& m, amqp_ulong id) : message_(m), id_(id) {}
+ bool achieved() {
+ return message_.get() && message_->correlation_id() == id_; }
+ std::auto_ptr<message>& message_;
value id_;
};
@@ -41,14 +41,13 @@ struct response_received : public wait_condition {
sync_request_response::sync_request_response(blocking_connection &conn, const std::string addr):
connection_(conn), address_(addr),
sender_(connection_.create_sender(addr)),
- receiver_(connection_.create_receiver("", 1, true, this)), // credit=1, dynamic=true
- response_(null_message)
+ receiver_(connection_.create_receiver("", 1/*credit*/, true/*dynamic*/, this))
{
}
message sync_request_response::call(message &request) {
if (address_.empty() && request.address().empty())
- throw error(MSG("Request message has no address: " << request));
+ throw error(MSG("Request message has no address"));
// TODO: thread safe increment.
correlation_id_ = global_correlation_id++;
request.correlation_id(value(correlation_id_));
@@ -57,8 +56,8 @@ message sync_request_response::call(message &request) {
std::string txt("Waiting for response");
response_received cond(response_, correlation_id_);
connection_.wait(cond, txt);
- message resp = response_;
- response_ = null_message;
+ message resp = *response_;
+ response_.reset(0);
receiver_.flow(1);
return resp;
}
@@ -68,7 +67,8 @@ std::string sync_request_response::reply_to() {
}
void sync_request_response::on_message(event &e) {
- response_ = e.message();
+ response_.reset(new message);
+ response_->swap(e.message());
// Wake up enclosing blocking_connection.wait() to handle the message
e.container().yield();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-proton git commit: PROTON-865: blocking_connection::wait
template to allow arbitrary functor as test.
Posted by ac...@apache.org.
PROTON-865: blocking_connection::wait template to allow arbitrary functor as test.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d5ec4da7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d5ec4da7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d5ec4da7
Branch: refs/heads/cjansen-cpp-client
Commit: d5ec4da7d033e6096420109551175278490a976c
Parents: 422c102
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Aug 10 17:14:51 2015 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Aug 10 17:42:28 2015 -0400
----------------------------------------------------------------------
.../cpp/include/proton/blocking_connection.hpp | 33 +++++++++++---
.../include/proton/sync_request_response.hpp | 1 -
.../cpp/include/proton/wait_condition.hpp | 47 --------------------
.../bindings/cpp/src/blocking_connection.cpp | 6 +--
.../cpp/src/blocking_connection_impl.cpp | 28 ++++--------
.../cpp/src/blocking_connection_impl.hpp | 12 +++--
proton-c/bindings/cpp/src/blocking_link.cpp | 13 +++---
proton-c/bindings/cpp/src/blocking_receiver.cpp | 5 +--
proton-c/bindings/cpp/src/blocking_sender.cpp | 5 +--
.../bindings/cpp/src/sync_request_response.cpp | 5 +--
10 files changed, 58 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d5ec4da7/proton-c/bindings/cpp/include/proton/blocking_connection.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/blocking_connection.hpp b/proton-c/bindings/cpp/include/proton/blocking_connection.hpp
index a8eac72..e82a2ee 100644
--- a/proton-c/bindings/cpp/include/proton/blocking_connection.hpp
+++ b/proton-c/bindings/cpp/include/proton/blocking_connection.hpp
@@ -39,7 +39,6 @@ class blocking_connection_impl;
class ssl_domain;
class blocking_sender;
class blocking_receiver;
-class wait_condition;
// TODO documentation
class blocking_connection : public handle<blocking_connection_impl>
@@ -57,12 +56,36 @@ class blocking_connection : public handle<blocking_connection_impl>
PN_CPP_EXTERN blocking_sender create_sender(const std::string &address, handler *h=0);
PN_CPP_EXTERN blocking_receiver create_receiver(const std::string &address, int credit = 0,
bool dynamic = false, handler *h=0, std::string name = std::string());
- PN_CPP_EXTERN void wait(wait_condition &condition);
- PN_CPP_EXTERN void wait(wait_condition &condition, const std::string &msg);
- PN_CPP_EXTERN void wait(wait_condition &condition, const std::string &msg, duration timeout);
+
+ /// Abstract condition class for wait.
+ struct condition {
+ virtual ~condition() {}
+ virtual bool operator()() = 0;
+ };
+
+ /** Wait till cond returns true.
+ * C must be copyable and callable with no arguments and bool return value.
+ * Wait up to timeout if specified or blocking_connection::timeout() if not.
+ * @throws timeout_error with message msg if timeout is exceeded.
+ */
+ template <class C> void wait(C cond, const std::string &msg="", duration timeout=duration(-1)) {
+ condition_impl<C> c(cond);
+ wait(dynamic_cast<condition&>(c), msg, timeout);
+ }
+
PN_CPP_EXTERN duration timeout();
private:
- friend class private_impl_ref<blocking_connection>;
+
+ PN_CPP_EXTERN void wait(condition &, const std::string &msg="", duration timeout=duration(-1));
+
+
+ template <class C> struct condition_impl : public condition {
+ C cond_;
+ condition_impl(C c) : cond_(c) {}
+ bool operator()() { return cond_(); }
+ };
+
+ friend class private_impl_ref<blocking_connection>;
};
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d5ec4da7/proton-c/bindings/cpp/include/proton/sync_request_response.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/sync_request_response.hpp b/proton-c/bindings/cpp/include/proton/sync_request_response.hpp
index 5557448..397e181 100644
--- a/proton-c/bindings/cpp/include/proton/sync_request_response.hpp
+++ b/proton-c/bindings/cpp/include/proton/sync_request_response.hpp
@@ -25,7 +25,6 @@
#include "proton/messaging_handler.hpp"
#include "proton/blocking_receiver.hpp"
#include "proton/blocking_sender.hpp"
-#include "proton/wait_condition.hpp"
#include <string>
#include <memory>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d5ec4da7/proton-c/bindings/cpp/include/proton/wait_condition.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/wait_condition.hpp b/proton-c/bindings/cpp/include/proton/wait_condition.hpp
deleted file mode 100644
index 760928e..0000000
--- a/proton-c/bindings/cpp/include/proton/wait_condition.hpp
+++ /dev/null
@@ -1,47 +0,0 @@
-#ifndef PROTON_CPP_WAITCONDITION_H
-#define PROTON_CPP_WAITCONDITION_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "proton/export.hpp"
-
-namespace proton {
-
-// TODO aconway 2015-07-15: c++11 should use std::function
-// c++03 could use a function template.
-
-// Interface class to indicates that an expected contion has been
-// achieved, i.e. for blocking_connection.wait()
-
-class wait_condition
-{
- public:
- PN_CPP_EXTERN virtual ~wait_condition();
-
- // Overide this member function to indicate whether an expected
- // condition is achieved and requires no further waiting.
- virtual bool achieved() = 0;
-};
-
-
-}
-
-#endif /*!PROTON_CPP_WAITCONDITION_H*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d5ec4da7/proton-c/bindings/cpp/src/blocking_connection.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_connection.cpp b/proton-c/bindings/cpp/src/blocking_connection.cpp
index 61475bc..c54005e 100644
--- a/proton-c/bindings/cpp/src/blocking_connection.cpp
+++ b/proton-c/bindings/cpp/src/blocking_connection.cpp
@@ -49,11 +49,7 @@ blocking_connection::blocking_connection(const proton::url &url, duration d, ssl
void blocking_connection::close() { impl_->close(); }
-void blocking_connection::wait(wait_condition &cond) { return impl_->wait(cond); }
-void blocking_connection::wait(wait_condition &cond, const std::string &msg) {
- return impl_->wait(cond, msg);
-}
-void blocking_connection::wait(wait_condition &cond, const std::string &msg, duration timeout) {
+void blocking_connection::wait(condition &cond, const std::string &msg, duration timeout) {
return impl_->wait(cond, msg, timeout);
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d5ec4da7/proton-c/bindings/cpp/src/blocking_connection_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_connection_impl.cpp b/proton-c/bindings/cpp/src/blocking_connection_impl.cpp
index 10b40ee..e552830 100644
--- a/proton-c/bindings/cpp/src/blocking_connection_impl.cpp
+++ b/proton-c/bindings/cpp/src/blocking_connection_impl.cpp
@@ -22,7 +22,6 @@
#include "proton/messaging_handler.hpp"
#include "proton/duration.hpp"
#include "proton/error.hpp"
-#include "proton/wait_condition.hpp"
#include "blocking_connection_impl.hpp"
#include "msg.hpp"
#include "contexts.hpp"
@@ -31,9 +30,6 @@
namespace proton {
-wait_condition::~wait_condition() {}
-
-
void blocking_connection_impl::incref(blocking_connection_impl *impl_) {
impl_->refcount_++;
}
@@ -45,15 +41,15 @@ void blocking_connection_impl::decref(blocking_connection_impl *impl_) {
}
namespace {
-struct connection_opening : public wait_condition {
+struct connection_opening {
connection_opening(pn_connection_t *c) : pn_connection(c) {}
- bool achieved() { return (pn_connection_state(pn_connection) & PN_REMOTE_UNINIT); }
+ bool operator()() { return (pn_connection_state(pn_connection) & PN_REMOTE_UNINIT); }
pn_connection_t *pn_connection;
};
-struct connection_closed : public wait_condition {
+struct connection_closed {
connection_closed(pn_connection_t *c) : pn_connection(c) {}
- bool achieved() { return !(pn_connection_state(pn_connection) & PN_REMOTE_ACTIVE); }
+ bool operator()() { return !(pn_connection_state(pn_connection) & PN_REMOTE_ACTIVE); }
pn_connection_t *pn_connection;
};
@@ -83,18 +79,10 @@ void blocking_connection_impl::close() {
wait(cond);
}
-void blocking_connection_impl::wait(wait_condition &condition) {
- std::string empty;
- wait(condition, empty, timeout_);
-}
-
-void blocking_connection_impl::wait(wait_condition &condition, const std::string &msg) {
- wait(condition, msg, timeout_);
-}
-
-void blocking_connection_impl::wait(wait_condition &condition, const std::string &msg, duration wait_timeout) {
+void blocking_connection_impl::wait(blocking_connection::condition &condition, const std::string &msg, duration wait_timeout) {
+ if (wait_timeout == duration(-1)) wait_timeout = timeout_;
if (wait_timeout == duration::FOREVER) {
- while (!condition.achieved()) {
+ while (!condition()) {
container_.process();
}
}
@@ -105,7 +93,7 @@ void blocking_connection_impl::wait(wait_condition &condition, const std::string
try {
pn_timestamp_t now = pn_reactor_mark(reactor);
pn_timestamp_t deadline = now + wait_timeout.milliseconds;
- while (!condition.achieved()) {
+ while (!condition()) {
container_.process();
if (deadline < pn_reactor_mark(reactor)) {
std::string txt = "connection timed out";
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d5ec4da7/proton-c/bindings/cpp/src/blocking_connection_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_connection_impl.hpp b/proton-c/bindings/cpp/src/blocking_connection_impl.hpp
index b70193d..17f8f57 100644
--- a/proton-c/bindings/cpp/src/blocking_connection_impl.hpp
+++ b/proton-c/bindings/cpp/src/blocking_connection_impl.hpp
@@ -24,6 +24,7 @@
#include "proton/export.hpp"
#include "proton/endpoint.hpp"
#include "proton/container.hpp"
+#include "proton/blocking_connection.hpp"
#include "proton/types.h"
#include <string>
@@ -41,15 +42,20 @@ class ssl_domain;
PN_CPP_EXTERN blocking_connection_impl(const url &url, duration d, ssl_domain *ssld, container *c);
PN_CPP_EXTERN ~blocking_connection_impl();
PN_CPP_EXTERN void close();
- PN_CPP_EXTERN void wait(wait_condition &condition);
- PN_CPP_EXTERN void wait(wait_condition &condition, const std::string &msg);
- PN_CPP_EXTERN void wait(wait_condition &condition, const std::string &msg, duration timeout);
+ template <class C> void wait(C c, const std::string &msg="", duration timeout=duration(-1)) {
+ blocking_connection::condition_impl<C> cond(c);
+ wait(dynamic_cast<blocking_connection::condition&>(cond), msg, timeout);
+ }
+
PN_CPP_EXTERN pn_connection_t *pn_blocking_connection();
duration timeout() { return timeout_; }
static void incref(blocking_connection_impl *);
static void decref(blocking_connection_impl *);
private:
friend class blocking_connection;
+ PN_CPP_EXTERN void wait(blocking_connection::condition &, const std::string & ="",
+ duration=duration(-1));
+
container container_;
connection connection_;
url url_;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d5ec4da7/proton-c/bindings/cpp/src/blocking_link.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_link.cpp b/proton-c/bindings/cpp/src/blocking_link.cpp
index c7f3551..43d1b83 100644
--- a/proton-c/bindings/cpp/src/blocking_link.cpp
+++ b/proton-c/bindings/cpp/src/blocking_link.cpp
@@ -21,7 +21,6 @@
#include "proton/blocking_link.hpp"
#include "proton/blocking_connection.hpp"
#include "proton/messaging_handler.hpp"
-#include "proton/wait_condition.hpp"
#include "proton/error.hpp"
#include "msg.hpp"
@@ -29,21 +28,21 @@
namespace proton {
namespace {
-struct link_opened : public wait_condition {
+struct link_opened {
link_opened(pn_link_t *l) : pn_link(l) {}
- bool achieved() { return !(pn_link_state(pn_link) & PN_REMOTE_UNINIT); }
+ bool operator()() { return !(pn_link_state(pn_link) & PN_REMOTE_UNINIT); }
pn_link_t *pn_link;
};
-struct link_closed : public wait_condition {
+struct link_closed {
link_closed(pn_link_t *l) : pn_link(l) {}
- bool achieved() { return (pn_link_state(pn_link) & PN_REMOTE_CLOSED); }
+ bool operator()() { return (pn_link_state(pn_link) & PN_REMOTE_CLOSED); }
pn_link_t *pn_link;
};
-struct link_not_open : public wait_condition {
+struct link_not_open {
link_not_open(pn_link_t *l) : pn_link(l) {}
- bool achieved() { return !(pn_link_state(pn_link) & PN_REMOTE_ACTIVE); }
+ bool operator()() { return !(pn_link_state(pn_link) & PN_REMOTE_ACTIVE); }
pn_link_t *pn_link;
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d5ec4da7/proton-c/bindings/cpp/src/blocking_receiver.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_receiver.cpp b/proton-c/bindings/cpp/src/blocking_receiver.cpp
index 3ca53c0..f4736e5 100644
--- a/proton-c/bindings/cpp/src/blocking_receiver.cpp
+++ b/proton-c/bindings/cpp/src/blocking_receiver.cpp
@@ -20,7 +20,6 @@
*/
#include "proton/blocking_receiver.hpp"
#include "proton/blocking_connection.hpp"
-#include "proton/wait_condition.hpp"
#include "proton/error.hpp"
#include "fetcher.hpp"
#include "msg.hpp"
@@ -30,9 +29,9 @@ namespace proton {
namespace {
-struct fetcher_has_message : public wait_condition {
+struct fetcher_has_message {
fetcher_has_message(fetcher &f) : fetcher_(f) {}
- bool achieved() { return fetcher_.has_message(); }
+ bool operator()() { return fetcher_.has_message(); }
fetcher &fetcher_;
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d5ec4da7/proton-c/bindings/cpp/src/blocking_sender.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_sender.cpp b/proton-c/bindings/cpp/src/blocking_sender.cpp
index 8fea98a..a4757eb 100644
--- a/proton-c/bindings/cpp/src/blocking_sender.cpp
+++ b/proton-c/bindings/cpp/src/blocking_sender.cpp
@@ -20,7 +20,6 @@
*/
#include "proton/blocking_sender.hpp"
#include "proton/blocking_connection.hpp"
-#include "proton/wait_condition.hpp"
#include "proton/error.hpp"
#include "msg.hpp"
@@ -28,9 +27,9 @@
namespace proton {
namespace {
-struct delivery_settled : public wait_condition {
+struct delivery_settled {
delivery_settled(pn_delivery_t *d) : pn_delivery(d) {}
- bool achieved() { return pn_delivery_settled(pn_delivery); }
+ bool operator()() { return pn_delivery_settled(pn_delivery); }
pn_delivery_t *pn_delivery;
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d5ec4da7/proton-c/bindings/cpp/src/sync_request_response.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/sync_request_response.cpp b/proton-c/bindings/cpp/src/sync_request_response.cpp
index 3da4ceb..f3c76a2 100644
--- a/proton-c/bindings/cpp/src/sync_request_response.cpp
+++ b/proton-c/bindings/cpp/src/sync_request_response.cpp
@@ -28,10 +28,9 @@ namespace proton {
namespace {
amqp_ulong global_correlation_id = 0;
-struct response_received : public wait_condition {
+struct response_received {
response_received(std::auto_ptr<message>& m, amqp_ulong id) : message_(m), id_(id) {}
- bool achieved() {
- return message_.get() && message_->correlation_id() == id_; }
+ bool operator()() { return message_.get() && message_->correlation_id() == id_; }
std::auto_ptr<message>& message_;
value id_;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org