You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2017/05/12 22:35:12 UTC
qpid-proton git commit: PROTON-1479: Fix scheduled_send examples to send and close in a safe context
Repository: qpid-proton
Updated Branches:
refs/heads/master 2b5cfc817 -> a22b17896
PROTON-1479: Fix scheduled_send examples to send and close in a safe context
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a22b1789
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a22b1789
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a22b1789
Branch: refs/heads/master
Commit: a22b17896ab0bd0a065b1718a5bbbedc1eeca229
Parents: 2b5cfc8
Author: Andrew Stitcher <as...@apache.org>
Authored: Wed May 3 23:42:56 2017 -0400
Committer: Andrew Stitcher <as...@apache.org>
Committed: Fri May 12 18:34:42 2017 -0400
----------------------------------------------------------------------
examples/cpp/scheduled_send.cpp | 11 ++++--
examples/cpp/scheduled_send_03.cpp | 62 +++++++++++++++++++++++----------
2 files changed, 52 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a22b1789/examples/cpp/scheduled_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/scheduled_send.cpp b/examples/cpp/scheduled_send.cpp
index ef6cd27..de04c3b 100644
--- a/examples/cpp/scheduled_send.cpp
+++ b/examples/cpp/scheduled_send.cpp
@@ -23,6 +23,7 @@
#include <proton/container.hpp>
#include <proton/default_container.hpp>
+#include <proton/event_loop.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/sender.hpp>
@@ -39,6 +40,7 @@ class scheduled_sender : public proton::messaging_handler {
std::string url;
proton::sender sender;
proton::duration interval, timeout;
+ proton::event_loop* event_loop;
bool ready, canceled;
public:
@@ -51,12 +53,15 @@ class scheduled_sender : public proton::messaging_handler {
canceled(false) // Canceled.
{}
+ // The awkward-looking double lambda is necessary because the scheduled lambdas run in the container context
+ // and must in turn inject lambdas so that send and close happen in the connection context.
void on_container_start(proton::container &c) OVERRIDE {
sender = c.open_sender(url);
+ event_loop = &proton::make_thread_safe(sender).get()->event_loop();
// Call this->cancel after timeout.
- c.schedule(timeout, [this]() { this->cancel(); });
+ c.schedule(timeout, [this]() { this->event_loop->inject( [this]() { this->cancel(); }); });
// Start regular ticks every interval.
- c.schedule(interval, [this]() { this->tick(); });
+ c.schedule(interval, [this]() { this->event_loop->inject( [this]() { this->tick(); }); });
}
void cancel() {
@@ -67,7 +72,7 @@ class scheduled_sender : public proton::messaging_handler {
void tick() {
// Schedule the next tick unless we have been cancelled.
if (!canceled)
- sender.container().schedule(interval, [this]() { this->tick(); });
+ sender.container().schedule(interval, [this]() { this->event_loop->inject( [this]() { this->tick(); }); });
if (sender.credit() > 0) // Only send if we have credit
send();
else
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a22b1789/examples/cpp/scheduled_send_03.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/scheduled_send_03.cpp b/examples/cpp/scheduled_send_03.cpp
index 92e5767..d106d29 100644
--- a/examples/cpp/scheduled_send_03.cpp
+++ b/examples/cpp/scheduled_send_03.cpp
@@ -25,6 +25,7 @@
#include <proton/connection.hpp>
#include <proton/default_container.hpp>
#include <proton/duration.hpp>
+#include <proton/event_loop.hpp>
#include <proton/function.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
@@ -41,24 +42,42 @@
class scheduled_sender : public proton::messaging_handler {
private:
std::string url;
- proton::sender sender;
proton::duration interval, timeout;
+ proton::event_loop *event_loop;
bool ready, canceled;
+ struct cancel_fn : public proton::void_function0 {
+ scheduled_sender* parent;
+ proton::sender sender;
+ cancel_fn(): parent(0) {}
+ cancel_fn(scheduled_sender& ss, proton::sender& s) : parent(&ss), sender(s) {}
+ void operator()() { if (parent) parent->cancel(sender); }
+ };
+
struct tick_fn : public proton::void_function0 {
+ scheduled_sender* parent;
+ proton::sender sender;
+ tick_fn(): parent(0) {}
+ tick_fn(scheduled_sender& ss, proton::sender& s) : parent(&ss), sender(s) {}
+ void operator()() { if (parent) parent->tick(sender); }
+ };
+
+ struct defer_cancel_fn : public proton::void_function0 {
scheduled_sender& parent;
- tick_fn(scheduled_sender& ss) : parent(ss) {}
- void operator()() { parent.tick(); }
+ defer_cancel_fn(scheduled_sender& ss) : parent(ss) {}
+ void operator()() { parent.event_loop->inject(parent.do_cancel); }
};
- struct cancel_fn : public proton::void_function0 {
+ struct defer_tick_fn : public proton::void_function0 {
scheduled_sender& parent;
- cancel_fn(scheduled_sender& ss) : parent(ss) {}
- void operator()() { parent.cancel(); }
+ defer_tick_fn(scheduled_sender& ss) : parent(ss) {}
+ void operator()() { parent.event_loop->inject(parent.do_tick); }
};
tick_fn do_tick;
cancel_fn do_cancel;
+ defer_tick_fn defer_tick;
+ defer_cancel_fn defer_cancel;
public:
@@ -68,37 +87,44 @@ class scheduled_sender : public proton::messaging_handler {
timeout(int(t*proton::duration::SECOND.milliseconds())), // Cancel after timeout.
ready(true), // Ready to send.
canceled(false), // Canceled.
- do_tick(*this),
- do_cancel(*this)
+ defer_tick(*this),
+ defer_cancel(*this)
{}
void on_container_start(proton::container &c) OVERRIDE {
- sender = c.open_sender(url);
- c.schedule(timeout, do_cancel); // Call this->cancel after timeout.
- c.schedule(interval, do_tick); // Start regular ticks every interval.
+ c.open_sender(url);
+ }
+
+ void on_sender_open(proton::sender & s) OVERRIDE {
+ event_loop = &proton::make_thread_safe(s).get()->event_loop();
+
+ do_cancel = cancel_fn(*this, s);
+ do_tick = tick_fn(*this, s);
+ s.container().schedule(timeout, defer_cancel); // Call this->cancel after timeout.
+ s.container().schedule(interval, defer_tick); // Start regular ticks every interval.
}
- void cancel() {
+ void cancel(proton::sender& sender) {
canceled = true;
sender.connection().close();
}
- void tick() {
+ void tick(proton::sender& sender) {
if (!canceled) {
- sender.container().schedule(interval, do_tick); // Next tick
+ sender.container().schedule(interval, defer_tick); // Next tick
if (sender.credit() > 0) // Only send if we have credit
- send();
+ send(sender);
else
ready = true; // Set the ready flag, send as soon as we get credit.
}
}
- void on_sendable(proton::sender &) OVERRIDE {
+ void on_sendable(proton::sender &sender) OVERRIDE {
if (ready) // We have been ticked since the last send.
- send();
+ send(sender);
}
- void send() {
+ void send(proton::sender& sender) {
std::cout << "send" << std::endl;
sender.send(proton::message("ping"));
ready = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org