You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/07/10 00:12:18 UTC

[38/50] qpid-proton git commit: PROTON-928: cancellable tasks

PROTON-928: cancellable tasks

A scheduled task can be cancelled.
A cancelled task does not prevent reactor from stopping running


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d4d22ee3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d4d22ee3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d4d22ee3

Branch: refs/heads/cjansen-cpp-client
Commit: d4d22ee396163babcac19c48845b1f10ca3b5a48
Parents: 09af375
Author: Bozo Dragojevic <bo...@digiverse.si>
Authored: Tue Jul 7 10:17:40 2015 +0200
Committer: Bozo Dragojevic <bo...@digiverse.si>
Committed: Tue Jul 7 21:49:44 2015 +0200

----------------------------------------------------------------------
 proton-c/bindings/python/proton/reactor.py      |  5 +++-
 proton-c/include/proton/reactor.h               |  1 +
 proton-c/src/reactor/timer.c                    | 25 +++++++++++++++++++-
 proton-c/src/tests/reactor.c                    | 15 ++++++++++++
 .../org/apache/qpid/proton/reactor/Task.java    |  4 ++++
 .../qpid/proton/reactor/impl/TaskImpl.java      | 10 ++++++++
 .../apache/qpid/proton/reactor/impl/Timer.java  | 19 ++++++++++++---
 proton-j/src/main/resources/creactor.py         |  3 +++
 tests/python/proton_tests/reactor.py            | 14 +++++++++++
 9 files changed, 91 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/bindings/python/proton/reactor.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactor.py b/proton-c/bindings/python/proton/reactor.py
index c66334b..d019554 100644
--- a/proton-c/bindings/python/proton/reactor.py
+++ b/proton-c/bindings/python/proton/reactor.py
@@ -53,6 +53,9 @@ class Task(Wrapper):
     def _init(self):
         pass
 
+    def cancel(self):
+        pn_task_cancel(self._impl)
+
 class Acceptor(Wrapper):
 
     def __init__(self, impl):
@@ -112,7 +115,7 @@ class Reactor(Wrapper):
         pn_reactor_yield(self._impl)
 
     def mark(self):
-        pn_reactor_mark(self._impl)
+        return pn_reactor_mark(self._impl)
 
     def _get_handler(self):
         return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/include/proton/reactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/reactor.h b/proton-c/include/proton/reactor.h
index 59b2282..6f52d22 100644
--- a/proton-c/include/proton/reactor.h
+++ b/proton-c/include/proton/reactor.h
@@ -96,6 +96,7 @@ PN_EXTERN pn_task_t *pn_timer_schedule(pn_timer_t *timer, pn_timestamp_t deadlin
 PN_EXTERN int pn_timer_tasks(pn_timer_t *timer);
 
 PN_EXTERN pn_record_t *pn_task_attachments(pn_task_t *task);
+PN_EXTERN void pn_task_cancel(pn_task_t *task);
 
 PN_EXTERN pn_reactor_t *pn_class_reactor(const pn_class_t *clazz, void *object);
 PN_EXTERN pn_reactor_t *pn_object_reactor(void *object);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/reactor/timer.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/timer.c b/proton-c/src/reactor/timer.c
index 1ad0821..61efd31 100644
--- a/proton-c/src/reactor/timer.c
+++ b/proton-c/src/reactor/timer.c
@@ -27,12 +27,14 @@ struct pn_task_t {
   pn_list_t *pool;
   pn_record_t *attachments;
   pn_timestamp_t deadline;
+  bool cancelled;
 };
 
 void pn_task_initialize(pn_task_t *task) {
   task->pool = NULL;
   task->attachments = pn_record();
   task->deadline = 0;
+  task->cancelled = false;
 }
 
 void pn_task_finalize(pn_task_t *task) {
@@ -68,6 +70,11 @@ pn_record_t *pn_task_attachments(pn_task_t *task) {
   return task->attachments;
 }
 
+void pn_task_cancel(pn_task_t *task) {
+    assert(task);
+    task->cancelled = true;
+}
+
 //
 // timer
 //
@@ -113,8 +120,22 @@ pn_task_t *pn_timer_schedule(pn_timer_t *timer,  pn_timestamp_t deadline) {
   return task;
 }
 
+void pni_timer_flush_cancelled(pn_timer_t *timer) {
+    while (pn_list_size(timer->tasks)) {
+        pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0);
+        if (task->cancelled) {
+            pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks);
+            assert(min == task);
+            pn_decref(min);
+        } else {
+            break;
+        }
+    }
+}
+
 pn_timestamp_t pn_timer_deadline(pn_timer_t *timer) {
   assert(timer);
+  pni_timer_flush_cancelled(timer);
   if (pn_list_size(timer->tasks)) {
     pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0);
     return task->deadline;
@@ -130,7 +151,8 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now) {
     if (now >= task->deadline) {
       pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks);
       assert(min == task);
-      pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK);
+      if (!min->cancelled)
+          pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK);
       pn_decref(min);
     } else {
       break;
@@ -140,5 +162,6 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now) {
 
 int pn_timer_tasks(pn_timer_t *timer) {
   assert(timer);
+  pni_timer_flush_cancelled(timer);
   return pn_list_size(timer->tasks);
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/tests/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c
index fe6c769..059d099 100644
--- a/proton-c/src/tests/reactor.c
+++ b/proton-c/src/tests/reactor.c
@@ -440,6 +440,20 @@ static void test_reactor_schedule_handler(void) {
   pn_free(tevents);
 }
 
+static void test_reactor_schedule_cancel(void) {
+  pn_reactor_t *reactor = pn_reactor();
+  pn_handler_t *root = pn_reactor_get_handler(reactor);
+  pn_list_t *events = pn_list(PN_VOID, 0);
+  pn_handler_add(root, test_handler(reactor, events));
+  pn_task_t *task = pn_reactor_schedule(reactor, 0, NULL);
+  pn_task_cancel(task);
+  pn_reactor_run(reactor);
+  pn_reactor_free(reactor);
+  expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED,
+         PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
+  pn_free(events);
+}
+
 int main(int argc, char **argv)
 {
   test_reactor();
@@ -461,5 +475,6 @@ int main(int argc, char **argv)
   test_reactor_transfer(4*1024, 1024);
   test_reactor_schedule();
   test_reactor_schedule_handler();
+  test_reactor_schedule_cancel();
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
index 69701ab..7fb5964 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
@@ -43,4 +43,8 @@ public interface Task extends Extendable {
     /** @return the reactor that created this task. */
     Reactor getReactor();
 
+    /**
+     * Cancel the execution of this task. No-op if invoked after the task was already executed.
+     */
+    void cancel();
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
index 00c9a84..11bb6b8 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
@@ -31,6 +31,7 @@ import org.apache.qpid.proton.reactor.Task;
 public class TaskImpl implements Task, Comparable<TaskImpl> {
     private final long deadline;
     private final int counter;
+    private boolean cancelled = false;
     private final AtomicInteger count = new AtomicInteger();
     private Record attachments = new RecordImpl();
     private Reactor reactor;
@@ -58,6 +59,15 @@ public class TaskImpl implements Task, Comparable<TaskImpl> {
         return deadline;
     }
 
+    public boolean isCancelled() {
+        return cancelled;
+    }
+
+    @Override
+    public void cancel() {
+        cancelled = true;
+    }
+
     public void setReactor(Reactor reactor) {
         this.reactor = reactor;
     }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
index 32bb4f6..b8df19d 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
@@ -31,7 +31,7 @@ import org.apache.qpid.proton.reactor.Task;
 public class Timer {
 
     private CollectorImpl collector;
-    private PriorityQueue<Task> tasks = new PriorityQueue<Task>();
+    private PriorityQueue<TaskImpl> tasks = new PriorityQueue<TaskImpl>();
 
     public Timer(Collector collector) {
         this.collector = (CollectorImpl)collector;
@@ -44,6 +44,7 @@ public class Timer {
     }
 
     long deadline() {
+        flushCancelled();
         if (tasks.size() > 0) {
             Task task = tasks.peek();
             return task.deadline();
@@ -52,12 +53,23 @@ public class Timer {
         }
     }
 
+    private void flushCancelled() {
+        while (!tasks.isEmpty()) {
+            TaskImpl task = tasks.peek();
+            if (task.isCancelled())
+                tasks.poll();
+            else
+                break;
+        }
+    }
+
     void tick(long now) {
         while(!tasks.isEmpty()) {
-            Task task = tasks.peek();
+            TaskImpl task = tasks.peek();
             if (now >= task.deadline()) {
                 tasks.poll();
-                collector.put(Type.TIMER_TASK, task);
+                if (!task.isCancelled())
+                    collector.put(Type.TIMER_TASK, task);
             } else {
                 break;
             }
@@ -65,6 +77,7 @@ public class Timer {
     }
 
     int tasks() {
+        flushCancelled();
         return tasks.size();
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/resources/creactor.py
----------------------------------------------------------------------
diff --git a/proton-j/src/main/resources/creactor.py b/proton-j/src/main/resources/creactor.py
index e179b23..1f8514e 100644
--- a/proton-j/src/main/resources/creactor.py
+++ b/proton-j/src/main/resources/creactor.py
@@ -78,6 +78,9 @@ def pn_selectable_set_fd(s, fd):
 def pn_acceptor_close(a):
     a.close()
 
+def pn_task_cancel(t):
+    t.cancel()
+
 def pn_object_reactor(o):
     if hasattr(o, "impl"):
         if hasattr(o.impl, "getSession"):

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/tests/python/proton_tests/reactor.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/reactor.py b/tests/python/proton_tests/reactor.py
index 6afee30..067c5c0 100644
--- a/tests/python/proton_tests/reactor.py
+++ b/tests/python/proton_tests/reactor.py
@@ -171,3 +171,17 @@ class ExceptionTest(Test):
             assert False, "expected to barf"
         except Barf:
             pass
+
+    def test_schedule_cancel(self):
+        barf = self.reactor.schedule(10, BarfOnTask())
+        class CancelBarf:
+            def on_timer_task(self, event):
+                barf.cancel()
+        self.reactor.schedule(0, CancelBarf())
+        now = self.reactor.mark()
+        try:
+            self.reactor.run()
+            elapsed = self.reactor.mark() - now
+            assert elapsed < 10, "expected cancelled task to not delay the reactor by " + elapsed
+        except Barf:
+            assert False, "expected barf to be cancelled"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org