You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2023/01/12 17:45:51 UTC

[qpid-proton] branch main updated: PROTON-2657: [cpp] Accessors for user data on endpoints and deliveries

This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/main by this push:
     new 51583a516 PROTON-2657: [cpp] Accessors for user data on endpoints and deliveries
51583a516 is described below

commit 51583a51652074586d4672b116224ec1b48de387
Author: Rakhi Kumari <ra...@gmail.com>
AuthorDate: Mon Dec 5 15:20:13 2022 +0530

    PROTON-2657: [cpp] Accessors for user data on endpoints and deliveries
---
 cpp/examples/broker.cpp           |  29 +++----
 cpp/include/proton/connection.hpp |   6 ++
 cpp/include/proton/link.hpp       |   6 ++
 cpp/include/proton/listener.hpp   |   6 ++
 cpp/include/proton/session.hpp    |   6 ++
 cpp/include/proton/transfer.hpp   |   6 ++
 cpp/src/connection.cpp            |  10 +++
 cpp/src/context_test.cpp          | 169 ++++++++++++++++++++++++++++++++++++++
 cpp/src/contexts.cpp              |   8 +-
 cpp/src/contexts.hpp              |  13 +++
 cpp/src/link.cpp                  |  13 +++
 cpp/src/listener.cpp              |  10 +++
 cpp/src/session.cpp               |  12 +++
 cpp/src/transfer.cpp              |  11 +++
 cpp/tests.cmake                   |   1 +
 15 files changed, 287 insertions(+), 19 deletions(-)

diff --git a/cpp/examples/broker.cpp b/cpp/examples/broker.cpp
index c09e0b0e0..6c4e24785 100644
--- a/cpp/examples/broker.cpp
+++ b/cpp/examples/broker.cpp
@@ -80,13 +80,10 @@ bool verbose;
 class Queue;
 class Sender;
 
-typedef std::map<proton::sender, Sender*> senders;
-
 class Sender : public proton::messaging_handler {
     friend class connection_handler;
 
     proton::sender sender_;
-    senders& senders_;
     proton::work_queue& work_queue_;
     std::string queue_name_;
     Queue* queue_;
@@ -97,15 +94,19 @@ class Sender : public proton::messaging_handler {
     void on_sender_close(proton::sender &sender) override;
 
 public:
-    Sender(proton::sender s, senders& ss) :
-        sender_(s), senders_(ss), work_queue_(s.work_queue()), queue_(0), pending_credit_(0)
-    {}
+    Sender(proton::sender s) :
+        sender_(s), work_queue_(s.work_queue()), queue_(0), pending_credit_(0)
+    {
+        s.user_data(this);
+    }
 
     bool add(proton::work f) {
         return work_queue_.add(f);
     }
 
-
+    static Sender* get(const proton::sender& s) {
+        return reinterpret_cast<Sender*>(s.user_data());
+    }
     void boundQueue(Queue* q, std::string qn);
     void sendMsg(proton::message m) {
         DOUT(std::cerr << "Sender:   " << this << " sending\n";);
@@ -204,7 +205,6 @@ void Sender::on_sender_close(proton::sender &sender) {
         // If so, we should have a way to mark the sender deleted, so we can delete
         // on queue binding
     }
-    senders_.erase(sender);
 }
 
 void Sender::boundQueue(Queue* q, std::string qn) {
@@ -346,7 +346,6 @@ void Receiver::queueMsgToNamedQueue(proton::message& m, std::string address) {
 
 class connection_handler : public proton::messaging_handler {
     QueueManager& queue_manager_;
-    senders senders_;
 
 public:
     connection_handler(QueueManager& qm) :
@@ -363,8 +362,7 @@ public:
     // A sender sends messages from a queue to a subscriber.
     void on_sender_open(proton::sender &sender) override {
         std::string qn = sender.source().dynamic() ? "" : sender.source().address();
-        Sender* s = new Sender(sender, senders_);
-        senders_[sender] = s;
+        Sender* s = new Sender(sender);
         queue_manager_.add([=]{queue_manager_.findQueueSender(s, qn);});
     }
 
@@ -384,14 +382,11 @@ public:
     void on_session_close(proton::session &session) override {
         // Unsubscribe all senders that belong to session.
         for (proton::sender_iterator i = session.senders().begin(); i != session.senders().end(); ++i) {
-            senders::iterator j = senders_.find(*i);
-            if (j == senders_.end()) continue;
-            Sender* s = j->second;
+            Sender* s = Sender::get(*i);
             if (s->queue_) {
                 auto q = s->queue_;
                 s->queue_->add([=]{q->unsubscribe(s);});
             }
-            senders_.erase(j);
         }
     }
 
@@ -403,9 +398,7 @@ public:
     void on_transport_close(proton::transport& t) override {
         // Unsubscribe all senders.
         for (proton::sender_iterator i = t.connection().senders().begin(); i != t.connection().senders().end(); ++i) {
-            senders::iterator j = senders_.find(*i);
-            if (j == senders_.end()) continue;
-            Sender* s = j->second;
+            Sender* s = Sender::get(*i);
             if (s->queue_) {
                 auto q = s->queue_;
                 s->queue_->add([=]{q->unsubscribe(s);});
diff --git a/cpp/include/proton/connection.hpp b/cpp/include/proton/connection.hpp
index bff8e21fb..ed590542d 100644
--- a/cpp/include/proton/connection.hpp
+++ b/cpp/include/proton/connection.hpp
@@ -202,6 +202,12 @@ PN_CPP_CLASS_EXTERN connection : public internal::object<pn_connection_t>, publi
     /// existing parameters as if `connection_options::update()` was used.
     PN_CPP_EXTERN void update_options(const connection_options&);
 
+    /// Set user data on this connection.
+    PN_CPP_EXTERN void user_data(void* user_data) const;
+
+    /// Get user data from this connection.
+    PN_CPP_EXTERN void* user_data() const;
+
     /// @cond INTERNAL
   friend class internal::factory<connection>;
   friend class container;
diff --git a/cpp/include/proton/link.hpp b/cpp/include/proton/link.hpp
index c7d55b2e9..ebd81c30b 100644
--- a/cpp/include/proton/link.hpp
+++ b/cpp/include/proton/link.hpp
@@ -94,6 +94,12 @@ PN_CPP_CLASS_EXTERN link : public internal::object<pn_link_t> , public endpoint
     /// **Unsettled API** - Properties supplied by the remote link endpoint.
     PN_CPP_EXTERN std::map<symbol, value> properties() const;
 
+    /// Set user data on this link.
+    PN_CPP_EXTERN void user_data(void* user_data) const;
+
+    /// Get user data from this link.
+    PN_CPP_EXTERN void* user_data() const;
+
   protected:
     /// @cond INTERNAL
     
diff --git a/cpp/include/proton/listener.hpp b/cpp/include/proton/listener.hpp
index d5d0aba0b..d7e04892d 100644
--- a/cpp/include/proton/listener.hpp
+++ b/cpp/include/proton/listener.hpp
@@ -64,6 +64,12 @@ class PN_CPP_CLASS_EXTERN listener {
     /// @throw proton::error if this listener is not managed by a container.
     PN_CPP_EXTERN class container& container() const;
 
+    /// Set user data on this listener.
+    PN_CPP_EXTERN void user_data(void* user_data) const;
+
+    /// Get user data from this listener.
+    PN_CPP_EXTERN void* user_data() const;
+
   private:
     pn_listener_t* listener_;
 
diff --git a/cpp/include/proton/session.hpp b/cpp/include/proton/session.hpp
index 568c6d087..60522c817 100644
--- a/cpp/include/proton/session.hpp
+++ b/cpp/include/proton/session.hpp
@@ -99,6 +99,12 @@ PN_CPP_CLASS_EXTERN session : public internal::object<pn_session_t>, public endp
     /// Return the receivers on this session.
     PN_CPP_EXTERN receiver_range receivers() const;
 
+    /// Set user data on this session.
+    PN_CPP_EXTERN void user_data(void* user_data) const;
+
+    /// Get user data from this session.
+    PN_CPP_EXTERN void* user_data() const;
+
     /// @cond INTERNAL
   friend class internal::factory<session>;
   friend class session_iterator;
diff --git a/cpp/include/proton/transfer.hpp b/cpp/include/proton/transfer.hpp
index 61657fe3b..cf0474a75 100644
--- a/cpp/include/proton/transfer.hpp
+++ b/cpp/include/proton/transfer.hpp
@@ -77,6 +77,12 @@ class transfer : public internal::object<pn_delivery_t> {
     /// Return true if the transfer has been settled.
     PN_CPP_EXTERN bool settled() const;
 
+    /// Set user data on this transfer.
+    PN_CPP_EXTERN void user_data(void* user_data) const;
+
+    /// Get user data from this transfer.
+    PN_CPP_EXTERN void* user_data() const;
+
     /// @cond INTERNAL
   friend class internal::factory<transfer>;
     /// @endcond
diff --git a/cpp/src/connection.cpp b/cpp/src/connection.cpp
index 6293779c8..128e71b0d 100644
--- a/cpp/src/connection.cpp
+++ b/cpp/src/connection.cpp
@@ -223,4 +223,14 @@ void connection::update_options(const connection_options& options) {
     cc.connection_options_->update(options);
 }
 
+void connection::user_data(void* user_data) const {
+    connection_context& cc = connection_context::get(pn_object());
+    cc.user_data_ = user_data;
+}
+
+void* connection::user_data() const {
+    connection_context& cc = connection_context::get(pn_object());
+    return cc.user_data_;
+}
+
 } // namespace proton
diff --git a/cpp/src/context_test.cpp b/cpp/src/context_test.cpp
new file mode 100644
index 000000000..f53a61cee
--- /dev/null
+++ b/cpp/src/context_test.cpp
@@ -0,0 +1,169 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/delivery.h>
+#include <proton/delivery.hpp>
+#include <proton/link.hpp>
+#include <proton/listen_handler.hpp>
+#include <proton/listener.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/tracker.hpp>
+#include <proton/types.h>
+#include <proton/types.hpp>
+#include <proton/value.hpp>
+
+#include "proton/error_condition.hpp"
+#include "proton/receiver_options.hpp"
+#include "proton/transport.hpp"
+#include "proton/work_queue.hpp"
+#include "test_bits.hpp"
+
+#include <condition_variable>
+#include <mutex>
+#include <thread>
+
+namespace {
+std::mutex m;
+std::condition_variable cv;
+bool listener_ready = false;
+int listener_port;
+} // namespace
+
+class test_server : public proton::messaging_handler {
+  private:
+    class listener_ready_handler : public proton::listen_handler {
+        void on_open(proton::listener &l) override {
+            {
+                std::lock_guard<std::mutex> lk(m);
+                listener_port = l.port();
+                listener_ready = true;
+            }
+            cv.notify_one();
+        }
+    };
+
+    std::string url;
+    proton::listener listener;
+    listener_ready_handler listen_handler;
+
+  public:
+    test_server (const std::string &s) : url(s) {}
+
+    void on_container_start(proton::container &c) override {
+        listener = c.listen(url, listen_handler);
+
+        std::string data = "listener-user-data";
+        // Set user context 'data' on listener.
+        listener.user_data(&data);
+        ASSERT_EQUAL(&data, listener.user_data());
+    }
+
+    void on_message(proton::delivery &d, proton::message &msg) override {
+        std::string data = "delivery-user-data";
+        // Set user context 'data' on delivery.
+        d.user_data(&data);
+        ASSERT_EQUAL(&data, d.user_data());
+
+        d.receiver().close();
+        d.connection().close();
+        listener.stop();
+    }
+};
+
+class test_client : public proton::messaging_handler {
+  private:
+    std::string url;
+    proton::sender sender;
+
+  public:
+    test_client (const std::string &s) : url(s) {}
+
+    void on_container_start(proton::container &c) override {
+        proton::connection_options co;
+        sender = c.open_sender(url, co);
+    }
+
+    void on_connection_open(proton::connection& c) override {
+        // Get default session
+        proton::session s = c.default_session();
+
+        std::string data_ssn = "session-user-data";
+        // Set user context 'data' on default session.
+        s.user_data(&data_ssn);
+        ASSERT_EQUAL(&data_ssn, s.user_data());
+
+        std::string data_con = "connection-user-data";
+        // Set user context 'data' on current connection.
+        c.user_data(&data_con);
+        ASSERT_EQUAL(&data_con, c.user_data());
+    }
+
+    void on_sendable(proton::sender &s) override {
+        proton::message msg;
+        msg.body("message");
+        proton::tracker t = s.send(msg);
+
+        std::string data = "sender-user-data";
+        // Set user context 'data' on sender.
+        s.user_data(&data);
+        ASSERT_EQUAL(&data, s.user_data());
+
+        s.connection().close();
+    }
+
+    void on_tracker_accept(proton::tracker &t) override {
+        std::string data = "tracker-user-data";
+        // Set user context 'data' on tracker.
+        t.user_data(&data);
+        ASSERT_EQUAL(&data, t.user_data());
+    }
+};
+
+int test_user_context() {
+
+    std::string recv_address("127.0.0.1:0/test");
+    test_server recv(recv_address);
+    proton::container c(recv);
+    std::thread thread_recv([&c]() -> void { c.run(); });
+
+    // wait until listener is ready
+    std::unique_lock<std::mutex> lk(m);
+    cv.wait(lk, [] { return listener_ready; });
+
+    std::string send_address =
+        "127.0.0.1:" + std::to_string(listener_port) + "/test";
+    test_client send(send_address);
+    proton::container(send).run();
+    thread_recv.join();
+
+    return 0;
+}
+
+int main(int argc, char **argv) {
+    int failed = 0;
+    RUN_ARGV_TEST(failed, test_user_context());
+    return failed;
+}
diff --git a/cpp/src/contexts.cpp b/cpp/src/contexts.cpp
index 9dc13dbf0..4f85ace93 100644
--- a/cpp/src/contexts.cpp
+++ b/cpp/src/contexts.cpp
@@ -30,10 +30,11 @@
 #include "proton/reconnect_options.hpp"
 
 #include <proton/connection.h>
-#include <proton/object.h>
+#include <proton/delivery.h>
 #include <proton/link.h>
 #include <proton/listener.h>
 #include <proton/message.h>
+#include <proton/object.h>
 #include <proton/session.h>
 
 #include <typeinfo>
@@ -49,6 +50,7 @@ PN_HANDLE(CONNECTION_CONTEXT)
 PN_HANDLE(LISTENER_CONTEXT)
 PN_HANDLE(SESSION_CONTEXT)
 PN_HANDLE(LINK_CONTEXT)
+PN_HANDLE(TRANSFER_CONTEXT)
 
 template <class T>
 T* get_context(pn_record_t* record, pn_handle_t handle) {
@@ -89,4 +91,8 @@ session_context& session_context::get(pn_session_t* s) {
     return ref<session_context>(id(pn_session_attachments(s), SESSION_CONTEXT));
 }
 
+transfer_context& transfer_context::get(pn_delivery_t* s) {
+    return ref<transfer_context>(id(pn_delivery_attachments(s), TRANSFER_CONTEXT));
+}
+
 }
diff --git a/cpp/src/contexts.hpp b/cpp/src/contexts.hpp
index 4694ca969..02ea76083 100644
--- a/cpp/src/contexts.hpp
+++ b/cpp/src/contexts.hpp
@@ -103,6 +103,7 @@ class connection_context : public context {
     listener_context* listener_context_;
     work_queue work_queue_;
     std::string active_url_;
+    void* user_data_;
 };
 
 class reconnect_options_base;
@@ -127,6 +128,7 @@ class listener_context : public context {
 
     listen_handler* listen_handler_;
     std::unique_ptr<const connection_options> connection_options_;
+    void* user_data_;
 };
 
 class link_context : public context {
@@ -140,6 +142,7 @@ class link_context : public context {
     bool auto_accept;
     bool auto_settle;
     bool draining;
+    void* user_data_;
 };
 
 class session_context : public context {
@@ -148,6 +151,16 @@ class session_context : public context {
     static session_context& get(pn_session_t* s);
 
     messaging_handler* handler;
+    void* user_data_;
+};
+
+class transfer_context : public context {
+  public:
+    transfer_context() : handler(0) {}
+    static transfer_context& get(pn_delivery_t* s);
+
+    messaging_handler* handler;
+    void* user_data_;
 };
 
 }
diff --git a/cpp/src/link.cpp b/cpp/src/link.cpp
index c94fb4468..c6f58f48d 100644
--- a/cpp/src/link.cpp
+++ b/cpp/src/link.cpp
@@ -95,4 +95,17 @@ std::map<symbol, value> link::properties() const {
 error_condition link::error() const {
     return make_wrapper(pn_link_remote_condition(pn_object()));
 }
+
+void link::user_data(void* user_data) const {
+    pn_link_t* lnk = pn_object();
+    link_context& lctx = link_context::get(lnk);
+    lctx.user_data_ = user_data;
+}
+
+void* link::user_data() const {
+    pn_link_t* lnk = pn_object();
+    link_context& lctx = link_context::get(lnk);
+    return lctx.user_data_;
+}
+
 }
diff --git a/cpp/src/listener.cpp b/cpp/src/listener.cpp
index 931600aec..91d0350d7 100644
--- a/cpp/src/listener.cpp
+++ b/cpp/src/listener.cpp
@@ -60,6 +60,16 @@ class container& listener::container() const {
     return *reinterpret_cast<class container*>(c);
 }
 
+void listener::user_data(void* user_data) const {
+    listener_context& lc = listener_context::get(listener_);
+    lc.user_data_ = user_data;
+}
+
+void* listener::user_data() const {
+    listener_context& lc = listener_context::get(listener_);
+    return lc.user_data_;
+}
+
 // Listen handler
 listen_handler::~listen_handler() = default;
 void listen_handler::on_open(listener&) {}
diff --git a/cpp/src/session.cpp b/cpp/src/session.cpp
index 7ba809b82..b8f777a00 100644
--- a/cpp/src/session.cpp
+++ b/cpp/src/session.cpp
@@ -136,4 +136,16 @@ session_iterator session_iterator::operator++() {
     return *this;
 }
 
+void session::user_data(void* user_data) const {
+    pn_session_t* ssn = pn_object();
+    session_context& sctx = session_context::get(ssn);
+    sctx.user_data_ = user_data;
+}
+
+void* session::user_data() const {
+    pn_session_t* ssn = pn_object();
+    session_context& sctx = session_context::get(ssn);
+    return sctx.user_data_;
+}
+
 } // namespace proton
diff --git a/cpp/src/transfer.cpp b/cpp/src/transfer.cpp
index a024abf61..063254267 100644
--- a/cpp/src/transfer.cpp
+++ b/cpp/src/transfer.cpp
@@ -49,4 +49,15 @@ enum transfer::state transfer::state() const { return static_cast<enum state>(pn
 
 std::string to_string(enum transfer::state s) { return pn_disposition_type_name(s); }
 std::ostream& operator<<(std::ostream& o, const enum transfer::state s) { return o << to_string(s); }
+
+void transfer::user_data(void* user_data) const {
+    transfer_context& cc = transfer_context::get(pn_object());
+    cc.user_data_ = user_data;
+}
+
+void* transfer::user_data() const {
+    transfer_context& cc = transfer_context::get(pn_object());
+    return cc.user_data_;
+}
+
 }
diff --git a/cpp/tests.cmake b/cpp/tests.cmake
index ee43b3c6e..fd515eabf 100644
--- a/cpp/tests.cmake
+++ b/cpp/tests.cmake
@@ -62,6 +62,7 @@ add_cpp_test(reconnect_test)
 add_cpp_test(link_test)
 add_cpp_test(credit_test)
 add_cpp_test(delivery_test)
+add_cpp_test(context_test)
 if (ENABLE_JSONCPP)
   add_cpp_test(connect_config_test)
   target_link_libraries(connect_config_test qpid-proton-core) # For pn_sasl_enabled


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org