You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2017/07/21 17:02:12 UTC
[16/20] qpid-proton git commit: PROTON-1482: [C++ binding]
Implemented scheduling of delayed work on a work_queue
PROTON-1482: [C++ binding] Implemented scheduling of delayed work on a work_queue
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/88c2d7d4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/88c2d7d4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/88c2d7d4
Branch: refs/heads/master
Commit: 88c2d7d481a6da96f85a6781ae27b488cfacd801
Parents: ca446ea
Author: Andrew Stitcher <as...@apache.org>
Authored: Fri May 19 00:59:55 2017 -0400
Committer: Andrew Stitcher <as...@apache.org>
Committed: Fri Jul 21 12:50:06 2017 -0400
----------------------------------------------------------------------
proton-c/bindings/cpp/include/proton/fwd.hpp | 1 +
.../bindings/cpp/include/proton/work_queue.hpp | 8 ++++++++
.../src/include/proactor_work_queue_impl.hpp | 3 +--
.../cpp/src/proactor_container_impl.cpp | 21 +++++++++++++-------
proton-c/bindings/cpp/src/work_queue.cpp | 8 ++++++++
5 files changed, 32 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88c2d7d4/proton-c/bindings/cpp/include/proton/fwd.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/fwd.hpp b/proton-c/bindings/cpp/include/proton/fwd.hpp
index b839f42..5ade5fd 100644
--- a/proton-c/bindings/cpp/include/proton/fwd.hpp
+++ b/proton-c/bindings/cpp/include/proton/fwd.hpp
@@ -29,6 +29,7 @@ class connection;
class connection_options;
class container;
class delivery;
+class duration;
class error_condition;
class event;
class message;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88c2d7d4/proton-c/bindings/cpp/include/proton/work_queue.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/work_queue.hpp b/proton-c/bindings/cpp/include/proton/work_queue.hpp
index fe739f5..bef041c 100644
--- a/proton-c/bindings/cpp/include/proton/work_queue.hpp
+++ b/proton-c/bindings/cpp/include/proton/work_queue.hpp
@@ -94,6 +94,14 @@ class PN_CPP_CLASS_EXTERN work_queue {
/// or f() cannot be injected for any other reason.
PN_CPP_EXTERN bool add(work f);
+ /// Add work to the work queue after a duration: f() will be called once the duration
+ /// has elapsed, serialised with other work in the queue, possibly in another thread.
+ ///
+ /// The scheduled execution is "best effort": it is possible that after the duration has
+ /// elapsed the work cannot be injected into the serialised context, and there will be no
+ /// indication of this.
+ PN_CPP_EXTERN void schedule(duration, work);
+
private:
PN_CPP_EXTERN static work_queue& get(pn_connection_t*);
PN_CPP_EXTERN static work_queue& get(pn_session_t*);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88c2d7d4/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp
index ac0f803..1c94254 100644
--- a/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp
+++ b/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp
@@ -23,8 +23,6 @@
*/
#include "proton/fwd.hpp"
-#include "proton/internal/config.hpp"
-#include "proton/work_queue.hpp"
namespace proton {
@@ -32,6 +30,7 @@ class work_queue::impl {
public:
virtual ~impl() {};
virtual bool add(work f) = 0;
+ virtual void schedule(duration, work) = 0;
virtual void run_all_jobs() = 0;
virtual void finished() = 0;
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88c2d7d4/proton-c/bindings/cpp/src/proactor_container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
index 0135c08..62115fd 100644
--- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp
+++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
@@ -45,17 +45,26 @@ namespace proton {
class container::impl::common_work_queue : public work_queue::impl {
public:
- common_work_queue(): finished_(false) {}
+ common_work_queue(container::impl& c): container_(c), finished_(false) {}
typedef std::vector<work> jobs;
void run_all_jobs();
void finished() { finished_ = true; }
+ void schedule(duration, work);
+ container::impl& container_;
jobs jobs_;
bool finished_;
};
+void container::impl::common_work_queue::schedule(duration d, work f) {
+ // Note this is an unbounded work queue.
+ // A resource-safe implementation should be bounded.
+ if (finished_) return;
+ container_.schedule(d, make_work(&work_queue::impl::add, (work_queue::impl*)this, f));
+}
+
void container::impl::common_work_queue::run_all_jobs() {
jobs j;
// Lock this operation for mt
@@ -69,7 +78,7 @@ void container::impl::common_work_queue::run_all_jobs() {
class container::impl::connection_work_queue : public common_work_queue {
public:
- connection_work_queue(pn_connection_t* c): connection_(c) {}
+ connection_work_queue(container::impl& ct, pn_connection_t* c): common_work_queue(ct), connection_(c) {}
bool add(work f);
@@ -87,12 +96,10 @@ bool container::impl::connection_work_queue::add(work f) {
class container::impl::container_work_queue : public common_work_queue {
public:
- container_work_queue(container::impl& c): container_(c) {}
+ container_work_queue(container::impl& c): common_work_queue(c) {}
~container_work_queue() { container_.remove_work_queue(this); }
bool add(work f);
-
- container::impl& container_;
};
bool container::impl::container_work_queue::add(work f) {
@@ -147,7 +154,7 @@ proton::connection container::impl::connect_common(
connection_context& cc(connection_context::get(pnc));
cc.container = &container_;
cc.handler = mh;
- cc.work_queue_ = new container::impl::connection_work_queue(pnc);
+ cc.work_queue_ = new container::impl::connection_work_queue(*container_.impl_, pnc);
pn_connection_set_container(pnc, id_.c_str());
pn_connection_set_hostname(pnc, url.host().c_str());
@@ -327,7 +334,7 @@ bool container::impl::handle(pn_event_t* event) {
cc.container = &container_;
cc.listener_context_ = &lc;
cc.handler = opts.handler();
- cc.work_queue_ = new container::impl::connection_work_queue(c);
+ cc.work_queue_ = new container::impl::connection_work_queue(*container_.impl_, c);
pn_listener_accept(l, c);
return false;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88c2d7d4/proton-c/bindings/cpp/src/work_queue.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/work_queue.cpp b/proton-c/bindings/cpp/src/work_queue.cpp
index 77fa3fb..f8fc45c 100644
--- a/proton-c/bindings/cpp/src/work_queue.cpp
+++ b/proton-c/bindings/cpp/src/work_queue.cpp
@@ -19,6 +19,8 @@
#include "proton/work_queue.hpp"
+#include "proton/duration.hpp"
+
#include "contexts.hpp"
#include "proactor_container_impl.hpp"
#include "proactor_work_queue_impl.hpp"
@@ -41,6 +43,12 @@ bool work_queue::add(work f) {
return impl_->add(f);
}
+void work_queue::schedule(duration d, work f) {
+ // If there is no underlying work queue implementation the work cannot be deferred, so it is silently dropped
+ if (!impl_) return;
+ return impl_->schedule(d, f);
+}
+
work_queue& work_queue::get(pn_connection_t* c) {
return connection_context::get(c).work_queue_;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org