You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2023/01/04 23:03:17 UTC

[qpid-proton] 03/03: PROTON-2666: Add support for message 'to' to C++ example broker

This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 06956b155658ff3829bfe920f386caf5c7941c46
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Wed Jan 4 11:29:01 2023 -0500

    PROTON-2666: Add support for message 'to' to C++ example broker
    
    This amounts to supporting the ANONYMOUS-RELAY capability
---
 cpp/examples/broker.cpp | 55 +++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 46 insertions(+), 9 deletions(-)

diff --git a/cpp/examples/broker.cpp b/cpp/examples/broker.cpp
index bcf8ab6ae..c09e0b0e0 100644
--- a/cpp/examples/broker.cpp
+++ b/cpp/examples/broker.cpp
@@ -225,20 +225,29 @@ void Sender::boundQueue(Queue* q, std::string qn) {
     std::cout << "sending from " << queue_name_ << std::endl;
 }
 
+class QueueManager;
+
 class Receiver : public proton::messaging_handler {
     friend class connection_handler;
 
     proton::receiver receiver_;
     proton::work_queue& work_queue_;
     Queue* queue_;
+    QueueManager& queue_manager_;
     std::deque<proton::message> messages_;
 
     // A message is received.
-    void on_message(proton::delivery &, proton::message &m) override {
-        messages_.push_back(m);
-
+    void on_message(proton::delivery &d, proton::message &m) override {
+        // Always allow anonymous-relay behaviour, even if the peer did not request it
+        auto to_address = m.to();
         if (queue_) {
+            messages_.push_back(m);
             queueMsgs();
+        } else if (!to_address.empty()) {
+            queueMsgToNamedQueue(m, to_address);
+        } else {
+            // No queue bound to the link and no 'to' address on the message - reject it
+            d.reject();
         }
     }
 
@@ -251,9 +260,11 @@ class Receiver : public proton::messaging_handler {
         }
     }
 
+    void queueMsgToNamedQueue(proton::message& m, std::string address);
+
 public:
-    Receiver(proton::receiver r) :
-        receiver_(r), work_queue_(r.work_queue()), queue_(0)
+    Receiver(proton::receiver r, QueueManager& qm) :
+        receiver_(r), work_queue_(r.work_queue()), queue_(0), queue_manager_(qm)
     {}
 
     bool add(proton::work f) {
@@ -297,7 +308,7 @@ public:
             qn = os.str();
         }
         Queue* q = 0;
-        queues::iterator i = queues_.find(qn);
+        auto i = queues_.find(qn);
         if (i==queues_.end()) {
             q = new Queue(container_, qn);
             queues_[qn] = q;
@@ -307,6 +318,18 @@ public:
         connection.add([=, &connection] {connection.boundQueue(q, qn);});
     }
 
+    void queueMessage(proton::message m, std::string address) {
+        Queue* q = 0;
+        auto i = queues_.find(address);
+        if (i==queues_.end()) {
+            q = new Queue(container_, address);
+            queues_[address] = q;
+        } else {
+            q = i->second;
+        }
+        q->add([=] {q->queueMsg(m);});
+    }
+
     void findQueueSender(Sender* s, std::string qn) {
         findQueue(*s, qn);
     }
@@ -316,6 +339,11 @@ public:
     }
 };
 
+void Receiver::queueMsgToNamedQueue(proton::message& m, std::string address) {
+    DOUT(std::cerr << "Receiver: " << this << " send msg to Queue: " << address << "\n";);
+    queue_manager_.add([=]{queue_manager_.queueMessage(m, address);});
+}
+
 class connection_handler : public proton::messaging_handler {
     QueueManager& queue_manager_;
     senders senders_;
@@ -326,7 +354,10 @@ public:
     {}
 
     void on_connection_open(proton::connection& c) override {
-        c.open();            // Accept the connection
+        // Don't check whether the peer desires ANONYMOUS-RELAY: offer it anyway.
+        // Accept the connection
+        c.open(proton::connection_options{}
+            .offered_capabilities({"ANONYMOUS-RELAY"}));
     }
 
     // A sender sends messages from a queue to a subscriber.
@@ -340,8 +371,14 @@ public:
     // A receiver receives messages from a publisher to a queue.
     void on_receiver_open(proton::receiver &receiver) override {
         std::string qname = receiver.target().address();
-        Receiver* r = new Receiver(receiver);
-        queue_manager_.add([=]{queue_manager_.findQueueReceiver(r, qname);});
+        Receiver* r = new Receiver(receiver, queue_manager_);
+        // Always allow anonymous relay: an empty target address opens the link without a queue lookup
+        if (qname.empty()) {
+            receiver.open(proton::receiver_options{}
+                .handler(*r));
+        } else {
+           queue_manager_.add([=]{queue_manager_.findQueueReceiver(r, qname);});
+        }
     }
 
     void on_session_close(proton::session &session) override {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org