You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2017/07/21 17:02:12 UTC

[16/20] qpid-proton git commit: PROTON-1482: [C++ binding] Implemented scheduling delayed work on a work_queue

PROTON-1482: [C++ binding] Implemented scheduling delayed work on a work_queue


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/88c2d7d4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/88c2d7d4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/88c2d7d4

Branch: refs/heads/master
Commit: 88c2d7d481a6da96f85a6781ae27b488cfacd801
Parents: ca446ea
Author: Andrew Stitcher <as...@apache.org>
Authored: Fri May 19 00:59:55 2017 -0400
Committer: Andrew Stitcher <as...@apache.org>
Committed: Fri Jul 21 12:50:06 2017 -0400

----------------------------------------------------------------------
 proton-c/bindings/cpp/include/proton/fwd.hpp    |  1 +
 .../bindings/cpp/include/proton/work_queue.hpp  |  8 ++++++++
 .../src/include/proactor_work_queue_impl.hpp    |  3 +--
 .../cpp/src/proactor_container_impl.cpp         | 21 +++++++++++++-------
 proton-c/bindings/cpp/src/work_queue.cpp        |  8 ++++++++
 5 files changed, 32 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88c2d7d4/proton-c/bindings/cpp/include/proton/fwd.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/fwd.hpp b/proton-c/bindings/cpp/include/proton/fwd.hpp
index b839f42..5ade5fd 100644
--- a/proton-c/bindings/cpp/include/proton/fwd.hpp
+++ b/proton-c/bindings/cpp/include/proton/fwd.hpp
@@ -29,6 +29,7 @@ class connection;
 class connection_options;
 class container;
 class delivery;
+class duration;
 class error_condition;
 class event;
 class message;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88c2d7d4/proton-c/bindings/cpp/include/proton/work_queue.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/work_queue.hpp b/proton-c/bindings/cpp/include/proton/work_queue.hpp
index fe739f5..bef041c 100644
--- a/proton-c/bindings/cpp/include/proton/work_queue.hpp
+++ b/proton-c/bindings/cpp/include/proton/work_queue.hpp
@@ -94,6 +94,14 @@ class PN_CPP_CLASS_EXTERN work_queue {
     /// or f() cannot be injected for any other reason.
     PN_CPP_EXTERN bool add(work f);
 
+    /// Add work to the work queue after a delay: f() will be called once the duration has
+    /// elapsed, serialised with other work in the queue, possibly in another thread.
+    ///
+    /// The scheduled execution is "best effort": it is possible that once the duration has elapsed
+    /// the work cannot be injected into the serialised context, and there will be no
+    /// indication of this.
+    PN_CPP_EXTERN void schedule(duration, work);
+
   private:
     PN_CPP_EXTERN static work_queue& get(pn_connection_t*);
     PN_CPP_EXTERN static work_queue& get(pn_session_t*);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88c2d7d4/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp
index ac0f803..1c94254 100644
--- a/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp
+++ b/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp
@@ -23,8 +23,6 @@
  */
 
 #include "proton/fwd.hpp"
-#include "proton/internal/config.hpp"
-#include "proton/work_queue.hpp"
 
 namespace proton {
 
@@ -32,6 +30,7 @@ class work_queue::impl {
   public:
     virtual ~impl() {};
     virtual bool add(work f) = 0;
+    virtual void schedule(duration, work) = 0;
     virtual void run_all_jobs() = 0;
     virtual void finished() = 0;
 };

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88c2d7d4/proton-c/bindings/cpp/src/proactor_container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
index 0135c08..62115fd 100644
--- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp
+++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
@@ -45,17 +45,26 @@ namespace proton {
 
 class container::impl::common_work_queue : public work_queue::impl {
   public:
-    common_work_queue(): finished_(false) {}
+    common_work_queue(container::impl& c): container_(c), finished_(false) {}
 
     typedef std::vector<work> jobs;
 
     void run_all_jobs();
     void finished() { finished_ = true; }
+    void schedule(duration, work);
 
+    container::impl& container_;
     jobs jobs_;
     bool finished_;
 };
 
+void container::impl::common_work_queue::schedule(duration d, work f) {
+    // Note this is an unbounded work queue.
+    // A resource-safe implementation should be bounded.
+    if (finished_) return;
+    container_.schedule(d, make_work(&work_queue::impl::add, (work_queue::impl*)this, f));
+}
+
 void container::impl::common_work_queue::run_all_jobs() {
     jobs j;
     // Lock this operation for mt
@@ -69,7 +78,7 @@ void container::impl::common_work_queue::run_all_jobs() {
 
 class container::impl::connection_work_queue : public common_work_queue {
   public:
-    connection_work_queue(pn_connection_t* c): connection_(c) {}
+    connection_work_queue(container::impl& ct, pn_connection_t* c): common_work_queue(ct), connection_(c) {}
 
     bool add(work f);
 
@@ -87,12 +96,10 @@ bool container::impl::connection_work_queue::add(work f) {
 
 class container::impl::container_work_queue : public common_work_queue {
   public:
-    container_work_queue(container::impl& c): container_(c) {}
+    container_work_queue(container::impl& c): common_work_queue(c) {}
     ~container_work_queue() { container_.remove_work_queue(this); }
 
     bool add(work f);
-
-    container::impl& container_;
 };
 
 bool container::impl::container_work_queue::add(work f) {
@@ -147,7 +154,7 @@ proton::connection container::impl::connect_common(
     connection_context& cc(connection_context::get(pnc));
     cc.container = &container_;
     cc.handler = mh;
-    cc.work_queue_ = new container::impl::connection_work_queue(pnc);
+    cc.work_queue_ = new container::impl::connection_work_queue(*container_.impl_, pnc);
 
     pn_connection_set_container(pnc, id_.c_str());
     pn_connection_set_hostname(pnc, url.host().c_str());
@@ -327,7 +334,7 @@ bool container::impl::handle(pn_event_t* event) {
         cc.container = &container_;
         cc.listener_context_ = &lc;
         cc.handler = opts.handler();
-        cc.work_queue_ = new container::impl::connection_work_queue(c);
+        cc.work_queue_ = new container::impl::connection_work_queue(*container_.impl_, c);
         pn_listener_accept(l, c);
         return false;
     }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88c2d7d4/proton-c/bindings/cpp/src/work_queue.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/work_queue.cpp b/proton-c/bindings/cpp/src/work_queue.cpp
index 77fa3fb..f8fc45c 100644
--- a/proton-c/bindings/cpp/src/work_queue.cpp
+++ b/proton-c/bindings/cpp/src/work_queue.cpp
@@ -19,6 +19,8 @@
 
 #include "proton/work_queue.hpp"
 
+#include "proton/duration.hpp"
+
 #include "contexts.hpp"
 #include "proactor_container_impl.hpp"
 #include "proactor_work_queue_impl.hpp"
@@ -41,6 +43,12 @@ bool work_queue::add(work f) {
     return impl_->add(f);
 }
 
+void work_queue::schedule(duration d, work f) {
+    // If we have no actual work queue, then can't defer
+    if (!impl_) return;
+    return impl_->schedule(d, f);
+}
+
 work_queue& work_queue::get(pn_connection_t* c) {
     return connection_context::get(c).work_queue_;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org