You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2023/01/04 23:03:14 UTC

[qpid-proton] branch main updated (c76823d8b -> 06956b155)

This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


    from c76823d8b PROTON-2654: [cpp] Tests for newly added map constructors
     new b4bb7a26d PROTON-2664: Frame dumps truncated if there is a zero length string
     new a131afbc2 PROTON-2665: Convert C++ broker example to use lambdas
     new 06956b155 PROTON-2666: Add support for message 'to' to C++ example broker

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 c/src/core/fixed_string.h |  2 +-
 cpp/examples/broker.cpp   | 90 +++++++++++++++++++++++++++++++++++------------
 2 files changed, 69 insertions(+), 23 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-proton] 02/03: PROTON-2665: Convert C++ broker example to use lambdas

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit a131afbc251821cdeacbba074133e9cd6dd51282
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Wed Jan 4 11:15:12 2023 -0500

    PROTON-2665: Convert C++ broker example to use lambdas
    
    We can make the code more idiomatic by using C++11 features.
---
 cpp/examples/broker.cpp | 37 +++++++++++++++++++++++--------------
 1 file changed, 23 insertions(+), 14 deletions(-)

diff --git a/cpp/examples/broker.cpp b/cpp/examples/broker.cpp
index 30b602821..bcf8ab6ae 100644
--- a/cpp/examples/broker.cpp
+++ b/cpp/examples/broker.cpp
@@ -141,7 +141,9 @@ class Queue {
             DOUT(std::cerr << "(" << current_->second << ") ";);
             if (current_->second>0) {
                 DOUT(std::cerr << current_->first << " ";);
-                current_->first->add(make_work(&Sender::sendMsg, current_->first, messages_.front()));
+                auto msg = messages_.front();
+                auto sender = current_->first;
+                sender->add([=]{sender->sendMsg(msg);});
                 messages_.pop_front();
                 --current_->second;
                 ++current_;
@@ -180,14 +182,15 @@ public:
         // If we're about to erase the current subscription move on
         if (current_ != subscriptions_.end() && current_->first==s) ++current_;
         subscriptions_.erase(s);
-        s->add(make_work(&Sender::unsubscribed, s));
+        s->add([=]{s->unsubscribed();});
     }
 };
 
 // We have credit to send a message.
 void Sender::on_sendable(proton::sender &sender) {
     if (queue_) {
-        queue_->add(make_work(&Queue::flow, queue_, this, sender.credit()));
+        auto credit = sender.credit();
+        queue_->add([=]{queue_->flow(this, credit);});
     } else {
         pending_credit_ = sender.credit();
     }
@@ -195,7 +198,7 @@ void Sender::on_sendable(proton::sender &sender) {
 
 void Sender::on_sender_close(proton::sender &sender) {
     if (queue_) {
-        queue_->add(make_work(&Queue::unsubscribe, queue_, this));
+        queue_->add([=]{queue_->unsubscribe(this);});
     } else {
         // TODO: Is it possible to be closed before we get the queue allocated?
         // If so, we should have a way to mark the sender deleted, so we can delete
@@ -209,13 +212,16 @@ void Sender::boundQueue(Queue* q, std::string qn) {
     queue_ = q;
     queue_name_ = qn;
 
-    q->add(make_work(&Queue::subscribe, q, this));
     sender_.open(proton::sender_options()
         .source((proton::source_options().address(queue_name_)))
         .handler(*this));
-    if (pending_credit_>0) {
-        queue_->add(make_work(&Queue::flow, queue_, this, pending_credit_));
-    }
+    auto credit = pending_credit_;
+    q->add([=]{
+        q->subscribe(this);
+        if (credit>0) {
+            q->flow(this, credit);
+        }
+    });
     std::cout << "sending from " << queue_name_ << std::endl;
 }
 
@@ -239,7 +245,8 @@ class Receiver : public proton::messaging_handler {
     void queueMsgs() {
         DOUT(std::cerr << "Receiver: " << this << " queueing " << messages_.size() << " msgs to: " << queue_ << "\n";);
         while (!messages_.empty()) {
-            queue_->add(make_work(&Queue::queueMsg, queue_, messages_.front()));
+            auto msg = messages_.front();
+            queue_->add([=]{queue_->queueMsg(msg);});
             messages_.pop_front();
         }
     }
@@ -297,7 +304,7 @@ public:
         } else {
             q = i->second;
         }
-        connection.add(make_work(&T::boundQueue, &connection, q, qn));
+        connection.add([=, &connection] {connection.boundQueue(q, qn);});
     }
 
     void findQueueSender(Sender* s, std::string qn) {
@@ -327,14 +334,14 @@ public:
         std::string qn = sender.source().dynamic() ? "" : sender.source().address();
         Sender* s = new Sender(sender, senders_);
         senders_[sender] = s;
-        queue_manager_.add(make_work(&QueueManager::findQueueSender, &queue_manager_, s, qn));
+        queue_manager_.add([=]{queue_manager_.findQueueSender(s, qn);});
     }
 
     // A receiver receives messages from a publisher to a queue.
     void on_receiver_open(proton::receiver &receiver) override {
         std::string qname = receiver.target().address();
         Receiver* r = new Receiver(receiver);
-        queue_manager_.add(make_work(&QueueManager::findQueueReceiver, &queue_manager_, r, qname));
+        queue_manager_.add([=]{queue_manager_.findQueueReceiver(r, qname);});
     }
 
     void on_session_close(proton::session &session) override {
@@ -344,7 +351,8 @@ public:
             if (j == senders_.end()) continue;
             Sender* s = j->second;
             if (s->queue_) {
-                s->queue_->add(make_work(&Queue::unsubscribe, s->queue_, s));
+                auto q = s->queue_;
+                s->queue_->add([=]{q->unsubscribe(s);});
             }
             senders_.erase(j);
         }
@@ -362,7 +370,8 @@ public:
             if (j == senders_.end()) continue;
             Sender* s = j->second;
             if (s->queue_) {
-                s->queue_->add(make_work(&Queue::unsubscribe, s->queue_, s));
+                auto q = s->queue_;
+                s->queue_->add([=]{q->unsubscribe(s);});
             }
         }
         delete this;            // All done.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-proton] 01/03: PROTON-2664: Frame dumps truncated if there is a zero length string

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit b4bb7a26d221ed235178f83905c73c65f24080e1
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Wed Jan 4 17:33:48 2023 -0500

    PROTON-2664: Frame dumps truncated if there is a zero length string
    
    We were treating zero-length strings as if an overflow had occurred,
    which left the output string terminated in the middle.
---
 c/src/core/fixed_string.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/c/src/core/fixed_string.h b/c/src/core/fixed_string.h
index 6df7be761..e60c3102a 100644
--- a/c/src/core/fixed_string.h
+++ b/c/src/core/fixed_string.h
@@ -82,7 +82,7 @@ static inline void pn_fixed_string_quote(pn_fixed_string_t *str, const char *dat
   char *out = &str->bytes[str->position];
   ssize_t out_size = pn_quote_data(out, bytes_left, data, size);
   // The only error (ie value less than 0) that can come from pn_quote_data is PN_OVERFLOW
-  if ( out_size>0 ) {
+  if ( out_size>=0 ) {
     str->position += out_size;
   } else {
     str->position = str->size;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-proton] 03/03: PROTON-2666: Add support for message 'to' to C++ example broker

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 06956b155658ff3829bfe920f386caf5c7941c46
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Wed Jan 4 11:29:01 2023 -0500

    PROTON-2666: Add support for message 'to' to C++ example broker
    
    This amounts to supporting the ANONYMOUS-RELAY capability.
---
 cpp/examples/broker.cpp | 55 +++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 46 insertions(+), 9 deletions(-)

diff --git a/cpp/examples/broker.cpp b/cpp/examples/broker.cpp
index bcf8ab6ae..c09e0b0e0 100644
--- a/cpp/examples/broker.cpp
+++ b/cpp/examples/broker.cpp
@@ -225,20 +225,29 @@ void Sender::boundQueue(Queue* q, std::string qn) {
     std::cout << "sending from " << queue_name_ << std::endl;
 }
 
+class QueueManager;
+
 class Receiver : public proton::messaging_handler {
     friend class connection_handler;
 
     proton::receiver receiver_;
     proton::work_queue& work_queue_;
     Queue* queue_;
+    QueueManager& queue_manager_;
     std::deque<proton::message> messages_;
 
     // A message is received.
-    void on_message(proton::delivery &, proton::message &m) override {
-        messages_.push_back(m);
-
+    void on_message(proton::delivery &d, proton::message &m) override {
+        // We allow anonymous relay behaviour always even if not requested
+        auto to_address = m.to();
         if (queue_) {
+            messages_.push_back(m);
             queueMsgs();
+        } else if (!to_address.empty()) {
+            queueMsgToNamedQueue(m, to_address);
+        } else {
+            // No bound link queue, no message 'to address' - reject message
+            d.reject();
         }
     }
 
@@ -251,9 +260,11 @@ class Receiver : public proton::messaging_handler {
         }
     }
 
+    void queueMsgToNamedQueue(proton::message& m, std::string address);
+
 public:
-    Receiver(proton::receiver r) :
-        receiver_(r), work_queue_(r.work_queue()), queue_(0)
+    Receiver(proton::receiver r, QueueManager& qm) :
+        receiver_(r), work_queue_(r.work_queue()), queue_(0), queue_manager_(qm)
     {}
 
     bool add(proton::work f) {
@@ -297,7 +308,7 @@ public:
             qn = os.str();
         }
         Queue* q = 0;
-        queues::iterator i = queues_.find(qn);
+        auto i = queues_.find(qn);
         if (i==queues_.end()) {
             q = new Queue(container_, qn);
             queues_[qn] = q;
@@ -307,6 +318,18 @@ public:
         connection.add([=, &connection] {connection.boundQueue(q, qn);});
     }
 
+    void queueMessage(proton::message m, std::string address) {
+        Queue* q = 0;
+        auto i = queues_.find(address);
+        if (i==queues_.end()) {
+            q = new Queue(container_, address);
+            queues_[address] = q;
+        } else {
+            q = i->second;
+        }
+        q->add([=] {q->queueMsg(m);});
+    }
+
     void findQueueSender(Sender* s, std::string qn) {
         findQueue(*s, qn);
     }
@@ -316,6 +339,11 @@ public:
     }
 };
 
+void Receiver::queueMsgToNamedQueue(proton::message& m, std::string address) {
+    DOUT(std::cerr << "Receiver: " << this << " send msg to Queue: " << address << "\n";);
+    queue_manager_.add([=]{queue_manager_.queueMessage(m, address);});
+}
+
 class connection_handler : public proton::messaging_handler {
     QueueManager& queue_manager_;
     senders senders_;
@@ -326,7 +354,10 @@ public:
     {}
 
     void on_connection_open(proton::connection& c) override {
-        c.open();            // Accept the connection
+        // Don't check whether the peer desires ANONYMOUS-RELAY: offer it anyway.
+        // Accept the connection
+        c.open(proton::connection_options{}
+            .offered_capabilities({"ANONYMOUS-RELAY"}));
     }
 
     // A sender sends messages from a queue to a subscriber.
@@ -340,8 +371,14 @@ public:
     // A receiver receives messages from a publisher to a queue.
     void on_receiver_open(proton::receiver &receiver) override {
         std::string qname = receiver.target().address();
-        Receiver* r = new Receiver(receiver);
-        queue_manager_.add([=]{queue_manager_.findQueueReceiver(r, qname);});
+        Receiver* r = new Receiver(receiver, queue_manager_);
+        // Allow anonymous relay always
+        if (qname.empty()) {
+            receiver.open(proton::receiver_options{}
+                .handler(*r));
+        } else {
+           queue_manager_.add([=]{queue_manager_.findQueueReceiver(r, qname);});
+        }
     }
 
     void on_session_close(proton::session &session) override {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org