You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2023/01/12 17:45:51 UTC
[qpid-proton] branch main updated: PROTON-2657: [cpp] Accessors for user data on endpoints and deliveries
This is an automated email from the ASF dual-hosted git repository.
astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/main by this push:
new 51583a516 PROTON-2657: [cpp] Accessors for user data on endpoints and deliveries
51583a516 is described below
commit 51583a51652074586d4672b116224ec1b48de387
Author: Rakhi Kumari <ra...@gmail.com>
AuthorDate: Mon Dec 5 15:20:13 2022 +0530
PROTON-2657: [cpp] Accessors for user data on endpoints and deliveries
---
cpp/examples/broker.cpp | 29 +++----
cpp/include/proton/connection.hpp | 6 ++
cpp/include/proton/link.hpp | 6 ++
cpp/include/proton/listener.hpp | 6 ++
cpp/include/proton/session.hpp | 6 ++
cpp/include/proton/transfer.hpp | 6 ++
cpp/src/connection.cpp | 10 +++
cpp/src/context_test.cpp | 169 ++++++++++++++++++++++++++++++++++++++
cpp/src/contexts.cpp | 8 +-
cpp/src/contexts.hpp | 13 +++
cpp/src/link.cpp | 13 +++
cpp/src/listener.cpp | 10 +++
cpp/src/session.cpp | 12 +++
cpp/src/transfer.cpp | 11 +++
cpp/tests.cmake | 1 +
15 files changed, 287 insertions(+), 19 deletions(-)
diff --git a/cpp/examples/broker.cpp b/cpp/examples/broker.cpp
index c09e0b0e0..6c4e24785 100644
--- a/cpp/examples/broker.cpp
+++ b/cpp/examples/broker.cpp
@@ -80,13 +80,10 @@ bool verbose;
class Queue;
class Sender;
-typedef std::map<proton::sender, Sender*> senders;
-
class Sender : public proton::messaging_handler {
friend class connection_handler;
proton::sender sender_;
- senders& senders_;
proton::work_queue& work_queue_;
std::string queue_name_;
Queue* queue_;
@@ -97,15 +94,19 @@ class Sender : public proton::messaging_handler {
void on_sender_close(proton::sender &sender) override;
public:
- Sender(proton::sender s, senders& ss) :
- sender_(s), senders_(ss), work_queue_(s.work_queue()), queue_(0), pending_credit_(0)
- {}
+ Sender(proton::sender s) :
+ sender_(s), work_queue_(s.work_queue()), queue_(0), pending_credit_(0)
+ {
+ s.user_data(this);
+ }
bool add(proton::work f) {
return work_queue_.add(f);
}
-
+ static Sender* get(const proton::sender& s) {
+ return reinterpret_cast<Sender*>(s.user_data());
+ }
void boundQueue(Queue* q, std::string qn);
void sendMsg(proton::message m) {
DOUT(std::cerr << "Sender: " << this << " sending\n";);
@@ -204,7 +205,6 @@ void Sender::on_sender_close(proton::sender &sender) {
// If so, we should have a way to mark the sender deleted, so we can delete
// on queue binding
}
- senders_.erase(sender);
}
void Sender::boundQueue(Queue* q, std::string qn) {
@@ -346,7 +346,6 @@ void Receiver::queueMsgToNamedQueue(proton::message& m, std::string address) {
class connection_handler : public proton::messaging_handler {
QueueManager& queue_manager_;
- senders senders_;
public:
connection_handler(QueueManager& qm) :
@@ -363,8 +362,7 @@ public:
// A sender sends messages from a queue to a subscriber.
void on_sender_open(proton::sender &sender) override {
std::string qn = sender.source().dynamic() ? "" : sender.source().address();
- Sender* s = new Sender(sender, senders_);
- senders_[sender] = s;
+ Sender* s = new Sender(sender);
queue_manager_.add([=]{queue_manager_.findQueueSender(s, qn);});
}
@@ -384,14 +382,11 @@ public:
void on_session_close(proton::session &session) override {
// Unsubscribe all senders that belong to session.
for (proton::sender_iterator i = session.senders().begin(); i != session.senders().end(); ++i) {
- senders::iterator j = senders_.find(*i);
- if (j == senders_.end()) continue;
- Sender* s = j->second;
+ Sender* s = Sender::get(*i);
if (s->queue_) {
auto q = s->queue_;
s->queue_->add([=]{q->unsubscribe(s);});
}
- senders_.erase(j);
}
}
@@ -403,9 +398,7 @@ public:
void on_transport_close(proton::transport& t) override {
// Unsubscribe all senders.
for (proton::sender_iterator i = t.connection().senders().begin(); i != t.connection().senders().end(); ++i) {
- senders::iterator j = senders_.find(*i);
- if (j == senders_.end()) continue;
- Sender* s = j->second;
+ Sender* s = Sender::get(*i);
if (s->queue_) {
auto q = s->queue_;
s->queue_->add([=]{q->unsubscribe(s);});
diff --git a/cpp/include/proton/connection.hpp b/cpp/include/proton/connection.hpp
index bff8e21fb..ed590542d 100644
--- a/cpp/include/proton/connection.hpp
+++ b/cpp/include/proton/connection.hpp
@@ -202,6 +202,12 @@ PN_CPP_CLASS_EXTERN connection : public internal::object<pn_connection_t>, publi
/// existing parameters as if `connection_options::update()` was used.
PN_CPP_EXTERN void update_options(const connection_options&);
+ /// Set user data on this connection.
+ PN_CPP_EXTERN void user_data(void* user_data) const;
+
+ /// Get user data from this connection.
+ PN_CPP_EXTERN void* user_data() const;
+
/// @cond INTERNAL
friend class internal::factory<connection>;
friend class container;
diff --git a/cpp/include/proton/link.hpp b/cpp/include/proton/link.hpp
index c7d55b2e9..ebd81c30b 100644
--- a/cpp/include/proton/link.hpp
+++ b/cpp/include/proton/link.hpp
@@ -94,6 +94,12 @@ PN_CPP_CLASS_EXTERN link : public internal::object<pn_link_t> , public endpoint
/// **Unsettled API** - Properties supplied by the remote link endpoint.
PN_CPP_EXTERN std::map<symbol, value> properties() const;
+ /// Set user data on this link.
+ PN_CPP_EXTERN void user_data(void* user_data) const;
+
+ /// Get user data from this link.
+ PN_CPP_EXTERN void* user_data() const;
+
protected:
/// @cond INTERNAL
diff --git a/cpp/include/proton/listener.hpp b/cpp/include/proton/listener.hpp
index d5d0aba0b..d7e04892d 100644
--- a/cpp/include/proton/listener.hpp
+++ b/cpp/include/proton/listener.hpp
@@ -64,6 +64,12 @@ class PN_CPP_CLASS_EXTERN listener {
/// @throw proton::error if this listener is not managed by a container.
PN_CPP_EXTERN class container& container() const;
+ /// Set user data on this listener.
+ PN_CPP_EXTERN void user_data(void* user_data) const;
+
+ /// Get user data from this listener.
+ PN_CPP_EXTERN void* user_data() const;
+
private:
pn_listener_t* listener_;
diff --git a/cpp/include/proton/session.hpp b/cpp/include/proton/session.hpp
index 568c6d087..60522c817 100644
--- a/cpp/include/proton/session.hpp
+++ b/cpp/include/proton/session.hpp
@@ -99,6 +99,12 @@ PN_CPP_CLASS_EXTERN session : public internal::object<pn_session_t>, public endp
/// Return the receivers on this session.
PN_CPP_EXTERN receiver_range receivers() const;
+ /// Set user data on this session.
+ PN_CPP_EXTERN void user_data(void* user_data) const;
+
+ /// Get user data from this session.
+ PN_CPP_EXTERN void* user_data() const;
+
/// @cond INTERNAL
friend class internal::factory<session>;
friend class session_iterator;
diff --git a/cpp/include/proton/transfer.hpp b/cpp/include/proton/transfer.hpp
index 61657fe3b..cf0474a75 100644
--- a/cpp/include/proton/transfer.hpp
+++ b/cpp/include/proton/transfer.hpp
@@ -77,6 +77,12 @@ class transfer : public internal::object<pn_delivery_t> {
/// Return true if the transfer has been settled.
PN_CPP_EXTERN bool settled() const;
+ /// Set user data on this transfer.
+ PN_CPP_EXTERN void user_data(void* user_data) const;
+
+ /// Get user data from this transfer.
+ PN_CPP_EXTERN void* user_data() const;
+
/// @cond INTERNAL
friend class internal::factory<transfer>;
/// @endcond
diff --git a/cpp/src/connection.cpp b/cpp/src/connection.cpp
index 6293779c8..128e71b0d 100644
--- a/cpp/src/connection.cpp
+++ b/cpp/src/connection.cpp
@@ -223,4 +223,14 @@ void connection::update_options(const connection_options& options) {
cc.connection_options_->update(options);
}
+void connection::user_data(void* user_data) const {
+ connection_context& cc = connection_context::get(pn_object());
+ cc.user_data_ = user_data;
+}
+
+void* connection::user_data() const {
+ connection_context& cc = connection_context::get(pn_object());
+ return cc.user_data_;
+}
+
} // namespace proton
diff --git a/cpp/src/context_test.cpp b/cpp/src/context_test.cpp
new file mode 100644
index 000000000..f53a61cee
--- /dev/null
+++ b/cpp/src/context_test.cpp
@@ -0,0 +1,169 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/delivery.h>
+#include <proton/delivery.hpp>
+#include <proton/link.hpp>
+#include <proton/listen_handler.hpp>
+#include <proton/listener.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/tracker.hpp>
+#include <proton/types.h>
+#include <proton/types.hpp>
+#include <proton/value.hpp>
+
+#include "proton/error_condition.hpp"
+#include "proton/receiver_options.hpp"
+#include "proton/transport.hpp"
+#include "proton/work_queue.hpp"
+#include "test_bits.hpp"
+
+#include <condition_variable>
+#include <mutex>
+#include <thread>
+
+namespace {
+std::mutex m;
+std::condition_variable cv;
+bool listener_ready = false;
+int listener_port;
+} // namespace
+
+class test_server : public proton::messaging_handler {
+ private:
+ class listener_ready_handler : public proton::listen_handler {
+ void on_open(proton::listener &l) override {
+ {
+ std::lock_guard<std::mutex> lk(m);
+ listener_port = l.port();
+ listener_ready = true;
+ }
+ cv.notify_one();
+ }
+ };
+
+ std::string url;
+ proton::listener listener;
+ listener_ready_handler listen_handler;
+
+ public:
+ test_server (const std::string &s) : url(s) {}
+
+ void on_container_start(proton::container &c) override {
+ listener = c.listen(url, listen_handler);
+
+ std::string data = "listener-user-data";
+ // Set user context 'data' on listener.
+ listener.user_data(&data);
+ ASSERT_EQUAL(&data, listener.user_data());
+ }
+
+ void on_message(proton::delivery &d, proton::message &msg) override {
+ std::string data = "delivery-user-data";
+ // Set user context 'data' on delivery.
+ d.user_data(&data);
+ ASSERT_EQUAL(&data, d.user_data());
+
+ d.receiver().close();
+ d.connection().close();
+ listener.stop();
+ }
+};
+
+class test_client : public proton::messaging_handler {
+ private:
+ std::string url;
+ proton::sender sender;
+
+ public:
+ test_client (const std::string &s) : url(s) {}
+
+ void on_container_start(proton::container &c) override {
+ proton::connection_options co;
+ sender = c.open_sender(url, co);
+ }
+
+ void on_connection_open(proton::connection& c) override {
+ // Get default session
+ proton::session s = c.default_session();
+
+ std::string data_ssn = "session-user-data";
+ // Set user context 'data' on default session.
+ s.user_data(&data_ssn);
+ ASSERT_EQUAL(&data_ssn, s.user_data());
+
+ std::string data_con = "connection-user-data";
+ // Set user context 'data' on current connection.
+ c.user_data(&data_con);
+ ASSERT_EQUAL(&data_con, c.user_data());
+ }
+
+ void on_sendable(proton::sender &s) override {
+ proton::message msg;
+ msg.body("message");
+ proton::tracker t = s.send(msg);
+
+ std::string data = "sender-user-data";
+ // Set user context 'data' on sender.
+ s.user_data(&data);
+ ASSERT_EQUAL(&data, s.user_data());
+
+ s.connection().close();
+ }
+
+ void on_tracker_accept(proton::tracker &t) override {
+ std::string data = "tracker-user-data";
+ // Set user context 'data' on tracker.
+ t.user_data(&data);
+ ASSERT_EQUAL(&data, t.user_data());
+ }
+};
+
+int test_user_context() {
+
+ std::string recv_address("127.0.0.1:0/test");
+ test_server recv(recv_address);
+ proton::container c(recv);
+ std::thread thread_recv([&c]() -> void { c.run(); });
+
+ // wait until listener is ready
+ std::unique_lock<std::mutex> lk(m);
+ cv.wait(lk, [] { return listener_ready; });
+
+ std::string send_address =
+ "127.0.0.1:" + std::to_string(listener_port) + "/test";
+ test_client send(send_address);
+ proton::container(send).run();
+ thread_recv.join();
+
+ return 0;
+}
+
+int main(int argc, char **argv) {
+ int failed = 0;
+ RUN_ARGV_TEST(failed, test_user_context());
+ return failed;
+}
diff --git a/cpp/src/contexts.cpp b/cpp/src/contexts.cpp
index 9dc13dbf0..4f85ace93 100644
--- a/cpp/src/contexts.cpp
+++ b/cpp/src/contexts.cpp
@@ -30,10 +30,11 @@
#include "proton/reconnect_options.hpp"
#include <proton/connection.h>
-#include <proton/object.h>
+#include <proton/delivery.h>
#include <proton/link.h>
#include <proton/listener.h>
#include <proton/message.h>
+#include <proton/object.h>
#include <proton/session.h>
#include <typeinfo>
@@ -49,6 +50,7 @@ PN_HANDLE(CONNECTION_CONTEXT)
PN_HANDLE(LISTENER_CONTEXT)
PN_HANDLE(SESSION_CONTEXT)
PN_HANDLE(LINK_CONTEXT)
+PN_HANDLE(TRANSFER_CONTEXT)
template <class T>
T* get_context(pn_record_t* record, pn_handle_t handle) {
@@ -89,4 +91,8 @@ session_context& session_context::get(pn_session_t* s) {
return ref<session_context>(id(pn_session_attachments(s), SESSION_CONTEXT));
}
+transfer_context& transfer_context::get(pn_delivery_t* s) {
+ return ref<transfer_context>(id(pn_delivery_attachments(s), TRANSFER_CONTEXT));
+}
+
}
diff --git a/cpp/src/contexts.hpp b/cpp/src/contexts.hpp
index 4694ca969..02ea76083 100644
--- a/cpp/src/contexts.hpp
+++ b/cpp/src/contexts.hpp
@@ -103,6 +103,7 @@ class connection_context : public context {
listener_context* listener_context_;
work_queue work_queue_;
std::string active_url_;
+ void* user_data_;
};
class reconnect_options_base;
@@ -127,6 +128,7 @@ class listener_context : public context {
listen_handler* listen_handler_;
std::unique_ptr<const connection_options> connection_options_;
+ void* user_data_;
};
class link_context : public context {
@@ -140,6 +142,7 @@ class link_context : public context {
bool auto_accept;
bool auto_settle;
bool draining;
+ void* user_data_;
};
class session_context : public context {
@@ -148,6 +151,16 @@ class session_context : public context {
static session_context& get(pn_session_t* s);
messaging_handler* handler;
+ void* user_data_;
+};
+
+class transfer_context : public context {
+ public:
+ transfer_context() : handler(0) {}
+ static transfer_context& get(pn_delivery_t* s);
+
+ messaging_handler* handler;
+ void* user_data_;
};
}
diff --git a/cpp/src/link.cpp b/cpp/src/link.cpp
index c94fb4468..c6f58f48d 100644
--- a/cpp/src/link.cpp
+++ b/cpp/src/link.cpp
@@ -95,4 +95,17 @@ std::map<symbol, value> link::properties() const {
error_condition link::error() const {
return make_wrapper(pn_link_remote_condition(pn_object()));
}
+
+void link::user_data(void* user_data) const {
+ pn_link_t* lnk = pn_object();
+ link_context& lctx = link_context::get(lnk);
+ lctx.user_data_ = user_data;
+}
+
+void* link::user_data() const {
+ pn_link_t* lnk = pn_object();
+ link_context& lctx = link_context::get(lnk);
+ return lctx.user_data_;
+}
+
}
diff --git a/cpp/src/listener.cpp b/cpp/src/listener.cpp
index 931600aec..91d0350d7 100644
--- a/cpp/src/listener.cpp
+++ b/cpp/src/listener.cpp
@@ -60,6 +60,16 @@ class container& listener::container() const {
return *reinterpret_cast<class container*>(c);
}
+void listener::user_data(void* user_data) const {
+ listener_context& lc = listener_context::get(listener_);
+ lc.user_data_ = user_data;
+}
+
+void* listener::user_data() const {
+ listener_context& lc = listener_context::get(listener_);
+ return lc.user_data_;
+}
+
// Listen handler
listen_handler::~listen_handler() = default;
void listen_handler::on_open(listener&) {}
diff --git a/cpp/src/session.cpp b/cpp/src/session.cpp
index 7ba809b82..b8f777a00 100644
--- a/cpp/src/session.cpp
+++ b/cpp/src/session.cpp
@@ -136,4 +136,16 @@ session_iterator session_iterator::operator++() {
return *this;
}
+void session::user_data(void* user_data) const {
+ pn_session_t* ssn = pn_object();
+ session_context& sctx = session_context::get(ssn);
+ sctx.user_data_ = user_data;
+}
+
+void* session::user_data() const {
+ pn_session_t* ssn = pn_object();
+ session_context& sctx = session_context::get(ssn);
+ return sctx.user_data_;
+}
+
} // namespace proton
diff --git a/cpp/src/transfer.cpp b/cpp/src/transfer.cpp
index a024abf61..063254267 100644
--- a/cpp/src/transfer.cpp
+++ b/cpp/src/transfer.cpp
@@ -49,4 +49,15 @@ enum transfer::state transfer::state() const { return static_cast<enum state>(pn
std::string to_string(enum transfer::state s) { return pn_disposition_type_name(s); }
std::ostream& operator<<(std::ostream& o, const enum transfer::state s) { return o << to_string(s); }
+
+void transfer::user_data(void* user_data) const {
+ transfer_context& cc = transfer_context::get(pn_object());
+ cc.user_data_ = user_data;
+}
+
+void* transfer::user_data() const {
+ transfer_context& cc = transfer_context::get(pn_object());
+ return cc.user_data_;
+}
+
}
diff --git a/cpp/tests.cmake b/cpp/tests.cmake
index ee43b3c6e..fd515eabf 100644
--- a/cpp/tests.cmake
+++ b/cpp/tests.cmake
@@ -62,6 +62,7 @@ add_cpp_test(reconnect_test)
add_cpp_test(link_test)
add_cpp_test(credit_test)
add_cpp_test(delivery_test)
+add_cpp_test(context_test)
if (ENABLE_JSONCPP)
add_cpp_test(connect_config_test)
target_link_libraries(connect_config_test qpid-proton-core) # For pn_sasl_enabled
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org