You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2023/01/04 23:03:16 UTC
[qpid-proton] 02/03: PROTON-2665: Convert C++ broker example to use lambdas
This is an automated email from the ASF dual-hosted git repository.
astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit a131afbc251821cdeacbba074133e9cd6dd51282
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Wed Jan 4 11:15:12 2023 -0500
PROTON-2665: Convert C++ broker example to use lambdas
We can make the code more idiomatic by using C++11 features.
---
cpp/examples/broker.cpp | 37 +++++++++++++++++++++++--------------
1 file changed, 23 insertions(+), 14 deletions(-)
diff --git a/cpp/examples/broker.cpp b/cpp/examples/broker.cpp
index 30b602821..bcf8ab6ae 100644
--- a/cpp/examples/broker.cpp
+++ b/cpp/examples/broker.cpp
@@ -141,7 +141,9 @@ class Queue {
DOUT(std::cerr << "(" << current_->second << ") ";);
if (current_->second>0) {
DOUT(std::cerr << current_->first << " ";);
- current_->first->add(make_work(&Sender::sendMsg, current_->first, messages_.front()));
+ auto msg = messages_.front();
+ auto sender = current_->first;
+ sender->add([=]{sender->sendMsg(msg);});
messages_.pop_front();
--current_->second;
++current_;
@@ -180,14 +182,15 @@ public:
// If we're about to erase the current subscription move on
if (current_ != subscriptions_.end() && current_->first==s) ++current_;
subscriptions_.erase(s);
- s->add(make_work(&Sender::unsubscribed, s));
+ s->add([=]{s->unsubscribed();});
}
};
// We have credit to send a message.
void Sender::on_sendable(proton::sender &sender) {
if (queue_) {
- queue_->add(make_work(&Queue::flow, queue_, this, sender.credit()));
+ auto credit = sender.credit();
+ queue_->add([=]{queue_->flow(this, credit);});
} else {
pending_credit_ = sender.credit();
}
@@ -195,7 +198,7 @@ void Sender::on_sendable(proton::sender &sender) {
void Sender::on_sender_close(proton::sender &sender) {
if (queue_) {
- queue_->add(make_work(&Queue::unsubscribe, queue_, this));
+ queue_->add([=]{queue_->unsubscribe(this);});
} else {
// TODO: Is it possible to be closed before we get the queue allocated?
// If so, we should have a way to mark the sender deleted, so we can delete
@@ -209,13 +212,16 @@ void Sender::boundQueue(Queue* q, std::string qn) {
queue_ = q;
queue_name_ = qn;
- q->add(make_work(&Queue::subscribe, q, this));
sender_.open(proton::sender_options()
.source((proton::source_options().address(queue_name_)))
.handler(*this));
- if (pending_credit_>0) {
- queue_->add(make_work(&Queue::flow, queue_, this, pending_credit_));
- }
+ auto credit = pending_credit_;
+ q->add([=]{
+ q->subscribe(this);
+ if (credit>0) {
+ q->flow(this, credit);
+ }
+ });
std::cout << "sending from " << queue_name_ << std::endl;
}
@@ -239,7 +245,8 @@ class Receiver : public proton::messaging_handler {
void queueMsgs() {
DOUT(std::cerr << "Receiver: " << this << " queueing " << messages_.size() << " msgs to: " << queue_ << "\n";);
while (!messages_.empty()) {
- queue_->add(make_work(&Queue::queueMsg, queue_, messages_.front()));
+ auto msg = messages_.front();
+ queue_->add([=]{queue_->queueMsg(msg);});
messages_.pop_front();
}
}
@@ -297,7 +304,7 @@ public:
} else {
q = i->second;
}
- connection.add(make_work(&T::boundQueue, &connection, q, qn));
+ connection.add([=, &connection] {connection.boundQueue(q, qn);});
}
void findQueueSender(Sender* s, std::string qn) {
@@ -327,14 +334,14 @@ public:
std::string qn = sender.source().dynamic() ? "" : sender.source().address();
Sender* s = new Sender(sender, senders_);
senders_[sender] = s;
- queue_manager_.add(make_work(&QueueManager::findQueueSender, &queue_manager_, s, qn));
+ queue_manager_.add([=]{queue_manager_.findQueueSender(s, qn);});
}
// A receiver receives messages from a publisher to a queue.
void on_receiver_open(proton::receiver &receiver) override {
std::string qname = receiver.target().address();
Receiver* r = new Receiver(receiver);
- queue_manager_.add(make_work(&QueueManager::findQueueReceiver, &queue_manager_, r, qname));
+ queue_manager_.add([=]{queue_manager_.findQueueReceiver(r, qname);});
}
void on_session_close(proton::session &session) override {
@@ -344,7 +351,8 @@ public:
if (j == senders_.end()) continue;
Sender* s = j->second;
if (s->queue_) {
- s->queue_->add(make_work(&Queue::unsubscribe, s->queue_, s));
+ auto q = s->queue_;
+ s->queue_->add([=]{q->unsubscribe(s);});
}
senders_.erase(j);
}
@@ -362,7 +370,8 @@ public:
if (j == senders_.end()) continue;
Sender* s = j->second;
if (s->queue_) {
- s->queue_->add(make_work(&Queue::unsubscribe, s->queue_, s));
+ auto q = s->queue_;
+ s->queue_->add([=]{q->unsubscribe(s);});
}
}
delete this; // All done.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org