You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/07/10 00:12:18 UTC
[38/50] qpid-proton git commit: PROTON-928: cancellable tasks
PROTON-928: cancellable tasks
A scheduled task can be cancelled.
A cancelled task does not prevent the reactor from stopping.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d4d22ee3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d4d22ee3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d4d22ee3
Branch: refs/heads/cjansen-cpp-client
Commit: d4d22ee396163babcac19c48845b1f10ca3b5a48
Parents: 09af375
Author: Bozo Dragojevic <bo...@digiverse.si>
Authored: Tue Jul 7 10:17:40 2015 +0200
Committer: Bozo Dragojevic <bo...@digiverse.si>
Committed: Tue Jul 7 21:49:44 2015 +0200
----------------------------------------------------------------------
proton-c/bindings/python/proton/reactor.py | 5 +++-
proton-c/include/proton/reactor.h | 1 +
proton-c/src/reactor/timer.c | 25 +++++++++++++++++++-
proton-c/src/tests/reactor.c | 15 ++++++++++++
.../org/apache/qpid/proton/reactor/Task.java | 4 ++++
.../qpid/proton/reactor/impl/TaskImpl.java | 10 ++++++++
.../apache/qpid/proton/reactor/impl/Timer.java | 19 ++++++++++++---
proton-j/src/main/resources/creactor.py | 3 +++
tests/python/proton_tests/reactor.py | 14 +++++++++++
9 files changed, 91 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/bindings/python/proton/reactor.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactor.py b/proton-c/bindings/python/proton/reactor.py
index c66334b..d019554 100644
--- a/proton-c/bindings/python/proton/reactor.py
+++ b/proton-c/bindings/python/proton/reactor.py
@@ -53,6 +53,9 @@ class Task(Wrapper):
def _init(self):
pass
+ def cancel(self):
+ pn_task_cancel(self._impl)
+
class Acceptor(Wrapper):
def __init__(self, impl):
@@ -112,7 +115,7 @@ class Reactor(Wrapper):
pn_reactor_yield(self._impl)
def mark(self):
- pn_reactor_mark(self._impl)
+ return pn_reactor_mark(self._impl)
def _get_handler(self):
return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/include/proton/reactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/reactor.h b/proton-c/include/proton/reactor.h
index 59b2282..6f52d22 100644
--- a/proton-c/include/proton/reactor.h
+++ b/proton-c/include/proton/reactor.h
@@ -96,6 +96,7 @@ PN_EXTERN pn_task_t *pn_timer_schedule(pn_timer_t *timer, pn_timestamp_t deadlin
PN_EXTERN int pn_timer_tasks(pn_timer_t *timer);
PN_EXTERN pn_record_t *pn_task_attachments(pn_task_t *task);
+PN_EXTERN void pn_task_cancel(pn_task_t *task);
PN_EXTERN pn_reactor_t *pn_class_reactor(const pn_class_t *clazz, void *object);
PN_EXTERN pn_reactor_t *pn_object_reactor(void *object);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/reactor/timer.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/timer.c b/proton-c/src/reactor/timer.c
index 1ad0821..61efd31 100644
--- a/proton-c/src/reactor/timer.c
+++ b/proton-c/src/reactor/timer.c
@@ -27,12 +27,14 @@ struct pn_task_t {
pn_list_t *pool;
pn_record_t *attachments;
pn_timestamp_t deadline;
+ bool cancelled;
};
void pn_task_initialize(pn_task_t *task) {
task->pool = NULL;
task->attachments = pn_record();
task->deadline = 0;
+ task->cancelled = false;
}
void pn_task_finalize(pn_task_t *task) {
@@ -68,6 +70,11 @@ pn_record_t *pn_task_attachments(pn_task_t *task) {
return task->attachments;
}
+void pn_task_cancel(pn_task_t *task) {
+ assert(task);
+ task->cancelled = true;
+}
+
//
// timer
//
@@ -113,8 +120,22 @@ pn_task_t *pn_timer_schedule(pn_timer_t *timer, pn_timestamp_t deadline) {
return task;
}
+void pni_timer_flush_cancelled(pn_timer_t *timer) {
+ while (pn_list_size(timer->tasks)) {
+ pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0);
+ if (task->cancelled) {
+ pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks);
+ assert(min == task);
+ pn_decref(min);
+ } else {
+ break;
+ }
+ }
+}
+
pn_timestamp_t pn_timer_deadline(pn_timer_t *timer) {
assert(timer);
+ pni_timer_flush_cancelled(timer);
if (pn_list_size(timer->tasks)) {
pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0);
return task->deadline;
@@ -130,7 +151,8 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now) {
if (now >= task->deadline) {
pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks);
assert(min == task);
- pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK);
+ if (!min->cancelled)
+ pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK);
pn_decref(min);
} else {
break;
@@ -140,5 +162,6 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now) {
int pn_timer_tasks(pn_timer_t *timer) {
assert(timer);
+ pni_timer_flush_cancelled(timer);
return pn_list_size(timer->tasks);
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/tests/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c
index fe6c769..059d099 100644
--- a/proton-c/src/tests/reactor.c
+++ b/proton-c/src/tests/reactor.c
@@ -440,6 +440,20 @@ static void test_reactor_schedule_handler(void) {
pn_free(tevents);
}
+static void test_reactor_schedule_cancel(void) {
+ pn_reactor_t *reactor = pn_reactor();
+ pn_handler_t *root = pn_reactor_get_handler(reactor);
+ pn_list_t *events = pn_list(PN_VOID, 0);
+ pn_handler_add(root, test_handler(reactor, events));
+ pn_task_t *task = pn_reactor_schedule(reactor, 0, NULL);
+ pn_task_cancel(task);
+ pn_reactor_run(reactor);
+ pn_reactor_free(reactor);
+ expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED,
+ PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
+ pn_free(events);
+}
+
int main(int argc, char **argv)
{
test_reactor();
@@ -461,5 +475,6 @@ int main(int argc, char **argv)
test_reactor_transfer(4*1024, 1024);
test_reactor_schedule();
test_reactor_schedule_handler();
+ test_reactor_schedule_cancel();
return 0;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
index 69701ab..7fb5964 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
@@ -43,4 +43,8 @@ public interface Task extends Extendable {
/** @return the reactor that created this task. */
Reactor getReactor();
+ /**
+ * Cancel the execution of this task. No-op if invoked after the task was already executed.
+ */
+ void cancel();
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
index 00c9a84..11bb6b8 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
@@ -31,6 +31,7 @@ import org.apache.qpid.proton.reactor.Task;
public class TaskImpl implements Task, Comparable<TaskImpl> {
private final long deadline;
private final int counter;
+ private boolean cancelled = false;
private final AtomicInteger count = new AtomicInteger();
private Record attachments = new RecordImpl();
private Reactor reactor;
@@ -58,6 +59,15 @@ public class TaskImpl implements Task, Comparable<TaskImpl> {
return deadline;
}
+ public boolean isCancelled() {
+ return cancelled;
+ }
+
+ @Override
+ public void cancel() {
+ cancelled = true;
+ }
+
public void setReactor(Reactor reactor) {
this.reactor = reactor;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
index 32bb4f6..b8df19d 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
@@ -31,7 +31,7 @@ import org.apache.qpid.proton.reactor.Task;
public class Timer {
private CollectorImpl collector;
- private PriorityQueue<Task> tasks = new PriorityQueue<Task>();
+ private PriorityQueue<TaskImpl> tasks = new PriorityQueue<TaskImpl>();
public Timer(Collector collector) {
this.collector = (CollectorImpl)collector;
@@ -44,6 +44,7 @@ public class Timer {
}
long deadline() {
+ flushCancelled();
if (tasks.size() > 0) {
Task task = tasks.peek();
return task.deadline();
@@ -52,12 +53,23 @@ public class Timer {
}
}
+ private void flushCancelled() {
+ while (!tasks.isEmpty()) {
+ TaskImpl task = tasks.peek();
+ if (task.isCancelled())
+ tasks.poll();
+ else
+ break;
+ }
+ }
+
void tick(long now) {
while(!tasks.isEmpty()) {
- Task task = tasks.peek();
+ TaskImpl task = tasks.peek();
if (now >= task.deadline()) {
tasks.poll();
- collector.put(Type.TIMER_TASK, task);
+ if (!task.isCancelled())
+ collector.put(Type.TIMER_TASK, task);
} else {
break;
}
@@ -65,6 +77,7 @@ public class Timer {
}
int tasks() {
+ flushCancelled();
return tasks.size();
}
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/resources/creactor.py
----------------------------------------------------------------------
diff --git a/proton-j/src/main/resources/creactor.py b/proton-j/src/main/resources/creactor.py
index e179b23..1f8514e 100644
--- a/proton-j/src/main/resources/creactor.py
+++ b/proton-j/src/main/resources/creactor.py
@@ -78,6 +78,9 @@ def pn_selectable_set_fd(s, fd):
def pn_acceptor_close(a):
a.close()
+def pn_task_cancel(t):
+ t.cancel()
+
def pn_object_reactor(o):
if hasattr(o, "impl"):
if hasattr(o.impl, "getSession"):
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/tests/python/proton_tests/reactor.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/reactor.py b/tests/python/proton_tests/reactor.py
index 6afee30..067c5c0 100644
--- a/tests/python/proton_tests/reactor.py
+++ b/tests/python/proton_tests/reactor.py
@@ -171,3 +171,17 @@ class ExceptionTest(Test):
assert False, "expected to barf"
except Barf:
pass
+
+ def test_schedule_cancel(self):
+ barf = self.reactor.schedule(10, BarfOnTask())
+ class CancelBarf:
+ def on_timer_task(self, event):
+ barf.cancel()
+ self.reactor.schedule(0, CancelBarf())
+ now = self.reactor.mark()
+ try:
+ self.reactor.run()
+ elapsed = self.reactor.mark() - now
+ assert elapsed < 10, "expected cancelled task to not delay the reactor by " + elapsed
+ except Barf:
+ assert False, "expected barf to be cancelled"
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org