You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2019/04/11 17:07:18 UTC

[qpid-proton] 12/15: PROTON-2006: [C++] Fixed up service bus example to work - Connection options needed fixing -- You can't put user/password in the connection URL anymore -- Need to turn SSL on and ensure SASL mech is PLAIN - Misuse of scheduling stopped the application from exiting when finished

This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch 0.27.x
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 37e6a68de7dce46dcec13ca00415bb169934b747
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Tue Mar 26 16:26:26 2019 -0400

    PROTON-2006: [C++] Fixed up service bus example to work
    - Connection options needed fixing
    -- You can't put user/password in the connection URL anymore
    -- Need to turn SSL on and ensure the SASL mechanism is PLAIN
    - Misuse of scheduling stopped the application from exiting when finished
    
    (cherry picked from commit eab1fef19b4e58a8b79408f29e10fe36b07a3b83)
---
 cpp/examples/service_bus.cpp | 39 +++++++++++++++++++++++++--------------
 1 file changed, 25 insertions(+), 14 deletions(-)

diff --git a/cpp/examples/service_bus.cpp b/cpp/examples/service_bus.cpp
index c99bca6..7f3052a 100644
--- a/cpp/examples/service_bus.cpp
+++ b/cpp/examples/service_bus.cpp
@@ -89,6 +89,7 @@ Done. No more messages.
 #include <proton/message.hpp>
 #include <proton/messaging_handler.hpp>
 #include <proton/receiver_options.hpp>
+#include <proton/ssl.hpp>
 #include <proton/sender.hpp>
 #include <proton/sender_options.hpp>
 #include <proton/source_options.hpp>
@@ -104,6 +105,7 @@ using proton::source_options;
 using proton::connection_options;
 using proton::sender_options;
 using proton::receiver_options;
+using proton::ssl_client_options;
 
 void do_next_sequence();
 
@@ -119,6 +121,7 @@ class session_receiver : public proton::messaging_handler {
   private:
     const std::string &connection_url;
     const std::string &entity;
+    connection_options coptions;
     proton::value session_identifier; // AMQP null type by default, matches any Service Bus sequence identifier
     int message_count;
     bool closed;
@@ -128,8 +131,10 @@ class session_receiver : public proton::messaging_handler {
     proton::receiver receiver;
 
   public:
-    session_receiver(const std::string &c, const std::string &e,
-                     const char *sid) : connection_url(c), entity(e), message_count(0), closed(false), read_timeout(5000), last_read(0), container(0) {
+    session_receiver(const std::string &c, const std::string &e, const connection_options &co,
+                     const char *sid) :
+        connection_url(c), entity(e), coptions(co),
+        message_count(0), closed(false), read_timeout(5000), last_read(0), container(0) {
         if (sid)
             session_identifier = std::string(sid);
         // session_identifier is now either empty/null or an AMQP string type.
@@ -143,7 +148,7 @@ class session_receiver : public proton::messaging_handler {
     void run (proton::container &c) {
         message_count = 0;
         closed = false;
-        c.connect(connection_url, connection_options().handler(*this));
+        c.connect(connection_url, coptions.handler(*this));
         container = &c;
     }
 
@@ -159,7 +164,7 @@ class session_receiver : public proton::messaging_handler {
         // identifier if none was specified).
         last_read = proton::timestamp::now();
         // Call this->process_timeout after read_timeout.
-        container->schedule(read_timeout, [this]() { this->process_timeout(); });
+        connection.work_queue().schedule(read_timeout, [this]() { this->process_timeout(); });
     }
 
     void on_receiver_open(proton::receiver &r) OVERRIDE {
@@ -189,7 +194,7 @@ class session_receiver : public proton::messaging_handler {
                 std::cout << "Done. No more messages." << std::endl;
         } else {
             proton::duration next = deadline - now;
-            container->schedule(next, [this]() { this->process_timeout(); });
+            receiver.work_queue().schedule(next, [this]() { this->process_timeout(); });
         }
     }
 };
@@ -200,16 +205,18 @@ class session_sender : public proton::messaging_handler {
   private:
     const std::string &connection_url;
     const std::string &entity;
+    connection_options coptions;
     int msg_count;
     int total;
     int accepts;
 
   public:
-    session_sender(const std::string &c, const std::string &e) : connection_url(c), entity(e),
-                                                                 msg_count(0), total(7), accepts(0) {}
+    session_sender(const std::string &c, const std::string &e, const connection_options &co) :
+        connection_url(c), entity(e), coptions(co),
+        msg_count(0), total(7), accepts(0) {}
 
     void run(proton::container &c) {
-        c.open_sender(connection_url + "/" + entity, sender_options(), connection_options().handler(*this));
+        c.open_sender(connection_url + "/" + entity, sender_options(), coptions.handler(*this));
     }
 
     void send_remaining_messages(proton::sender &s) {
@@ -261,9 +268,9 @@ class sequence : public proton::messaging_handler {
   public:
     static sequence *the_sequence;
 
-    sequence (const std::string &c, const std::string &e) :
+    sequence (const std::string &c, const std::string &e, const connection_options &co) :
         container(0), sequence_no(0),
-        snd(c, e), rcv_red(c, e, "red"), rcv_green(c, e, "green"), rcv_null(c, e, NULL) {
+        snd(c, e, co), rcv_red(c, e, co, "red"), rcv_green(c, e, co, "green"), rcv_null(c, e, co, NULL) {
         the_sequence = this;
     }
 
@@ -291,7 +298,6 @@ void do_next_sequence() { sequence::the_sequence->next_sequence(); }
 
 int main(int argc, char **argv) {
     std::string sb_namespace; // i.e. "foo.servicebus.windows.net"
-    // Make sure the next two are urlencoded for Proton
     std::string sb_key_name;  // shared access key name for entity (AKA "Policy Name")
     std::string sb_key;       // shared access key
     std::string sb_entity;    // AKA the service bus queue.  Must enable
@@ -309,9 +315,14 @@ int main(int argc, char **argv) {
         check_arg(sb_key_name, "policy");
         check_arg(sb_key, "key");
         check_arg(sb_entity, "entity");
-        std::string connection_string("amqps://" + sb_key_name + ":" + sb_key + "@" + sb_namespace);
-
-        sequence seq(connection_string, sb_entity);
+        std::string connection_string("amqps://" + sb_namespace);
+
+        sequence seq(connection_string, sb_entity,
+                     connection_options()
+                     .user(sb_key_name)
+                     .password(sb_key)
+                     .ssl_client_options(ssl_client_options())
+                     .sasl_allowed_mechs("PLAIN"));
         proton::container(seq).run();
         return 0;
     } catch (const std::exception& e) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org