You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2023/01/04 23:03:17 UTC
[qpid-proton] 03/03: PROTON-2666: Add support for message 'to' to C++ example broker
This is an automated email from the ASF dual-hosted git repository.
astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 06956b155658ff3829bfe920f386caf5c7941c46
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Wed Jan 4 11:29:01 2023 -0500
PROTON-2666: Add support for message 'to' to C++ example broker
This amounts to supporting the ANONYMOUS-RELAY capability
---
cpp/examples/broker.cpp | 55 +++++++++++++++++++++++++++++++++++++++++--------
1 file changed, 46 insertions(+), 9 deletions(-)
diff --git a/cpp/examples/broker.cpp b/cpp/examples/broker.cpp
index bcf8ab6ae..c09e0b0e0 100644
--- a/cpp/examples/broker.cpp
+++ b/cpp/examples/broker.cpp
@@ -225,20 +225,29 @@ void Sender::boundQueue(Queue* q, std::string qn) {
std::cout << "sending from " << queue_name_ << std::endl;
}
+class QueueManager;
+
class Receiver : public proton::messaging_handler {
friend class connection_handler;
proton::receiver receiver_;
proton::work_queue& work_queue_;
Queue* queue_;
+ QueueManager& queue_manager_;
std::deque<proton::message> messages_;
// A message is received.
- void on_message(proton::delivery &, proton::message &m) override {
- messages_.push_back(m);
-
+ void on_message(proton::delivery &d, proton::message &m) override {
+ // We allow anonymous relay behaviour always even if not requested
+ auto to_address = m.to();
if (queue_) {
+ messages_.push_back(m);
queueMsgs();
+ } else if (!to_address.empty()) {
+ queueMsgToNamedQueue(m, to_address);
+ } else {
+ // No bound link queue, no message 'to address' - reject message
+ d.reject();
}
}
@@ -251,9 +260,11 @@ class Receiver : public proton::messaging_handler {
}
}
+ void queueMsgToNamedQueue(proton::message& m, std::string address);
+
public:
- Receiver(proton::receiver r) :
- receiver_(r), work_queue_(r.work_queue()), queue_(0)
+ Receiver(proton::receiver r, QueueManager& qm) :
+ receiver_(r), work_queue_(r.work_queue()), queue_(0), queue_manager_(qm)
{}
bool add(proton::work f) {
@@ -297,7 +308,7 @@ public:
qn = os.str();
}
Queue* q = 0;
- queues::iterator i = queues_.find(qn);
+ auto i = queues_.find(qn);
if (i==queues_.end()) {
q = new Queue(container_, qn);
queues_[qn] = q;
@@ -307,6 +318,18 @@ public:
connection.add([=, &connection] {connection.boundQueue(q, qn);});
}
+ void queueMessage(proton::message m, std::string address) {
+ Queue* q = 0;
+ auto i = queues_.find(address);
+ if (i==queues_.end()) {
+ q = new Queue(container_, address);
+ queues_[address] = q;
+ } else {
+ q = i->second;
+ }
+ q->add([=] {q->queueMsg(m);});
+ }
+
void findQueueSender(Sender* s, std::string qn) {
findQueue(*s, qn);
}
@@ -316,6 +339,11 @@ public:
}
};
+void Receiver::queueMsgToNamedQueue(proton::message& m, std::string address) {
+ DOUT(std::cerr << "Receiver: " << this << " send msg to Queue: " << address << "\n";);
+ queue_manager_.add([=]{queue_manager_.queueMessage(m, address);});
+}
+
class connection_handler : public proton::messaging_handler {
QueueManager& queue_manager_;
senders senders_;
@@ -326,7 +354,10 @@ public:
{}
void on_connection_open(proton::connection& c) override {
- c.open(); // Accept the connection
+ // Don't check whether the peer desires ANONYMOUS-RELAY: offer it anyway.
+ // Accept the connection
+ c.open(proton::connection_options{}
+ .offered_capabilities({"ANONYMOUS-RELAY"}));
}
// A sender sends messages from a queue to a subscriber.
@@ -340,8 +371,14 @@ public:
// A receiver receives messages from a publisher to a queue.
void on_receiver_open(proton::receiver &receiver) override {
std::string qname = receiver.target().address();
- Receiver* r = new Receiver(receiver);
- queue_manager_.add([=]{queue_manager_.findQueueReceiver(r, qname);});
+ Receiver* r = new Receiver(receiver, queue_manager_);
+ // Allow anonymous relay always
+ if (qname.empty()) {
+ receiver.open(proton::receiver_options{}
+ .handler(*r));
+ } else {
+ queue_manager_.add([=]{queue_manager_.findQueueReceiver(r, qname);});
+ }
}
void on_session_close(proton::session &session) override {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org