You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2017/05/12 22:35:12 UTC

qpid-proton git commit: PROTON-1479: Fix scheduled_send examples to send and close in a safe context

Repository: qpid-proton
Updated Branches:
  refs/heads/master 2b5cfc817 -> a22b17896


PROTON-1479: Fix scheduled_send examples to send and close in a safe context


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a22b1789
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a22b1789
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a22b1789

Branch: refs/heads/master
Commit: a22b17896ab0bd0a065b1718a5bbbedc1eeca229
Parents: 2b5cfc8
Author: Andrew Stitcher <as...@apache.org>
Authored: Wed May 3 23:42:56 2017 -0400
Committer: Andrew Stitcher <as...@apache.org>
Committed: Fri May 12 18:34:42 2017 -0400

----------------------------------------------------------------------
 examples/cpp/scheduled_send.cpp    | 11 ++++--
 examples/cpp/scheduled_send_03.cpp | 62 +++++++++++++++++++++++----------
 2 files changed, 52 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a22b1789/examples/cpp/scheduled_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/scheduled_send.cpp b/examples/cpp/scheduled_send.cpp
index ef6cd27..de04c3b 100644
--- a/examples/cpp/scheduled_send.cpp
+++ b/examples/cpp/scheduled_send.cpp
@@ -23,6 +23,7 @@
 
 #include <proton/container.hpp>
 #include <proton/default_container.hpp>
+#include <proton/event_loop.hpp>
 #include <proton/message.hpp>
 #include <proton/messaging_handler.hpp>
 #include <proton/sender.hpp>
@@ -39,6 +40,7 @@ class scheduled_sender : public proton::messaging_handler {
     std::string url;
     proton::sender sender;
     proton::duration interval, timeout;
+    proton::event_loop* event_loop;
     bool ready, canceled;
 
   public:
@@ -51,12 +53,15 @@ class scheduled_sender : public proton::messaging_handler {
         canceled(false)         // Canceled.
     {}
 
+    // The awkward-looking double lambda is necessary because the scheduled lambdas run in the container context,
+    // so each one must inject a second lambda to make send and close happen in the connection context.
     void on_container_start(proton::container &c) OVERRIDE {
         sender = c.open_sender(url);
+        event_loop = &proton::make_thread_safe(sender).get()->event_loop();
         // Call this->cancel after timeout.
-        c.schedule(timeout, [this]() { this->cancel(); });
+        c.schedule(timeout, [this]() { this->event_loop->inject( [this]() { this->cancel(); }); });
          // Start regular ticks every interval.
-        c.schedule(interval, [this]() { this->tick(); });
+        c.schedule(interval, [this]() { this->event_loop->inject( [this]() { this->tick(); }); });
     }
 
     void cancel() {
@@ -67,7 +72,7 @@ class scheduled_sender : public proton::messaging_handler {
     void tick() {
         // Schedule the next tick unless we have been cancelled.
         if (!canceled)
-            sender.container().schedule(interval, [this]() { this->tick(); });
+            sender.container().schedule(interval, [this]() { this->event_loop->inject( [this]() { this->tick(); }); });
         if (sender.credit() > 0) // Only send if we have credit
             send();
         else

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a22b1789/examples/cpp/scheduled_send_03.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/scheduled_send_03.cpp b/examples/cpp/scheduled_send_03.cpp
index 92e5767..d106d29 100644
--- a/examples/cpp/scheduled_send_03.cpp
+++ b/examples/cpp/scheduled_send_03.cpp
@@ -25,6 +25,7 @@
 #include <proton/connection.hpp>
 #include <proton/default_container.hpp>
 #include <proton/duration.hpp>
+#include <proton/event_loop.hpp>
 #include <proton/function.hpp>
 #include <proton/message.hpp>
 #include <proton/messaging_handler.hpp>
@@ -41,24 +42,42 @@
 class scheduled_sender : public proton::messaging_handler {
   private:
     std::string url;
-    proton::sender sender;
     proton::duration interval, timeout;
+    proton::event_loop *event_loop;
     bool ready, canceled;
 
+    struct cancel_fn : public proton::void_function0 {
+        scheduled_sender* parent;
+        proton::sender sender;
+        cancel_fn(): parent(0) {}
+        cancel_fn(scheduled_sender& ss, proton::sender& s) : parent(&ss), sender(s) {}
+        void operator()() { if (parent) parent->cancel(sender); }
+    };
+
     struct tick_fn : public proton::void_function0 {
+        scheduled_sender* parent;
+        proton::sender sender;
+        tick_fn(): parent(0) {}
+        tick_fn(scheduled_sender& ss, proton::sender& s) : parent(&ss), sender(s) {}
+        void operator()() { if (parent) parent->tick(sender); }
+    };
+
+    struct defer_cancel_fn : public proton::void_function0 {
         scheduled_sender& parent;
-        tick_fn(scheduled_sender& ss) : parent(ss) {}
-        void operator()() { parent.tick(); }
+        defer_cancel_fn(scheduled_sender& ss) : parent(ss) {}
+        void operator()() { parent.event_loop->inject(parent.do_cancel); }
     };
 
-    struct cancel_fn : public proton::void_function0 {
+    struct defer_tick_fn : public proton::void_function0 {
         scheduled_sender& parent;
-        cancel_fn(scheduled_sender& ss) : parent(ss) {}
-        void operator()() { parent.cancel(); }
+        defer_tick_fn(scheduled_sender& ss) : parent(ss) {}
+        void operator()() { parent.event_loop->inject(parent.do_tick); }
     };
 
     tick_fn do_tick;
     cancel_fn do_cancel;
+    defer_tick_fn defer_tick;
+    defer_cancel_fn defer_cancel;
 
   public:
 
@@ -68,37 +87,44 @@ class scheduled_sender : public proton::messaging_handler {
         timeout(int(t*proton::duration::SECOND.milliseconds())), // Cancel after timeout.
         ready(true),            // Ready to send.
         canceled(false),         // Canceled.
-        do_tick(*this),
-        do_cancel(*this)
+        defer_tick(*this),
+        defer_cancel(*this)
     {}
 
     void on_container_start(proton::container &c) OVERRIDE {
-        sender = c.open_sender(url);
-        c.schedule(timeout, do_cancel); // Call this->cancel after timeout.
-        c.schedule(interval, do_tick); // Start regular ticks every interval.
+        c.open_sender(url);
+    }
+
+    void on_sender_open(proton::sender & s) OVERRIDE {
+        event_loop = &proton::make_thread_safe(s).get()->event_loop();
+
+        do_cancel = cancel_fn(*this, s);
+        do_tick = tick_fn(*this, s);
+        s.container().schedule(timeout, defer_cancel); // Call this->cancel after timeout.
+        s.container().schedule(interval, defer_tick); // Start regular ticks every interval.
     }
 
-    void cancel() {
+    void cancel(proton::sender& sender) {
         canceled = true;
         sender.connection().close();
     }
 
-    void tick() {
+    void tick(proton::sender& sender) {
         if (!canceled) {
-            sender.container().schedule(interval, do_tick); // Next tick
+            sender.container().schedule(interval, defer_tick); // Next tick
             if (sender.credit() > 0) // Only send if we have credit
-                send();
+                send(sender);
             else
                 ready = true; // Set the ready flag, send as soon as we get credit.
         }
     }
 
-    void on_sendable(proton::sender &) OVERRIDE {
+    void on_sendable(proton::sender &sender) OVERRIDE {
         if (ready)              // We have been ticked since the last send.
-            send();
+            send(sender);
     }
 
-    void send() {
+    void send(proton::sender& sender) {
         std::cout << "send" << std::endl;
         sender.send(proton::message("ping"));
         ready = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org