You are viewing a plain text version of this content. The canonical link for it is here.
Posted to proton@qpid.apache.org by Bozo Dragojevic <bo...@digiverse.si> on 2015/07/09 11:55:45 UTC

Re: [2/2] qpid-proton git commit: PROTON-928: cancellable tasks

Hi Ken,

I've installed python3.4 and tox and friends and tried to reproduce it here
and I can confirm that some completely unrelated tests fail mysteriously
with python3.4 and that reverting my change makes that failure go away :)

I've added more task cancellation tests to force the error while still
in the implicated code, as I suspected it could be some refcounting
problem but the new tests do not show anything unusual.

What is even weirder, with the new tests even the python3.4 suite passes
without segfault!

So, I consider this a false positive and have left the change in,
including the new tests at ca47d72.

Does such a solution work for you?

Bozzo

On 8. 07. 15 16.32, Ken Giusti wrote:
> Hi Bozzo,
>
> Can you please revert this change?
>
> It is causing a segfault in the python unit tests when they are run under python3.4.
>
> I haven't hit the segfault on python2.7, only on python3.4
>
> thanks,
>
> -K
>
> ----- Original Message -----
>> From: bozzo@apache.org
>> To: commits@qpid.apache.org
>> Sent: Tuesday, July 7, 2015 3:50:16 PM
>> Subject: [2/2] qpid-proton git commit: PROTON-928: cancellable tasks
>>
>> PROTON-928: cancellable tasks
>>
>> A scheduled task can be cancelled.
>> A cancelled task does not prevent reactor from stopping running
>>
>>
>> Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d4d22ee3
>> Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d4d22ee3
>> Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d4d22ee3
>>
>> Branch: refs/heads/master
>> Commit: d4d22ee396163babcac19c48845b1f10ca3b5a48
>> Parents: 09af375
>> Author: Bozo Dragojevic <bo...@digiverse.si>
>> Authored: Tue Jul 7 10:17:40 2015 +0200
>> Committer: Bozo Dragojevic <bo...@digiverse.si>
>> Committed: Tue Jul 7 21:49:44 2015 +0200
>>
>> ----------------------------------------------------------------------
>>  proton-c/bindings/python/proton/reactor.py      |  5 +++-
>>  proton-c/include/proton/reactor.h               |  1 +
>>  proton-c/src/reactor/timer.c                    | 25 +++++++++++++++++++-
>>  proton-c/src/tests/reactor.c                    | 15 ++++++++++++
>>  .../org/apache/qpid/proton/reactor/Task.java    |  4 ++++
>>  .../qpid/proton/reactor/impl/TaskImpl.java      | 10 ++++++++
>>  .../apache/qpid/proton/reactor/impl/Timer.java  | 19 ++++++++++++---
>>  proton-j/src/main/resources/creactor.py         |  3 +++
>>  tests/python/proton_tests/reactor.py            | 14 +++++++++++
>>  9 files changed, 91 insertions(+), 5 deletions(-)
>> ----------------------------------------------------------------------
>>
>>
>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/bindings/python/proton/reactor.py
>> ----------------------------------------------------------------------
>> diff --git a/proton-c/bindings/python/proton/reactor.py
>> b/proton-c/bindings/python/proton/reactor.py
>> index c66334b..d019554 100644
>> --- a/proton-c/bindings/python/proton/reactor.py
>> +++ b/proton-c/bindings/python/proton/reactor.py
>> @@ -53,6 +53,9 @@ class Task(Wrapper):
>>      def _init(self):
>>          pass
>>  
>> +    def cancel(self):
>> +        pn_task_cancel(self._impl)
>> +
>>  class Acceptor(Wrapper):
>>  
>>      def __init__(self, impl):
>> @@ -112,7 +115,7 @@ class Reactor(Wrapper):
>>          pn_reactor_yield(self._impl)
>>  
>>      def mark(self):
>> -        pn_reactor_mark(self._impl)
>> +        return pn_reactor_mark(self._impl)
>>  
>>      def _get_handler(self):
>>          return WrappedHandler.wrap(pn_reactor_get_handler(self._impl),
>>          self.on_error)
>>
>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/include/proton/reactor.h
>> ----------------------------------------------------------------------
>> diff --git a/proton-c/include/proton/reactor.h
>> b/proton-c/include/proton/reactor.h
>> index 59b2282..6f52d22 100644
>> --- a/proton-c/include/proton/reactor.h
>> +++ b/proton-c/include/proton/reactor.h
>> @@ -96,6 +96,7 @@ PN_EXTERN pn_task_t *pn_timer_schedule(pn_timer_t *timer,
>> pn_timestamp_t deadlin
>>  PN_EXTERN int pn_timer_tasks(pn_timer_t *timer);
>>  
>>  PN_EXTERN pn_record_t *pn_task_attachments(pn_task_t *task);
>> +PN_EXTERN void pn_task_cancel(pn_task_t *task);
>>  
>>  PN_EXTERN pn_reactor_t *pn_class_reactor(const pn_class_t *clazz, void
>>  *object);
>>  PN_EXTERN pn_reactor_t *pn_object_reactor(void *object);
>>
>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/reactor/timer.c
>> ----------------------------------------------------------------------
>> diff --git a/proton-c/src/reactor/timer.c b/proton-c/src/reactor/timer.c
>> index 1ad0821..61efd31 100644
>> --- a/proton-c/src/reactor/timer.c
>> +++ b/proton-c/src/reactor/timer.c
>> @@ -27,12 +27,14 @@ struct pn_task_t {
>>    pn_list_t *pool;
>>    pn_record_t *attachments;
>>    pn_timestamp_t deadline;
>> +  bool cancelled;
>>  };
>>  
>>  void pn_task_initialize(pn_task_t *task) {
>>    task->pool = NULL;
>>    task->attachments = pn_record();
>>    task->deadline = 0;
>> +  task->cancelled = false;
>>  }
>>  
>>  void pn_task_finalize(pn_task_t *task) {
>> @@ -68,6 +70,11 @@ pn_record_t *pn_task_attachments(pn_task_t *task) {
>>    return task->attachments;
>>  }
>>  
>> +void pn_task_cancel(pn_task_t *task) {
>> +    assert(task);
>> +    task->cancelled = true;
>> +}
>> +
>>  //
>>  // timer
>>  //
>> @@ -113,8 +120,22 @@ pn_task_t *pn_timer_schedule(pn_timer_t *timer,
>> pn_timestamp_t deadline) {
>>    return task;
>>  }
>>  
>> +void pni_timer_flush_cancelled(pn_timer_t *timer) {
>> +    while (pn_list_size(timer->tasks)) {
>> +        pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0);
>> +        if (task->cancelled) {
>> +            pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks);
>> +            assert(min == task);
>> +            pn_decref(min);
>> +        } else {
>> +            break;
>> +        }
>> +    }
>> +}
>> +
>>  pn_timestamp_t pn_timer_deadline(pn_timer_t *timer) {
>>    assert(timer);
>> +  pni_timer_flush_cancelled(timer);
>>    if (pn_list_size(timer->tasks)) {
>>      pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0);
>>      return task->deadline;
>> @@ -130,7 +151,8 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now)
>> {
>>      if (now >= task->deadline) {
>>        pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks);
>>        assert(min == task);
>> -      pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK);
>> +      if (!min->cancelled)
>> +          pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK);
>>        pn_decref(min);
>>      } else {
>>        break;
>> @@ -140,5 +162,6 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now)
>> {
>>  
>>  int pn_timer_tasks(pn_timer_t *timer) {
>>    assert(timer);
>> +  pni_timer_flush_cancelled(timer);
>>    return pn_list_size(timer->tasks);
>>  }
>>
>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/tests/reactor.c
>> ----------------------------------------------------------------------
>> diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c
>> index fe6c769..059d099 100644
>> --- a/proton-c/src/tests/reactor.c
>> +++ b/proton-c/src/tests/reactor.c
>> @@ -440,6 +440,20 @@ static void test_reactor_schedule_handler(void) {
>>    pn_free(tevents);
>>  }
>>  
>> +static void test_reactor_schedule_cancel(void) {
>> +  pn_reactor_t *reactor = pn_reactor();
>> +  pn_handler_t *root = pn_reactor_get_handler(reactor);
>> +  pn_list_t *events = pn_list(PN_VOID, 0);
>> +  pn_handler_add(root, test_handler(reactor, events));
>> +  pn_task_t *task = pn_reactor_schedule(reactor, 0, NULL);
>> +  pn_task_cancel(task);
>> +  pn_reactor_run(reactor);
>> +  pn_reactor_free(reactor);
>> +  expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED,
>> +         PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
>> +  pn_free(events);
>> +}
>> +
>>  int main(int argc, char **argv)
>>  {
>>    test_reactor();
>> @@ -461,5 +475,6 @@ int main(int argc, char **argv)
>>    test_reactor_transfer(4*1024, 1024);
>>    test_reactor_schedule();
>>    test_reactor_schedule_handler();
>> +  test_reactor_schedule_cancel();
>>    return 0;
>>  }
>>
>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>> ----------------------------------------------------------------------
>> diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>> index 69701ab..7fb5964 100644
>> --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>> +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>> @@ -43,4 +43,8 @@ public interface Task extends Extendable {
>>      /** @return the reactor that created this task. */
>>      Reactor getReactor();
>>  
>> +    /**
>> +     * Cancel the execution of this task. No-op if invoked after the task
>> was already executed.
>> +     */
>> +    void cancel();
>>  }
>>
>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>> index 00c9a84..11bb6b8 100644
>> ---
>> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>> +++
>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>> @@ -31,6 +31,7 @@ import org.apache.qpid.proton.reactor.Task;
>>  public class TaskImpl implements Task, Comparable<TaskImpl> {
>>      private final long deadline;
>>      private final int counter;
>> +    private boolean cancelled = false;
>>      private final AtomicInteger count = new AtomicInteger();
>>      private Record attachments = new RecordImpl();
>>      private Reactor reactor;
>> @@ -58,6 +59,15 @@ public class TaskImpl implements Task,
>> Comparable<TaskImpl> {
>>          return deadline;
>>      }
>>  
>> +    public boolean isCancelled() {
>> +        return cancelled;
>> +    }
>> +
>> +    @Override
>> +    public void cancel() {
>> +        cancelled = true;
>> +    }
>> +
>>      public void setReactor(Reactor reactor) {
>>          this.reactor = reactor;
>>      }
>>
>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>> index 32bb4f6..b8df19d 100644
>> --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>> +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>> @@ -31,7 +31,7 @@ import org.apache.qpid.proton.reactor.Task;
>>  public class Timer {
>>  
>>      private CollectorImpl collector;
>> -    private PriorityQueue<Task> tasks = new PriorityQueue<Task>();
>> +    private PriorityQueue<TaskImpl> tasks = new PriorityQueue<TaskImpl>();
>>  
>>      public Timer(Collector collector) {
>>          this.collector = (CollectorImpl)collector;
>> @@ -44,6 +44,7 @@ public class Timer {
>>      }
>>  
>>      long deadline() {
>> +        flushCancelled();
>>          if (tasks.size() > 0) {
>>              Task task = tasks.peek();
>>              return task.deadline();
>> @@ -52,12 +53,23 @@ public class Timer {
>>          }
>>      }
>>  
>> +    private void flushCancelled() {
>> +        while (!tasks.isEmpty()) {
>> +            TaskImpl task = tasks.peek();
>> +            if (task.isCancelled())
>> +                tasks.poll();
>> +            else
>> +                break;
>> +        }
>> +    }
>> +
>>      void tick(long now) {
>>          while(!tasks.isEmpty()) {
>> -            Task task = tasks.peek();
>> +            TaskImpl task = tasks.peek();
>>              if (now >= task.deadline()) {
>>                  tasks.poll();
>> -                collector.put(Type.TIMER_TASK, task);
>> +                if (!task.isCancelled())
>> +                    collector.put(Type.TIMER_TASK, task);
>>              } else {
>>                  break;
>>              }
>> @@ -65,6 +77,7 @@ public class Timer {
>>      }
>>  
>>      int tasks() {
>> +        flushCancelled();
>>          return tasks.size();
>>      }
>>  }
>>
>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/resources/creactor.py
>> ----------------------------------------------------------------------
>> diff --git a/proton-j/src/main/resources/creactor.py
>> b/proton-j/src/main/resources/creactor.py
>> index e179b23..1f8514e 100644
>> --- a/proton-j/src/main/resources/creactor.py
>> +++ b/proton-j/src/main/resources/creactor.py
>> @@ -78,6 +78,9 @@ def pn_selectable_set_fd(s, fd):
>>  def pn_acceptor_close(a):
>>      a.close()
>>  
>> +def pn_task_cancel(t):
>> +    t.cancel()
>> +
>>  def pn_object_reactor(o):
>>      if hasattr(o, "impl"):
>>          if hasattr(o.impl, "getSession"):
>>
>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/tests/python/proton_tests/reactor.py
>> ----------------------------------------------------------------------
>> diff --git a/tests/python/proton_tests/reactor.py
>> b/tests/python/proton_tests/reactor.py
>> index 6afee30..067c5c0 100644
>> --- a/tests/python/proton_tests/reactor.py
>> +++ b/tests/python/proton_tests/reactor.py
>> @@ -171,3 +171,17 @@ class ExceptionTest(Test):
>>              assert False, "expected to barf"
>>          except Barf:
>>              pass
>> +
>> +    def test_schedule_cancel(self):
>> +        barf = self.reactor.schedule(10, BarfOnTask())
>> +        class CancelBarf:
>> +            def on_timer_task(self, event):
>> +                barf.cancel()
>> +        self.reactor.schedule(0, CancelBarf())
>> +        now = self.reactor.mark()
>> +        try:
>> +            self.reactor.run()
>> +            elapsed = self.reactor.mark() - now
>> +            assert elapsed < 10, "expected cancelled task to not delay the
>> reactor by " + elapsed
>> +        except Barf:
>> +            assert False, "expected barf to be cancelled"
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
>> For additional commands, e-mail: commits-help@qpid.apache.org
>>
>>


Re: [2/2] qpid-proton git commit: PROTON-928: cancellable tasks

Posted by Bozo Dragojevic <bo...@digiverse.si>.
Cooking a revert for the other one, in a few.

Bozzo

On 9. 07. 15 12.59, Robbie Gemmell wrote:
> Thanks Bozzo, that sorts those failures (so long as the issue from
> your other change is worked around, to let the build get that far).
>
> Robbie
>
> On 9 July 2015 at 11:22, Bozo Dragojevic <bo...@digiverse.si> wrote:
>> Hi Robbie,
>>
>> Yeah, my bad. I was sitting on some local changes so I missed this.
>> I'll test on a clean checkout next time. Sorry for all the mess.
>> I've committed the missing methods to proton-j
>>
>> Bozzo
>>
>> On 9. 07. 15 12.14, Robbie Gemmell wrote:
>>> Hi Bozzo,
>>>
>>> This change also seems to be causing test failures when using the
>>> maven build (if you update things to get past the earlier failures,
>>> caused by the commit mentioned in the other thread on proton@):
>>>
>>> proton_tests.reactor.ExceptionTest.test_schedule_cancel ................. fail
>>> Error during test:  Traceback (most recent call last):
>>>     File "/home/gemmellr/workspace/proton/tests/python/proton-test",
>>> line 360, in run
>>>       phase()
>>>     File "/home/gemmellr/workspace/proton/tests/python/proton_tests/reactor.py",
>>> line 181, in test_schedule_cancel
>>>       now = self.reactor.mark()
>>>     File "/home/gemmellr/workspace/proton/tests/../proton-c/bindings/python/proton/reactor.py",
>>> line 118, in mark
>>>       return pn_reactor_mark(self._impl)
>>>   NameError: global name 'pn_reactor_mark' is not defined
>>>
>>>
>>> Robbie
>>>
>>> On 9 July 2015 at 10:55, Bozo Dragojevic <bo...@digiverse.si> wrote:
>>>> Hi Ken,
>>>>
>>>> I've installed python3.4 and tox and friends and tried to reproduce it here
>>>> and I can confirm that some completely unrelated tests fail mysteriously
>>>> with python3.4 and that reverting my change makes that failure go away :)
>>>>
>>>> I've added more task cancellation tests to force the error while still
>>>> in the implicated code, as I suspected it could be some refcounting
>>>> problem but the new tests do not show anything unusual.
>>>>
>>>> What is even weirder, with the new tests even the python3.4 suite passes
>>>> without segfault!
>>>>
>>>> So, I consider this a false positive and have left the change in,
>>>> including the new tests at ca47d72.
>>>>
>>>> Does such a solution work for you?
>>>>
>>>> Bozzo
>>>>
>>>> On 8. 07. 15 16.32, Ken Giusti wrote:
>>>>> Hi Bozzo,
>>>>>
>>>>> Can you please revert this change?
>>>>>
>>>>> It is causing a segfault in the python unit tests when they are run under python3.4.
>>>>>
>>>>> I haven't hit the segfault on python2.7, only on python3.4
>>>>>
>>>>> thanks,
>>>>>
>>>>> -K
>>>>>
>>>>> ----- Original Message -----
>>>>>> From: bozzo@apache.org
>>>>>> To: commits@qpid.apache.org
>>>>>> Sent: Tuesday, July 7, 2015 3:50:16 PM
>>>>>> Subject: [2/2] qpid-proton git commit: PROTON-928: cancellable tasks
>>>>>>
>>>>>> PROTON-928: cancellable tasks
>>>>>>
>>>>>> A scheduled task can be cancelled.
>>>>>> A cancelled task does not prevent reactor from stopping running
>>>>>>
>>>>>>
>>>>>> Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
>>>>>> Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d4d22ee3
>>>>>> Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d4d22ee3
>>>>>> Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d4d22ee3
>>>>>>
>>>>>> Branch: refs/heads/master
>>>>>> Commit: d4d22ee396163babcac19c48845b1f10ca3b5a48
>>>>>> Parents: 09af375
>>>>>> Author: Bozo Dragojevic <bo...@digiverse.si>
>>>>>> Authored: Tue Jul 7 10:17:40 2015 +0200
>>>>>> Committer: Bozo Dragojevic <bo...@digiverse.si>
>>>>>> Committed: Tue Jul 7 21:49:44 2015 +0200
>>>>>>
>>>>>> ----------------------------------------------------------------------
>>>>>>  proton-c/bindings/python/proton/reactor.py      |  5 +++-
>>>>>>  proton-c/include/proton/reactor.h               |  1 +
>>>>>>  proton-c/src/reactor/timer.c                    | 25 +++++++++++++++++++-
>>>>>>  proton-c/src/tests/reactor.c                    | 15 ++++++++++++
>>>>>>  .../org/apache/qpid/proton/reactor/Task.java    |  4 ++++
>>>>>>  .../qpid/proton/reactor/impl/TaskImpl.java      | 10 ++++++++
>>>>>>  .../apache/qpid/proton/reactor/impl/Timer.java  | 19 ++++++++++++---
>>>>>>  proton-j/src/main/resources/creactor.py         |  3 +++
>>>>>>  tests/python/proton_tests/reactor.py            | 14 +++++++++++
>>>>>>  9 files changed, 91 insertions(+), 5 deletions(-)
>>>>>> ----------------------------------------------------------------------
>>>>>>
>>>>>>
>>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/bindings/python/proton/reactor.py
>>>>>> ----------------------------------------------------------------------
>>>>>> diff --git a/proton-c/bindings/python/proton/reactor.py
>>>>>> b/proton-c/bindings/python/proton/reactor.py
>>>>>> index c66334b..d019554 100644
>>>>>> --- a/proton-c/bindings/python/proton/reactor.py
>>>>>> +++ b/proton-c/bindings/python/proton/reactor.py
>>>>>> @@ -53,6 +53,9 @@ class Task(Wrapper):
>>>>>>      def _init(self):
>>>>>>          pass
>>>>>>
>>>>>> +    def cancel(self):
>>>>>> +        pn_task_cancel(self._impl)
>>>>>> +
>>>>>>  class Acceptor(Wrapper):
>>>>>>
>>>>>>      def __init__(self, impl):
>>>>>> @@ -112,7 +115,7 @@ class Reactor(Wrapper):
>>>>>>          pn_reactor_yield(self._impl)
>>>>>>
>>>>>>      def mark(self):
>>>>>> -        pn_reactor_mark(self._impl)
>>>>>> +        return pn_reactor_mark(self._impl)
>>>>>>
>>>>>>      def _get_handler(self):
>>>>>>          return WrappedHandler.wrap(pn_reactor_get_handler(self._impl),
>>>>>>          self.on_error)
>>>>>>
>>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/include/proton/reactor.h
>>>>>> ----------------------------------------------------------------------
>>>>>> diff --git a/proton-c/include/proton/reactor.h
>>>>>> b/proton-c/include/proton/reactor.h
>>>>>> index 59b2282..6f52d22 100644
>>>>>> --- a/proton-c/include/proton/reactor.h
>>>>>> +++ b/proton-c/include/proton/reactor.h
>>>>>> @@ -96,6 +96,7 @@ PN_EXTERN pn_task_t *pn_timer_schedule(pn_timer_t *timer,
>>>>>> pn_timestamp_t deadlin
>>>>>>  PN_EXTERN int pn_timer_tasks(pn_timer_t *timer);
>>>>>>
>>>>>>  PN_EXTERN pn_record_t *pn_task_attachments(pn_task_t *task);
>>>>>> +PN_EXTERN void pn_task_cancel(pn_task_t *task);
>>>>>>
>>>>>>  PN_EXTERN pn_reactor_t *pn_class_reactor(const pn_class_t *clazz, void
>>>>>>  *object);
>>>>>>  PN_EXTERN pn_reactor_t *pn_object_reactor(void *object);
>>>>>>
>>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/reactor/timer.c
>>>>>> ----------------------------------------------------------------------
>>>>>> diff --git a/proton-c/src/reactor/timer.c b/proton-c/src/reactor/timer.c
>>>>>> index 1ad0821..61efd31 100644
>>>>>> --- a/proton-c/src/reactor/timer.c
>>>>>> +++ b/proton-c/src/reactor/timer.c
>>>>>> @@ -27,12 +27,14 @@ struct pn_task_t {
>>>>>>    pn_list_t *pool;
>>>>>>    pn_record_t *attachments;
>>>>>>    pn_timestamp_t deadline;
>>>>>> +  bool cancelled;
>>>>>>  };
>>>>>>
>>>>>>  void pn_task_initialize(pn_task_t *task) {
>>>>>>    task->pool = NULL;
>>>>>>    task->attachments = pn_record();
>>>>>>    task->deadline = 0;
>>>>>> +  task->cancelled = false;
>>>>>>  }
>>>>>>
>>>>>>  void pn_task_finalize(pn_task_t *task) {
>>>>>> @@ -68,6 +70,11 @@ pn_record_t *pn_task_attachments(pn_task_t *task) {
>>>>>>    return task->attachments;
>>>>>>  }
>>>>>>
>>>>>> +void pn_task_cancel(pn_task_t *task) {
>>>>>> +    assert(task);
>>>>>> +    task->cancelled = true;
>>>>>> +}
>>>>>> +
>>>>>>  //
>>>>>>  // timer
>>>>>>  //
>>>>>> @@ -113,8 +120,22 @@ pn_task_t *pn_timer_schedule(pn_timer_t *timer,
>>>>>> pn_timestamp_t deadline) {
>>>>>>    return task;
>>>>>>  }
>>>>>>
>>>>>> +void pni_timer_flush_cancelled(pn_timer_t *timer) {
>>>>>> +    while (pn_list_size(timer->tasks)) {
>>>>>> +        pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0);
>>>>>> +        if (task->cancelled) {
>>>>>> +            pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks);
>>>>>> +            assert(min == task);
>>>>>> +            pn_decref(min);
>>>>>> +        } else {
>>>>>> +            break;
>>>>>> +        }
>>>>>> +    }
>>>>>> +}
>>>>>> +
>>>>>>  pn_timestamp_t pn_timer_deadline(pn_timer_t *timer) {
>>>>>>    assert(timer);
>>>>>> +  pni_timer_flush_cancelled(timer);
>>>>>>    if (pn_list_size(timer->tasks)) {
>>>>>>      pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0);
>>>>>>      return task->deadline;
>>>>>> @@ -130,7 +151,8 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now)
>>>>>> {
>>>>>>      if (now >= task->deadline) {
>>>>>>        pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks);
>>>>>>        assert(min == task);
>>>>>> -      pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK);
>>>>>> +      if (!min->cancelled)
>>>>>> +          pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK);
>>>>>>        pn_decref(min);
>>>>>>      } else {
>>>>>>        break;
>>>>>> @@ -140,5 +162,6 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now)
>>>>>> {
>>>>>>
>>>>>>  int pn_timer_tasks(pn_timer_t *timer) {
>>>>>>    assert(timer);
>>>>>> +  pni_timer_flush_cancelled(timer);
>>>>>>    return pn_list_size(timer->tasks);
>>>>>>  }
>>>>>>
>>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/tests/reactor.c
>>>>>> ----------------------------------------------------------------------
>>>>>> diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c
>>>>>> index fe6c769..059d099 100644
>>>>>> --- a/proton-c/src/tests/reactor.c
>>>>>> +++ b/proton-c/src/tests/reactor.c
>>>>>> @@ -440,6 +440,20 @@ static void test_reactor_schedule_handler(void) {
>>>>>>    pn_free(tevents);
>>>>>>  }
>>>>>>
>>>>>> +static void test_reactor_schedule_cancel(void) {
>>>>>> +  pn_reactor_t *reactor = pn_reactor();
>>>>>> +  pn_handler_t *root = pn_reactor_get_handler(reactor);
>>>>>> +  pn_list_t *events = pn_list(PN_VOID, 0);
>>>>>> +  pn_handler_add(root, test_handler(reactor, events));
>>>>>> +  pn_task_t *task = pn_reactor_schedule(reactor, 0, NULL);
>>>>>> +  pn_task_cancel(task);
>>>>>> +  pn_reactor_run(reactor);
>>>>>> +  pn_reactor_free(reactor);
>>>>>> +  expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED,
>>>>>> +         PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
>>>>>> +  pn_free(events);
>>>>>> +}
>>>>>> +
>>>>>>  int main(int argc, char **argv)
>>>>>>  {
>>>>>>    test_reactor();
>>>>>> @@ -461,5 +475,6 @@ int main(int argc, char **argv)
>>>>>>    test_reactor_transfer(4*1024, 1024);
>>>>>>    test_reactor_schedule();
>>>>>>    test_reactor_schedule_handler();
>>>>>> +  test_reactor_schedule_cancel();
>>>>>>    return 0;
>>>>>>  }
>>>>>>
>>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>>>> ----------------------------------------------------------------------
>>>>>> diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>>>> index 69701ab..7fb5964 100644
>>>>>> --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>>>> +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>>>> @@ -43,4 +43,8 @@ public interface Task extends Extendable {
>>>>>>      /** @return the reactor that created this task. */
>>>>>>      Reactor getReactor();
>>>>>>
>>>>>> +    /**
>>>>>> +     * Cancel the execution of this task. No-op if invoked after the task
>>>>>> was already executed.
>>>>>> +     */
>>>>>> +    void cancel();
>>>>>>  }
>>>>>>
>>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>>>> ----------------------------------------------------------------------
>>>>>> diff --git
>>>>>> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>>>> index 00c9a84..11bb6b8 100644
>>>>>> ---
>>>>>> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>>>> +++
>>>>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>>>> @@ -31,6 +31,7 @@ import org.apache.qpid.proton.reactor.Task;
>>>>>>  public class TaskImpl implements Task, Comparable<TaskImpl> {
>>>>>>      private final long deadline;
>>>>>>      private final int counter;
>>>>>> +    private boolean cancelled = false;
>>>>>>      private final AtomicInteger count = new AtomicInteger();
>>>>>>      private Record attachments = new RecordImpl();
>>>>>>      private Reactor reactor;
>>>>>> @@ -58,6 +59,15 @@ public class TaskImpl implements Task,
>>>>>> Comparable<TaskImpl> {
>>>>>>          return deadline;
>>>>>>      }
>>>>>>
>>>>>> +    public boolean isCancelled() {
>>>>>> +        return cancelled;
>>>>>> +    }
>>>>>> +
>>>>>> +    @Override
>>>>>> +    public void cancel() {
>>>>>> +        cancelled = true;
>>>>>> +    }
>>>>>> +
>>>>>>      public void setReactor(Reactor reactor) {
>>>>>>          this.reactor = reactor;
>>>>>>      }
>>>>>>
>>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>>>> ----------------------------------------------------------------------
>>>>>> diff --git
>>>>>> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>>>> index 32bb4f6..b8df19d 100644
>>>>>> --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>>>> +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>>>> @@ -31,7 +31,7 @@ import org.apache.qpid.proton.reactor.Task;
>>>>>>  public class Timer {
>>>>>>
>>>>>>      private CollectorImpl collector;
>>>>>> -    private PriorityQueue<Task> tasks = new PriorityQueue<Task>();
>>>>>> +    private PriorityQueue<TaskImpl> tasks = new PriorityQueue<TaskImpl>();
>>>>>>
>>>>>>      public Timer(Collector collector) {
>>>>>>          this.collector = (CollectorImpl)collector;
>>>>>> @@ -44,6 +44,7 @@ public class Timer {
>>>>>>      }
>>>>>>
>>>>>>      long deadline() {
>>>>>> +        flushCancelled();
>>>>>>          if (tasks.size() > 0) {
>>>>>>              Task task = tasks.peek();
>>>>>>              return task.deadline();
>>>>>> @@ -52,12 +53,23 @@ public class Timer {
>>>>>>          }
>>>>>>      }
>>>>>>
>>>>>> +    private void flushCancelled() {
>>>>>> +        while (!tasks.isEmpty()) {
>>>>>> +            TaskImpl task = tasks.peek();
>>>>>> +            if (task.isCancelled())
>>>>>> +                tasks.poll();
>>>>>> +            else
>>>>>> +                break;
>>>>>> +        }
>>>>>> +    }
>>>>>> +
>>>>>>      void tick(long now) {
>>>>>>          while(!tasks.isEmpty()) {
>>>>>> -            Task task = tasks.peek();
>>>>>> +            TaskImpl task = tasks.peek();
>>>>>>              if (now >= task.deadline()) {
>>>>>>                  tasks.poll();
>>>>>> -                collector.put(Type.TIMER_TASK, task);
>>>>>> +                if (!task.isCancelled())
>>>>>> +                    collector.put(Type.TIMER_TASK, task);
>>>>>>              } else {
>>>>>>                  break;
>>>>>>              }
>>>>>> @@ -65,6 +77,7 @@ public class Timer {
>>>>>>      }
>>>>>>
>>>>>>      int tasks() {
>>>>>> +        flushCancelled();
>>>>>>          return tasks.size();
>>>>>>      }
>>>>>>  }
>>>>>>
>>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/resources/creactor.py
>>>>>> ----------------------------------------------------------------------
>>>>>> diff --git a/proton-j/src/main/resources/creactor.py
>>>>>> b/proton-j/src/main/resources/creactor.py
>>>>>> index e179b23..1f8514e 100644
>>>>>> --- a/proton-j/src/main/resources/creactor.py
>>>>>> +++ b/proton-j/src/main/resources/creactor.py
>>>>>> @@ -78,6 +78,9 @@ def pn_selectable_set_fd(s, fd):
>>>>>>  def pn_acceptor_close(a):
>>>>>>      a.close()
>>>>>>
>>>>>> +def pn_task_cancel(t):
>>>>>> +    t.cancel()
>>>>>> +
>>>>>>  def pn_object_reactor(o):
>>>>>>      if hasattr(o, "impl"):
>>>>>>          if hasattr(o.impl, "getSession"):
>>>>>>
>>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/tests/python/proton_tests/reactor.py
>>>>>> ----------------------------------------------------------------------
>>>>>> diff --git a/tests/python/proton_tests/reactor.py
>>>>>> b/tests/python/proton_tests/reactor.py
>>>>>> index 6afee30..067c5c0 100644
>>>>>> --- a/tests/python/proton_tests/reactor.py
>>>>>> +++ b/tests/python/proton_tests/reactor.py
>>>>>> @@ -171,3 +171,17 @@ class ExceptionTest(Test):
>>>>>>              assert False, "expected to barf"
>>>>>>          except Barf:
>>>>>>              pass
>>>>>> +
>>>>>> +    def test_schedule_cancel(self):
>>>>>> +        barf = self.reactor.schedule(10, BarfOnTask())
>>>>>> +        class CancelBarf:
>>>>>> +            def on_timer_task(self, event):
>>>>>> +                barf.cancel()
>>>>>> +        self.reactor.schedule(0, CancelBarf())
>>>>>> +        now = self.reactor.mark()
>>>>>> +        try:
>>>>>> +            self.reactor.run()
>>>>>> +            elapsed = self.reactor.mark() - now
>>>>>> +            assert elapsed < 10, "expected cancelled task to not delay the
>>>>>> reactor by " + elapsed
>>>>>> +        except Barf:
>>>>>> +            assert False, "expected barf to be cancelled"
>>>>>>
>>>>>>
>>>>>> ---------------------------------------------------------------------
>>>>>> To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
>>>>>> For additional commands, e-mail: commits-help@qpid.apache.org
>>>>>>
>>>>>>


Re: [2/2] qpid-proton git commit: PROTON-928: cancellable tasks

Posted by Robbie Gemmell <ro...@gmail.com>.
Thanks Bozzo, that sorts those failures (so long as the issue from
your other change is worked around, to let the build get that far).

Robbie

On 9 July 2015 at 11:22, Bozo Dragojevic <bo...@digiverse.si> wrote:
> Hi Robbie,
>
> Yeah, my bad. I was sitting on some local changes so I missed this.
> I'll test on a clean checkout next time. Sorry for all the mess.
> I've committed the missing methods to proton-j
>
> Bozzo
>
> On 9. 07. 15 12.14, Robbie Gemmell wrote:
>> Hi Bozzo,
>>
>> This change also seems to be causing test failures when using the
>> maven build (if you update things to get past the earlier failures,
>> caused by the commit mentioned in the other thread on proton@):
>>
>> proton_tests.reactor.ExceptionTest.test_schedule_cancel ................. fail
>> Error during test:  Traceback (most recent call last):
>>     File "/home/gemmellr/workspace/proton/tests/python/proton-test",
>> line 360, in run
>>       phase()
>>     File "/home/gemmellr/workspace/proton/tests/python/proton_tests/reactor.py",
>> line 181, in test_schedule_cancel
>>       now = self.reactor.mark()
>>     File "/home/gemmellr/workspace/proton/tests/../proton-c/bindings/python/proton/reactor.py",
>> line 118, in mark
>>       return pn_reactor_mark(self._impl)
>>   NameError: global name 'pn_reactor_mark' is not defined
>>
>>
>> Robbie
>>
>> On 9 July 2015 at 10:55, Bozo Dragojevic <bo...@digiverse.si> wrote:
>>> Hi Ken,
>>>
>>> I've installed python3.4 and tox and friends and tried to reproduce it here
>>> and I can confirm that some completely unrelated tests fail mysteriously
>>> with python3.4 and that reverting my change makes that failure go away :)
>>>
>>> I've added more task cancellation tests to force the error while still
>>> in the implicated code, as I suspected it could be some refcounting
>>> problem but the new tests do not show anything unusual.
>>>
>>> What is even weirder, with the new tests even the python3.4 suite passes
>>> without segfault!
>>>
>>> So, I consider this a false positive and have left the change in,
>>> including the new tests at ca47d72.
>>>
>>> Does such a solution work for you?
>>>
>>> Bozzo
>>>
>>> On 8. 07. 15 16.32, Ken Giusti wrote:
>>>> Hi Bozzo,
>>>>
>>>> Can you please revert this change?
>>>>
>>>> It is causing a segfault in the python unit tests when they are run under python3.4.
>>>>
>>>> I haven't hit the segfault on python2.7, only on python3.4
>>>>
>>>> thanks,
>>>>
>>>> -K
>>>>
>>>> ----- Original Message -----
>>>>> From: bozzo@apache.org
>>>>> To: commits@qpid.apache.org
>>>>> Sent: Tuesday, July 7, 2015 3:50:16 PM
>>>>> Subject: [2/2] qpid-proton git commit: PROTON-928: cancellable tasks
>>>>>
>>>>> PROTON-928: cancellable tasks
>>>>>
>>>>> A scheduled task can be cancelled.
>>>>> A cancelled task does not prevent reactor from stopping running
>>>>>
>>>>>
>>>>> Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
>>>>> Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d4d22ee3
>>>>> Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d4d22ee3
>>>>> Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d4d22ee3
>>>>>
>>>>> Branch: refs/heads/master
>>>>> Commit: d4d22ee396163babcac19c48845b1f10ca3b5a48
>>>>> Parents: 09af375
>>>>> Author: Bozo Dragojevic <bo...@digiverse.si>
>>>>> Authored: Tue Jul 7 10:17:40 2015 +0200
>>>>> Committer: Bozo Dragojevic <bo...@digiverse.si>
>>>>> Committed: Tue Jul 7 21:49:44 2015 +0200
>>>>>
>>>>> ----------------------------------------------------------------------
>>>>>  proton-c/bindings/python/proton/reactor.py      |  5 +++-
>>>>>  proton-c/include/proton/reactor.h               |  1 +
>>>>>  proton-c/src/reactor/timer.c                    | 25 +++++++++++++++++++-
>>>>>  proton-c/src/tests/reactor.c                    | 15 ++++++++++++
>>>>>  .../org/apache/qpid/proton/reactor/Task.java    |  4 ++++
>>>>>  .../qpid/proton/reactor/impl/TaskImpl.java      | 10 ++++++++
>>>>>  .../apache/qpid/proton/reactor/impl/Timer.java  | 19 ++++++++++++---
>>>>>  proton-j/src/main/resources/creactor.py         |  3 +++
>>>>>  tests/python/proton_tests/reactor.py            | 14 +++++++++++
>>>>>  9 files changed, 91 insertions(+), 5 deletions(-)
>>>>> ----------------------------------------------------------------------
>>>>>
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/bindings/python/proton/reactor.py
>>>>> ----------------------------------------------------------------------
>>>>> diff --git a/proton-c/bindings/python/proton/reactor.py
>>>>> b/proton-c/bindings/python/proton/reactor.py
>>>>> index c66334b..d019554 100644
>>>>> --- a/proton-c/bindings/python/proton/reactor.py
>>>>> +++ b/proton-c/bindings/python/proton/reactor.py
>>>>> @@ -53,6 +53,9 @@ class Task(Wrapper):
>>>>>      def _init(self):
>>>>>          pass
>>>>>
>>>>> +    def cancel(self):
>>>>> +        pn_task_cancel(self._impl)
>>>>> +
>>>>>  class Acceptor(Wrapper):
>>>>>
>>>>>      def __init__(self, impl):
>>>>> @@ -112,7 +115,7 @@ class Reactor(Wrapper):
>>>>>          pn_reactor_yield(self._impl)
>>>>>
>>>>>      def mark(self):
>>>>> -        pn_reactor_mark(self._impl)
>>>>> +        return pn_reactor_mark(self._impl)
>>>>>
>>>>>      def _get_handler(self):
>>>>>          return WrappedHandler.wrap(pn_reactor_get_handler(self._impl),
>>>>>          self.on_error)
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/include/proton/reactor.h
>>>>> ----------------------------------------------------------------------
>>>>> diff --git a/proton-c/include/proton/reactor.h
>>>>> b/proton-c/include/proton/reactor.h
>>>>> index 59b2282..6f52d22 100644
>>>>> --- a/proton-c/include/proton/reactor.h
>>>>> +++ b/proton-c/include/proton/reactor.h
>>>>> @@ -96,6 +96,7 @@ PN_EXTERN pn_task_t *pn_timer_schedule(pn_timer_t *timer,
>>>>> pn_timestamp_t deadlin
>>>>>  PN_EXTERN int pn_timer_tasks(pn_timer_t *timer);
>>>>>
>>>>>  PN_EXTERN pn_record_t *pn_task_attachments(pn_task_t *task);
>>>>> +PN_EXTERN void pn_task_cancel(pn_task_t *task);
>>>>>
>>>>>  PN_EXTERN pn_reactor_t *pn_class_reactor(const pn_class_t *clazz, void
>>>>>  *object);
>>>>>  PN_EXTERN pn_reactor_t *pn_object_reactor(void *object);
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/reactor/timer.c
>>>>> ----------------------------------------------------------------------
>>>>> diff --git a/proton-c/src/reactor/timer.c b/proton-c/src/reactor/timer.c
>>>>> index 1ad0821..61efd31 100644
>>>>> --- a/proton-c/src/reactor/timer.c
>>>>> +++ b/proton-c/src/reactor/timer.c
>>>>> @@ -27,12 +27,14 @@ struct pn_task_t {
>>>>>    pn_list_t *pool;
>>>>>    pn_record_t *attachments;
>>>>>    pn_timestamp_t deadline;
>>>>> +  bool cancelled;
>>>>>  };
>>>>>
>>>>>  void pn_task_initialize(pn_task_t *task) {
>>>>>    task->pool = NULL;
>>>>>    task->attachments = pn_record();
>>>>>    task->deadline = 0;
>>>>> +  task->cancelled = false;
>>>>>  }
>>>>>
>>>>>  void pn_task_finalize(pn_task_t *task) {
>>>>> @@ -68,6 +70,11 @@ pn_record_t *pn_task_attachments(pn_task_t *task) {
>>>>>    return task->attachments;
>>>>>  }
>>>>>
>>>>> +void pn_task_cancel(pn_task_t *task) {
>>>>> +    assert(task);
>>>>> +    task->cancelled = true;
>>>>> +}
>>>>> +
>>>>>  //
>>>>>  // timer
>>>>>  //
>>>>> @@ -113,8 +120,22 @@ pn_task_t *pn_timer_schedule(pn_timer_t *timer,
>>>>> pn_timestamp_t deadline) {
>>>>>    return task;
>>>>>  }
>>>>>
>>>>> +void pni_timer_flush_cancelled(pn_timer_t *timer) {
>>>>> +    while (pn_list_size(timer->tasks)) {
>>>>> +        pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0);
>>>>> +        if (task->cancelled) {
>>>>> +            pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks);
>>>>> +            assert(min == task);
>>>>> +            pn_decref(min);
>>>>> +        } else {
>>>>> +            break;
>>>>> +        }
>>>>> +    }
>>>>> +}
>>>>> +
>>>>>  pn_timestamp_t pn_timer_deadline(pn_timer_t *timer) {
>>>>>    assert(timer);
>>>>> +  pni_timer_flush_cancelled(timer);
>>>>>    if (pn_list_size(timer->tasks)) {
>>>>>      pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0);
>>>>>      return task->deadline;
>>>>> @@ -130,7 +151,8 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now)
>>>>> {
>>>>>      if (now >= task->deadline) {
>>>>>        pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks);
>>>>>        assert(min == task);
>>>>> -      pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK);
>>>>> +      if (!min->cancelled)
>>>>> +          pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK);
>>>>>        pn_decref(min);
>>>>>      } else {
>>>>>        break;
>>>>> @@ -140,5 +162,6 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now)
>>>>> {
>>>>>
>>>>>  int pn_timer_tasks(pn_timer_t *timer) {
>>>>>    assert(timer);
>>>>> +  pni_timer_flush_cancelled(timer);
>>>>>    return pn_list_size(timer->tasks);
>>>>>  }
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/tests/reactor.c
>>>>> ----------------------------------------------------------------------
>>>>> diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c
>>>>> index fe6c769..059d099 100644
>>>>> --- a/proton-c/src/tests/reactor.c
>>>>> +++ b/proton-c/src/tests/reactor.c
>>>>> @@ -440,6 +440,20 @@ static void test_reactor_schedule_handler(void) {
>>>>>    pn_free(tevents);
>>>>>  }
>>>>>
>>>>> +static void test_reactor_schedule_cancel(void) {
>>>>> +  pn_reactor_t *reactor = pn_reactor();
>>>>> +  pn_handler_t *root = pn_reactor_get_handler(reactor);
>>>>> +  pn_list_t *events = pn_list(PN_VOID, 0);
>>>>> +  pn_handler_add(root, test_handler(reactor, events));
>>>>> +  pn_task_t *task = pn_reactor_schedule(reactor, 0, NULL);
>>>>> +  pn_task_cancel(task);
>>>>> +  pn_reactor_run(reactor);
>>>>> +  pn_reactor_free(reactor);
>>>>> +  expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED,
>>>>> +         PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
>>>>> +  pn_free(events);
>>>>> +}
>>>>> +
>>>>>  int main(int argc, char **argv)
>>>>>  {
>>>>>    test_reactor();
>>>>> @@ -461,5 +475,6 @@ int main(int argc, char **argv)
>>>>>    test_reactor_transfer(4*1024, 1024);
>>>>>    test_reactor_schedule();
>>>>>    test_reactor_schedule_handler();
>>>>> +  test_reactor_schedule_cancel();
>>>>>    return 0;
>>>>>  }
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>>> ----------------------------------------------------------------------
>>>>> diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>>> index 69701ab..7fb5964 100644
>>>>> --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>>> +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>>> @@ -43,4 +43,8 @@ public interface Task extends Extendable {
>>>>>      /** @return the reactor that created this task. */
>>>>>      Reactor getReactor();
>>>>>
>>>>> +    /**
>>>>> +     * Cancel the execution of this task. No-op if invoked after the task
>>>>> was already executed.
>>>>> +     */
>>>>> +    void cancel();
>>>>>  }
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>>> ----------------------------------------------------------------------
>>>>> diff --git
>>>>> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>>> index 00c9a84..11bb6b8 100644
>>>>> ---
>>>>> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>>> +++
>>>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>>> @@ -31,6 +31,7 @@ import org.apache.qpid.proton.reactor.Task;
>>>>>  public class TaskImpl implements Task, Comparable<TaskImpl> {
>>>>>      private final long deadline;
>>>>>      private final int counter;
>>>>> +    private boolean cancelled = false;
>>>>>      private final AtomicInteger count = new AtomicInteger();
>>>>>      private Record attachments = new RecordImpl();
>>>>>      private Reactor reactor;
>>>>> @@ -58,6 +59,15 @@ public class TaskImpl implements Task,
>>>>> Comparable<TaskImpl> {
>>>>>          return deadline;
>>>>>      }
>>>>>
>>>>> +    public boolean isCancelled() {
>>>>> +        return cancelled;
>>>>> +    }
>>>>> +
>>>>> +    @Override
>>>>> +    public void cancel() {
>>>>> +        cancelled = true;
>>>>> +    }
>>>>> +
>>>>>      public void setReactor(Reactor reactor) {
>>>>>          this.reactor = reactor;
>>>>>      }
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>>> ----------------------------------------------------------------------
>>>>> diff --git
>>>>> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>>> index 32bb4f6..b8df19d 100644
>>>>> --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>>> +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>>> @@ -31,7 +31,7 @@ import org.apache.qpid.proton.reactor.Task;
>>>>>  public class Timer {
>>>>>
>>>>>      private CollectorImpl collector;
>>>>> -    private PriorityQueue<Task> tasks = new PriorityQueue<Task>();
>>>>> +    private PriorityQueue<TaskImpl> tasks = new PriorityQueue<TaskImpl>();
>>>>>
>>>>>      public Timer(Collector collector) {
>>>>>          this.collector = (CollectorImpl)collector;
>>>>> @@ -44,6 +44,7 @@ public class Timer {
>>>>>      }
>>>>>
>>>>>      long deadline() {
>>>>> +        flushCancelled();
>>>>>          if (tasks.size() > 0) {
>>>>>              Task task = tasks.peek();
>>>>>              return task.deadline();
>>>>> @@ -52,12 +53,23 @@ public class Timer {
>>>>>          }
>>>>>      }
>>>>>
>>>>> +    private void flushCancelled() {
>>>>> +        while (!tasks.isEmpty()) {
>>>>> +            TaskImpl task = tasks.peek();
>>>>> +            if (task.isCancelled())
>>>>> +                tasks.poll();
>>>>> +            else
>>>>> +                break;
>>>>> +        }
>>>>> +    }
>>>>> +
>>>>>      void tick(long now) {
>>>>>          while(!tasks.isEmpty()) {
>>>>> -            Task task = tasks.peek();
>>>>> +            TaskImpl task = tasks.peek();
>>>>>              if (now >= task.deadline()) {
>>>>>                  tasks.poll();
>>>>> -                collector.put(Type.TIMER_TASK, task);
>>>>> +                if (!task.isCancelled())
>>>>> +                    collector.put(Type.TIMER_TASK, task);
>>>>>              } else {
>>>>>                  break;
>>>>>              }
>>>>> @@ -65,6 +77,7 @@ public class Timer {
>>>>>      }
>>>>>
>>>>>      int tasks() {
>>>>> +        flushCancelled();
>>>>>          return tasks.size();
>>>>>      }
>>>>>  }
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/resources/creactor.py
>>>>> ----------------------------------------------------------------------
>>>>> diff --git a/proton-j/src/main/resources/creactor.py
>>>>> b/proton-j/src/main/resources/creactor.py
>>>>> index e179b23..1f8514e 100644
>>>>> --- a/proton-j/src/main/resources/creactor.py
>>>>> +++ b/proton-j/src/main/resources/creactor.py
>>>>> @@ -78,6 +78,9 @@ def pn_selectable_set_fd(s, fd):
>>>>>  def pn_acceptor_close(a):
>>>>>      a.close()
>>>>>
>>>>> +def pn_task_cancel(t):
>>>>> +    t.cancel()
>>>>> +
>>>>>  def pn_object_reactor(o):
>>>>>      if hasattr(o, "impl"):
>>>>>          if hasattr(o.impl, "getSession"):
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/tests/python/proton_tests/reactor.py
>>>>> ----------------------------------------------------------------------
>>>>> diff --git a/tests/python/proton_tests/reactor.py
>>>>> b/tests/python/proton_tests/reactor.py
>>>>> index 6afee30..067c5c0 100644
>>>>> --- a/tests/python/proton_tests/reactor.py
>>>>> +++ b/tests/python/proton_tests/reactor.py
>>>>> @@ -171,3 +171,17 @@ class ExceptionTest(Test):
>>>>>              assert False, "expected to barf"
>>>>>          except Barf:
>>>>>              pass
>>>>> +
>>>>> +    def test_schedule_cancel(self):
>>>>> +        barf = self.reactor.schedule(10, BarfOnTask())
>>>>> +        class CancelBarf:
>>>>> +            def on_timer_task(self, event):
>>>>> +                barf.cancel()
>>>>> +        self.reactor.schedule(0, CancelBarf())
>>>>> +        now = self.reactor.mark()
>>>>> +        try:
>>>>> +            self.reactor.run()
>>>>> +            elapsed = self.reactor.mark() - now
>>>>> +            assert elapsed < 10, "expected cancelled task to not delay the
>>>>> reactor by " + elapsed
>>>>> +        except Barf:
>>>>> +            assert False, "expected barf to be cancelled"
>>>>>
>>>>>
>>>>> ---------------------------------------------------------------------
>>>>> To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
>>>>> For additional commands, e-mail: commits-help@qpid.apache.org
>>>>>
>>>>>
>

Re: [2/2] qpid-proton git commit: PROTON-928: cancellable tasks

Posted by Bozo Dragojevic <bo...@digiverse.si>.
Hi Robbie,

Yeah, my bad. I was sitting on some local changes so I missed this.
I'll test on a clean checkout next time. Sorry for all the mess.
I've committed the missing methods to proton-j.

Bozzo

On 9. 07. 15 12.14, Robbie Gemmell wrote:
> Hi Bozzo,
>
> This change also seems to be causing test failures when using the
> maven build (if you update things to get past the earlier failures,
> caused by the commit mentioned in the other thread on proton@):
>
> proton_tests.reactor.ExceptionTest.test_schedule_cancel ................. fail
> Error during test:  Traceback (most recent call last):
>     File "/home/gemmellr/workspace/proton/tests/python/proton-test",
> line 360, in run
>       phase()
>     File "/home/gemmellr/workspace/proton/tests/python/proton_tests/reactor.py",
> line 181, in test_schedule_cancel
>       now = self.reactor.mark()
>     File "/home/gemmellr/workspace/proton/tests/../proton-c/bindings/python/proton/reactor.py",
> line 118, in mark
>       return pn_reactor_mark(self._impl)
>   NameError: global name 'pn_reactor_mark' is not defined
>
>
> Robbie
>
> On 9 July 2015 at 10:55, Bozo Dragojevic <bo...@digiverse.si> wrote:
>> Hi Ken,
>>
>> I've installed python3.4 and tox and friends and tried to reproduce it here
>> and I can confirm that some completely unrelated tests fail mysteriously
>> with python3.4 and that reverting my change makes that failure go away :)
>>
>> I've added more task cancellation tests to force the error while still
>> in the implicated code, as I suspected it could be some refcounting
>> problem but the new tests do not show anything unusual.
>>
>> What is even weirder, with the new tests even the python3.4 suite passes
>> without segfault!
>>
>> So, I consider this a false positive and have left the change in,
>> including the new tests at ca47d72.
>>
>> Does such a solution work for you?
>>
>> Bozzo
>>
>> On 8. 07. 15 16.32, Ken Giusti wrote:
>>> Hi Bozzo,
>>>
>>> Can you please revert this change?
>>>
>>> It is causing a segfault in the python unit tests when they are run under python3.4.
>>>
>>> I haven't hit the segfault on python2.7, only on python3.4
>>>
>>> thanks,
>>>
>>> -K
>>>
>>> ----- Original Message -----
>>>> From: bozzo@apache.org
>>>> To: commits@qpid.apache.org
>>>> Sent: Tuesday, July 7, 2015 3:50:16 PM
>>>> Subject: [2/2] qpid-proton git commit: PROTON-928: cancellable tasks
>>>>
>>>> PROTON-928: cancellable tasks
>>>>
>>>> A scheduled task can be cancelled.
>>>> A cancelled task does not prevent reactor from stopping running
>>>>
>>>>
>>>> Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
>>>> Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d4d22ee3
>>>> Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d4d22ee3
>>>> Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d4d22ee3
>>>>
>>>> Branch: refs/heads/master
>>>> Commit: d4d22ee396163babcac19c48845b1f10ca3b5a48
>>>> Parents: 09af375
>>>> Author: Bozo Dragojevic <bo...@digiverse.si>
>>>> Authored: Tue Jul 7 10:17:40 2015 +0200
>>>> Committer: Bozo Dragojevic <bo...@digiverse.si>
>>>> Committed: Tue Jul 7 21:49:44 2015 +0200
>>>>
>>>> ----------------------------------------------------------------------
>>>>  proton-c/bindings/python/proton/reactor.py      |  5 +++-
>>>>  proton-c/include/proton/reactor.h               |  1 +
>>>>  proton-c/src/reactor/timer.c                    | 25 +++++++++++++++++++-
>>>>  proton-c/src/tests/reactor.c                    | 15 ++++++++++++
>>>>  .../org/apache/qpid/proton/reactor/Task.java    |  4 ++++
>>>>  .../qpid/proton/reactor/impl/TaskImpl.java      | 10 ++++++++
>>>>  .../apache/qpid/proton/reactor/impl/Timer.java  | 19 ++++++++++++---
>>>>  proton-j/src/main/resources/creactor.py         |  3 +++
>>>>  tests/python/proton_tests/reactor.py            | 14 +++++++++++
>>>>  9 files changed, 91 insertions(+), 5 deletions(-)
>>>> ----------------------------------------------------------------------
>>>>
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/bindings/python/proton/reactor.py
>>>> ----------------------------------------------------------------------
>>>> diff --git a/proton-c/bindings/python/proton/reactor.py
>>>> b/proton-c/bindings/python/proton/reactor.py
>>>> index c66334b..d019554 100644
>>>> --- a/proton-c/bindings/python/proton/reactor.py
>>>> +++ b/proton-c/bindings/python/proton/reactor.py
>>>> @@ -53,6 +53,9 @@ class Task(Wrapper):
>>>>      def _init(self):
>>>>          pass
>>>>
>>>> +    def cancel(self):
>>>> +        pn_task_cancel(self._impl)
>>>> +
>>>>  class Acceptor(Wrapper):
>>>>
>>>>      def __init__(self, impl):
>>>> @@ -112,7 +115,7 @@ class Reactor(Wrapper):
>>>>          pn_reactor_yield(self._impl)
>>>>
>>>>      def mark(self):
>>>> -        pn_reactor_mark(self._impl)
>>>> +        return pn_reactor_mark(self._impl)
>>>>
>>>>      def _get_handler(self):
>>>>          return WrappedHandler.wrap(pn_reactor_get_handler(self._impl),
>>>>          self.on_error)
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/include/proton/reactor.h
>>>> ----------------------------------------------------------------------
>>>> diff --git a/proton-c/include/proton/reactor.h
>>>> b/proton-c/include/proton/reactor.h
>>>> index 59b2282..6f52d22 100644
>>>> --- a/proton-c/include/proton/reactor.h
>>>> +++ b/proton-c/include/proton/reactor.h
>>>> @@ -96,6 +96,7 @@ PN_EXTERN pn_task_t *pn_timer_schedule(pn_timer_t *timer,
>>>> pn_timestamp_t deadlin
>>>>  PN_EXTERN int pn_timer_tasks(pn_timer_t *timer);
>>>>
>>>>  PN_EXTERN pn_record_t *pn_task_attachments(pn_task_t *task);
>>>> +PN_EXTERN void pn_task_cancel(pn_task_t *task);
>>>>
>>>>  PN_EXTERN pn_reactor_t *pn_class_reactor(const pn_class_t *clazz, void
>>>>  *object);
>>>>  PN_EXTERN pn_reactor_t *pn_object_reactor(void *object);
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/reactor/timer.c
>>>> ----------------------------------------------------------------------
>>>> diff --git a/proton-c/src/reactor/timer.c b/proton-c/src/reactor/timer.c
>>>> index 1ad0821..61efd31 100644
>>>> --- a/proton-c/src/reactor/timer.c
>>>> +++ b/proton-c/src/reactor/timer.c
>>>> @@ -27,12 +27,14 @@ struct pn_task_t {
>>>>    pn_list_t *pool;
>>>>    pn_record_t *attachments;
>>>>    pn_timestamp_t deadline;
>>>> +  bool cancelled;
>>>>  };
>>>>
>>>>  void pn_task_initialize(pn_task_t *task) {
>>>>    task->pool = NULL;
>>>>    task->attachments = pn_record();
>>>>    task->deadline = 0;
>>>> +  task->cancelled = false;
>>>>  }
>>>>
>>>>  void pn_task_finalize(pn_task_t *task) {
>>>> @@ -68,6 +70,11 @@ pn_record_t *pn_task_attachments(pn_task_t *task) {
>>>>    return task->attachments;
>>>>  }
>>>>
>>>> +void pn_task_cancel(pn_task_t *task) {
>>>> +    assert(task);
>>>> +    task->cancelled = true;
>>>> +}
>>>> +
>>>>  //
>>>>  // timer
>>>>  //
>>>> @@ -113,8 +120,22 @@ pn_task_t *pn_timer_schedule(pn_timer_t *timer,
>>>> pn_timestamp_t deadline) {
>>>>    return task;
>>>>  }
>>>>
>>>> +void pni_timer_flush_cancelled(pn_timer_t *timer) {
>>>> +    while (pn_list_size(timer->tasks)) {
>>>> +        pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0);
>>>> +        if (task->cancelled) {
>>>> +            pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks);
>>>> +            assert(min == task);
>>>> +            pn_decref(min);
>>>> +        } else {
>>>> +            break;
>>>> +        }
>>>> +    }
>>>> +}
>>>> +
>>>>  pn_timestamp_t pn_timer_deadline(pn_timer_t *timer) {
>>>>    assert(timer);
>>>> +  pni_timer_flush_cancelled(timer);
>>>>    if (pn_list_size(timer->tasks)) {
>>>>      pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0);
>>>>      return task->deadline;
>>>> @@ -130,7 +151,8 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now)
>>>> {
>>>>      if (now >= task->deadline) {
>>>>        pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks);
>>>>        assert(min == task);
>>>> -      pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK);
>>>> +      if (!min->cancelled)
>>>> +          pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK);
>>>>        pn_decref(min);
>>>>      } else {
>>>>        break;
>>>> @@ -140,5 +162,6 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now)
>>>> {
>>>>
>>>>  int pn_timer_tasks(pn_timer_t *timer) {
>>>>    assert(timer);
>>>> +  pni_timer_flush_cancelled(timer);
>>>>    return pn_list_size(timer->tasks);
>>>>  }
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/tests/reactor.c
>>>> ----------------------------------------------------------------------
>>>> diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c
>>>> index fe6c769..059d099 100644
>>>> --- a/proton-c/src/tests/reactor.c
>>>> +++ b/proton-c/src/tests/reactor.c
>>>> @@ -440,6 +440,20 @@ static void test_reactor_schedule_handler(void) {
>>>>    pn_free(tevents);
>>>>  }
>>>>
>>>> +static void test_reactor_schedule_cancel(void) {
>>>> +  pn_reactor_t *reactor = pn_reactor();
>>>> +  pn_handler_t *root = pn_reactor_get_handler(reactor);
>>>> +  pn_list_t *events = pn_list(PN_VOID, 0);
>>>> +  pn_handler_add(root, test_handler(reactor, events));
>>>> +  pn_task_t *task = pn_reactor_schedule(reactor, 0, NULL);
>>>> +  pn_task_cancel(task);
>>>> +  pn_reactor_run(reactor);
>>>> +  pn_reactor_free(reactor);
>>>> +  expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED,
>>>> +         PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
>>>> +  pn_free(events);
>>>> +}
>>>> +
>>>>  int main(int argc, char **argv)
>>>>  {
>>>>    test_reactor();
>>>> @@ -461,5 +475,6 @@ int main(int argc, char **argv)
>>>>    test_reactor_transfer(4*1024, 1024);
>>>>    test_reactor_schedule();
>>>>    test_reactor_schedule_handler();
>>>> +  test_reactor_schedule_cancel();
>>>>    return 0;
>>>>  }
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>> ----------------------------------------------------------------------
>>>> diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>> index 69701ab..7fb5964 100644
>>>> --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>> +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>>> @@ -43,4 +43,8 @@ public interface Task extends Extendable {
>>>>      /** @return the reactor that created this task. */
>>>>      Reactor getReactor();
>>>>
>>>> +    /**
>>>> +     * Cancel the execution of this task. No-op if invoked after the task
>>>> was already executed.
>>>> +     */
>>>> +    void cancel();
>>>>  }
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>> ----------------------------------------------------------------------
>>>> diff --git
>>>> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>> index 00c9a84..11bb6b8 100644
>>>> ---
>>>> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>> +++
>>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>>> @@ -31,6 +31,7 @@ import org.apache.qpid.proton.reactor.Task;
>>>>  public class TaskImpl implements Task, Comparable<TaskImpl> {
>>>>      private final long deadline;
>>>>      private final int counter;
>>>> +    private boolean cancelled = false;
>>>>      private final AtomicInteger count = new AtomicInteger();
>>>>      private Record attachments = new RecordImpl();
>>>>      private Reactor reactor;
>>>> @@ -58,6 +59,15 @@ public class TaskImpl implements Task,
>>>> Comparable<TaskImpl> {
>>>>          return deadline;
>>>>      }
>>>>
>>>> +    public boolean isCancelled() {
>>>> +        return cancelled;
>>>> +    }
>>>> +
>>>> +    @Override
>>>> +    public void cancel() {
>>>> +        cancelled = true;
>>>> +    }
>>>> +
>>>>      public void setReactor(Reactor reactor) {
>>>>          this.reactor = reactor;
>>>>      }
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>> ----------------------------------------------------------------------
>>>> diff --git
>>>> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>> index 32bb4f6..b8df19d 100644
>>>> --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>> +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>>> @@ -31,7 +31,7 @@ import org.apache.qpid.proton.reactor.Task;
>>>>  public class Timer {
>>>>
>>>>      private CollectorImpl collector;
>>>> -    private PriorityQueue<Task> tasks = new PriorityQueue<Task>();
>>>> +    private PriorityQueue<TaskImpl> tasks = new PriorityQueue<TaskImpl>();
>>>>
>>>>      public Timer(Collector collector) {
>>>>          this.collector = (CollectorImpl)collector;
>>>> @@ -44,6 +44,7 @@ public class Timer {
>>>>      }
>>>>
>>>>      long deadline() {
>>>> +        flushCancelled();
>>>>          if (tasks.size() > 0) {
>>>>              Task task = tasks.peek();
>>>>              return task.deadline();
>>>> @@ -52,12 +53,23 @@ public class Timer {
>>>>          }
>>>>      }
>>>>
>>>> +    private void flushCancelled() {
>>>> +        while (!tasks.isEmpty()) {
>>>> +            TaskImpl task = tasks.peek();
>>>> +            if (task.isCancelled())
>>>> +                tasks.poll();
>>>> +            else
>>>> +                break;
>>>> +        }
>>>> +    }
>>>> +
>>>>      void tick(long now) {
>>>>          while(!tasks.isEmpty()) {
>>>> -            Task task = tasks.peek();
>>>> +            TaskImpl task = tasks.peek();
>>>>              if (now >= task.deadline()) {
>>>>                  tasks.poll();
>>>> -                collector.put(Type.TIMER_TASK, task);
>>>> +                if (!task.isCancelled())
>>>> +                    collector.put(Type.TIMER_TASK, task);
>>>>              } else {
>>>>                  break;
>>>>              }
>>>> @@ -65,6 +77,7 @@ public class Timer {
>>>>      }
>>>>
>>>>      int tasks() {
>>>> +        flushCancelled();
>>>>          return tasks.size();
>>>>      }
>>>>  }
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/resources/creactor.py
>>>> ----------------------------------------------------------------------
>>>> diff --git a/proton-j/src/main/resources/creactor.py
>>>> b/proton-j/src/main/resources/creactor.py
>>>> index e179b23..1f8514e 100644
>>>> --- a/proton-j/src/main/resources/creactor.py
>>>> +++ b/proton-j/src/main/resources/creactor.py
>>>> @@ -78,6 +78,9 @@ def pn_selectable_set_fd(s, fd):
>>>>  def pn_acceptor_close(a):
>>>>      a.close()
>>>>
>>>> +def pn_task_cancel(t):
>>>> +    t.cancel()
>>>> +
>>>>  def pn_object_reactor(o):
>>>>      if hasattr(o, "impl"):
>>>>          if hasattr(o.impl, "getSession"):
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/tests/python/proton_tests/reactor.py
>>>> ----------------------------------------------------------------------
>>>> diff --git a/tests/python/proton_tests/reactor.py
>>>> b/tests/python/proton_tests/reactor.py
>>>> index 6afee30..067c5c0 100644
>>>> --- a/tests/python/proton_tests/reactor.py
>>>> +++ b/tests/python/proton_tests/reactor.py
>>>> @@ -171,3 +171,17 @@ class ExceptionTest(Test):
>>>>              assert False, "expected to barf"
>>>>          except Barf:
>>>>              pass
>>>> +
>>>> +    def test_schedule_cancel(self):
>>>> +        barf = self.reactor.schedule(10, BarfOnTask())
>>>> +        class CancelBarf:
>>>> +            def on_timer_task(self, event):
>>>> +                barf.cancel()
>>>> +        self.reactor.schedule(0, CancelBarf())
>>>> +        now = self.reactor.mark()
>>>> +        try:
>>>> +            self.reactor.run()
>>>> +            elapsed = self.reactor.mark() - now
>>>> +            assert elapsed < 10, "expected cancelled task to not delay the
>>>> reactor by " + elapsed
>>>> +        except Barf:
>>>> +            assert False, "expected barf to be cancelled"
>>>>
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
>>>> For additional commands, e-mail: commits-help@qpid.apache.org
>>>>
>>>>


Re: [2/2] qpid-proton git commit: PROTON-928: cancellable tasks

Posted by Robbie Gemmell <ro...@gmail.com>.
Hi Bozzo,

This change also seems to be causing test failures when using the
maven build (if you update things to get past the earlier failures,
caused by the commit mentioned in the other thread on proton@):

proton_tests.reactor.ExceptionTest.test_schedule_cancel ................. fail
Error during test:  Traceback (most recent call last):
    File "/home/gemmellr/workspace/proton/tests/python/proton-test", line 360, in run
      phase()
    File "/home/gemmellr/workspace/proton/tests/python/proton_tests/reactor.py", line 181, in test_schedule_cancel
      now = self.reactor.mark()
    File "/home/gemmellr/workspace/proton/tests/../proton-c/bindings/python/proton/reactor.py", line 118, in mark
      return pn_reactor_mark(self._impl)
  NameError: global name 'pn_reactor_mark' is not defined


Robbie

On 9 July 2015 at 10:55, Bozo Dragojevic <bo...@digiverse.si> wrote:
> Hi Ken,
>
> I've installed python3.4 and tox and friends and tried to reproduce it here
> and I can confirm that some completely unrelated tests fail mysteriously
> with python3.4 and that reverting my change makes that failure go away :)
>
> I've added more task cancellation tests to force the error while still
> in the implicated code, as I suspected it could be some refcounting
> problem but the new tests do not show anything unusual.
>
> What is even weirder, with the new tests even the python3.4 suite passes
> without segfault!
>
> So, I consider this a false positive and have left the change in,
> including the new tests at ca47d72.
>
> Does such a solution work for you?
>
> Bozzo
>
> On 8. 07. 15 16.32, Ken Giusti wrote:
>> Hi Bozzo,
>>
>> Can you please revert this change?
>>
>> It is causing a segfault in the python unit tests when they are run under python3.4.
>>
>> I haven't hit the segfault on python2.7, only on python3.4
>>
>> thanks,
>>
>> -K
>>
>> ----- Original Message -----
>>> From: bozzo@apache.org
>>> To: commits@qpid.apache.org
>>> Sent: Tuesday, July 7, 2015 3:50:16 PM
>>> Subject: [2/2] qpid-proton git commit: PROTON-928: cancellable tasks
>>>
>>> PROTON-928: cancellable tasks
>>>
>>> A scheduled task can be cancelled.
>>> A cancelled task does not prevent reactor from stopping running
>>>
>>>
>>> Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
>>> Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d4d22ee3
>>> Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d4d22ee3
>>> Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d4d22ee3
>>>
>>> Branch: refs/heads/master
>>> Commit: d4d22ee396163babcac19c48845b1f10ca3b5a48
>>> Parents: 09af375
>>> Author: Bozo Dragojevic <bo...@digiverse.si>
>>> Authored: Tue Jul 7 10:17:40 2015 +0200
>>> Committer: Bozo Dragojevic <bo...@digiverse.si>
>>> Committed: Tue Jul 7 21:49:44 2015 +0200
>>>
>>> ----------------------------------------------------------------------
>>>  proton-c/bindings/python/proton/reactor.py      |  5 +++-
>>>  proton-c/include/proton/reactor.h               |  1 +
>>>  proton-c/src/reactor/timer.c                    | 25 +++++++++++++++++++-
>>>  proton-c/src/tests/reactor.c                    | 15 ++++++++++++
>>>  .../org/apache/qpid/proton/reactor/Task.java    |  4 ++++
>>>  .../qpid/proton/reactor/impl/TaskImpl.java      | 10 ++++++++
>>>  .../apache/qpid/proton/reactor/impl/Timer.java  | 19 ++++++++++++---
>>>  proton-j/src/main/resources/creactor.py         |  3 +++
>>>  tests/python/proton_tests/reactor.py            | 14 +++++++++++
>>>  9 files changed, 91 insertions(+), 5 deletions(-)
>>> ----------------------------------------------------------------------
>>>
>>>
>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/bindings/python/proton/reactor.py
>>> ----------------------------------------------------------------------
>>> diff --git a/proton-c/bindings/python/proton/reactor.py
>>> b/proton-c/bindings/python/proton/reactor.py
>>> index c66334b..d019554 100644
>>> --- a/proton-c/bindings/python/proton/reactor.py
>>> +++ b/proton-c/bindings/python/proton/reactor.py
>>> @@ -53,6 +53,9 @@ class Task(Wrapper):
>>>      def _init(self):
>>>          pass
>>>
>>> +    def cancel(self):
>>> +        pn_task_cancel(self._impl)
>>> +
>>>  class Acceptor(Wrapper):
>>>
>>>      def __init__(self, impl):
>>> @@ -112,7 +115,7 @@ class Reactor(Wrapper):
>>>          pn_reactor_yield(self._impl)
>>>
>>>      def mark(self):
>>> -        pn_reactor_mark(self._impl)
>>> +        return pn_reactor_mark(self._impl)
>>>
>>>      def _get_handler(self):
>>>          return WrappedHandler.wrap(pn_reactor_get_handler(self._impl),
>>>          self.on_error)
>>>
>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/include/proton/reactor.h
>>> ----------------------------------------------------------------------
>>> diff --git a/proton-c/include/proton/reactor.h
>>> b/proton-c/include/proton/reactor.h
>>> index 59b2282..6f52d22 100644
>>> --- a/proton-c/include/proton/reactor.h
>>> +++ b/proton-c/include/proton/reactor.h
>>> @@ -96,6 +96,7 @@ PN_EXTERN pn_task_t *pn_timer_schedule(pn_timer_t *timer,
>>> pn_timestamp_t deadlin
>>>  PN_EXTERN int pn_timer_tasks(pn_timer_t *timer);
>>>
>>>  PN_EXTERN pn_record_t *pn_task_attachments(pn_task_t *task);
>>> +PN_EXTERN void pn_task_cancel(pn_task_t *task);
>>>
>>>  PN_EXTERN pn_reactor_t *pn_class_reactor(const pn_class_t *clazz, void
>>>  *object);
>>>  PN_EXTERN pn_reactor_t *pn_object_reactor(void *object);
>>>
>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/reactor/timer.c
>>> ----------------------------------------------------------------------
>>> diff --git a/proton-c/src/reactor/timer.c b/proton-c/src/reactor/timer.c
>>> index 1ad0821..61efd31 100644
>>> --- a/proton-c/src/reactor/timer.c
>>> +++ b/proton-c/src/reactor/timer.c
>>> @@ -27,12 +27,14 @@ struct pn_task_t {
>>>    pn_list_t *pool;
>>>    pn_record_t *attachments;
>>>    pn_timestamp_t deadline;
>>> +  bool cancelled;
>>>  };
>>>
>>>  void pn_task_initialize(pn_task_t *task) {
>>>    task->pool = NULL;
>>>    task->attachments = pn_record();
>>>    task->deadline = 0;
>>> +  task->cancelled = false;
>>>  }
>>>
>>>  void pn_task_finalize(pn_task_t *task) {
>>> @@ -68,6 +70,11 @@ pn_record_t *pn_task_attachments(pn_task_t *task) {
>>>    return task->attachments;
>>>  }
>>>
>>> +void pn_task_cancel(pn_task_t *task) {
>>> +    assert(task);
>>> +    task->cancelled = true;
>>> +}
>>> +
>>>  //
>>>  // timer
>>>  //
>>> @@ -113,8 +120,22 @@ pn_task_t *pn_timer_schedule(pn_timer_t *timer,
>>> pn_timestamp_t deadline) {
>>>    return task;
>>>  }
>>>
>>> +void pni_timer_flush_cancelled(pn_timer_t *timer) {
>>> +    while (pn_list_size(timer->tasks)) {
>>> +        pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0);
>>> +        if (task->cancelled) {
>>> +            pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks);
>>> +            assert(min == task);
>>> +            pn_decref(min);
>>> +        } else {
>>> +            break;
>>> +        }
>>> +    }
>>> +}
>>> +
>>>  pn_timestamp_t pn_timer_deadline(pn_timer_t *timer) {
>>>    assert(timer);
>>> +  pni_timer_flush_cancelled(timer);
>>>    if (pn_list_size(timer->tasks)) {
>>>      pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0);
>>>      return task->deadline;
>>> @@ -130,7 +151,8 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now)
>>> {
>>>      if (now >= task->deadline) {
>>>        pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks);
>>>        assert(min == task);
>>> -      pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK);
>>> +      if (!min->cancelled)
>>> +          pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK);
>>>        pn_decref(min);
>>>      } else {
>>>        break;
>>> @@ -140,5 +162,6 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now)
>>> {
>>>
>>>  int pn_timer_tasks(pn_timer_t *timer) {
>>>    assert(timer);
>>> +  pni_timer_flush_cancelled(timer);
>>>    return pn_list_size(timer->tasks);
>>>  }
>>>
>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/tests/reactor.c
>>> ----------------------------------------------------------------------
>>> diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c
>>> index fe6c769..059d099 100644
>>> --- a/proton-c/src/tests/reactor.c
>>> +++ b/proton-c/src/tests/reactor.c
>>> @@ -440,6 +440,20 @@ static void test_reactor_schedule_handler(void) {
>>>    pn_free(tevents);
>>>  }
>>>
>>> +static void test_reactor_schedule_cancel(void) {
>>> +  pn_reactor_t *reactor = pn_reactor();
>>> +  pn_handler_t *root = pn_reactor_get_handler(reactor);
>>> +  pn_list_t *events = pn_list(PN_VOID, 0);
>>> +  pn_handler_add(root, test_handler(reactor, events));
>>> +  pn_task_t *task = pn_reactor_schedule(reactor, 0, NULL);
>>> +  pn_task_cancel(task);
>>> +  pn_reactor_run(reactor);
>>> +  pn_reactor_free(reactor);
>>> +  expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED,
>>> +         PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
>>> +  pn_free(events);
>>> +}
>>> +
>>>  int main(int argc, char **argv)
>>>  {
>>>    test_reactor();
>>> @@ -461,5 +475,6 @@ int main(int argc, char **argv)
>>>    test_reactor_transfer(4*1024, 1024);
>>>    test_reactor_schedule();
>>>    test_reactor_schedule_handler();
>>> +  test_reactor_schedule_cancel();
>>>    return 0;
>>>  }
>>>
>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>> ----------------------------------------------------------------------
>>> diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>> index 69701ab..7fb5964 100644
>>> --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>> +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>> @@ -43,4 +43,8 @@ public interface Task extends Extendable {
>>>      /** @return the reactor that created this task. */
>>>      Reactor getReactor();
>>>
>>> +    /**
>>> +     * Cancel the execution of this task. No-op if invoked after the task
>>> was already executed.
>>> +     */
>>> +    void cancel();
>>>  }
>>>
>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>> ----------------------------------------------------------------------
>>> diff --git
>>> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>> index 00c9a84..11bb6b8 100644
>>> ---
>>> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>> +++
>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>> @@ -31,6 +31,7 @@ import org.apache.qpid.proton.reactor.Task;
>>>  public class TaskImpl implements Task, Comparable<TaskImpl> {
>>>      private final long deadline;
>>>      private final int counter;
>>> +    private boolean cancelled = false;
>>>      private final AtomicInteger count = new AtomicInteger();
>>>      private Record attachments = new RecordImpl();
>>>      private Reactor reactor;
>>> @@ -58,6 +59,15 @@ public class TaskImpl implements Task,
>>> Comparable<TaskImpl> {
>>>          return deadline;
>>>      }
>>>
>>> +    public boolean isCancelled() {
>>> +        return cancelled;
>>> +    }
>>> +
>>> +    @Override
>>> +    public void cancel() {
>>> +        cancelled = true;
>>> +    }
>>> +
>>>      public void setReactor(Reactor reactor) {
>>>          this.reactor = reactor;
>>>      }
>>>
>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>> ----------------------------------------------------------------------
>>> diff --git
>>> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>> index 32bb4f6..b8df19d 100644
>>> --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>> +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>> @@ -31,7 +31,7 @@ import org.apache.qpid.proton.reactor.Task;
>>>  public class Timer {
>>>
>>>      private CollectorImpl collector;
>>> -    private PriorityQueue<Task> tasks = new PriorityQueue<Task>();
>>> +    private PriorityQueue<TaskImpl> tasks = new PriorityQueue<TaskImpl>();
>>>
>>>      public Timer(Collector collector) {
>>>          this.collector = (CollectorImpl)collector;
>>> @@ -44,6 +44,7 @@ public class Timer {
>>>      }
>>>
>>>      long deadline() {
>>> +        flushCancelled();
>>>          if (tasks.size() > 0) {
>>>              Task task = tasks.peek();
>>>              return task.deadline();
>>> @@ -52,12 +53,23 @@ public class Timer {
>>>          }
>>>      }
>>>
>>> +    private void flushCancelled() {
>>> +        while (!tasks.isEmpty()) {
>>> +            TaskImpl task = tasks.peek();
>>> +            if (task.isCancelled())
>>> +                tasks.poll();
>>> +            else
>>> +                break;
>>> +        }
>>> +    }
>>> +
>>>      void tick(long now) {
>>>          while(!tasks.isEmpty()) {
>>> -            Task task = tasks.peek();
>>> +            TaskImpl task = tasks.peek();
>>>              if (now >= task.deadline()) {
>>>                  tasks.poll();
>>> -                collector.put(Type.TIMER_TASK, task);
>>> +                if (!task.isCancelled())
>>> +                    collector.put(Type.TIMER_TASK, task);
>>>              } else {
>>>                  break;
>>>              }
>>> @@ -65,6 +77,7 @@ public class Timer {
>>>      }
>>>
>>>      int tasks() {
>>> +        flushCancelled();
>>>          return tasks.size();
>>>      }
>>>  }
>>>
>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/resources/creactor.py
>>> ----------------------------------------------------------------------
>>> diff --git a/proton-j/src/main/resources/creactor.py
>>> b/proton-j/src/main/resources/creactor.py
>>> index e179b23..1f8514e 100644
>>> --- a/proton-j/src/main/resources/creactor.py
>>> +++ b/proton-j/src/main/resources/creactor.py
>>> @@ -78,6 +78,9 @@ def pn_selectable_set_fd(s, fd):
>>>  def pn_acceptor_close(a):
>>>      a.close()
>>>
>>> +def pn_task_cancel(t):
>>> +    t.cancel()
>>> +
>>>  def pn_object_reactor(o):
>>>      if hasattr(o, "impl"):
>>>          if hasattr(o.impl, "getSession"):
>>>
>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/tests/python/proton_tests/reactor.py
>>> ----------------------------------------------------------------------
>>> diff --git a/tests/python/proton_tests/reactor.py
>>> b/tests/python/proton_tests/reactor.py
>>> index 6afee30..067c5c0 100644
>>> --- a/tests/python/proton_tests/reactor.py
>>> +++ b/tests/python/proton_tests/reactor.py
>>> @@ -171,3 +171,17 @@ class ExceptionTest(Test):
>>>              assert False, "expected to barf"
>>>          except Barf:
>>>              pass
>>> +
>>> +    def test_schedule_cancel(self):
>>> +        barf = self.reactor.schedule(10, BarfOnTask())
>>> +        class CancelBarf:
>>> +            def on_timer_task(self, event):
>>> +                barf.cancel()
>>> +        self.reactor.schedule(0, CancelBarf())
>>> +        now = self.reactor.mark()
>>> +        try:
>>> +            self.reactor.run()
>>> +            elapsed = self.reactor.mark() - now
>>> +            assert elapsed < 10, "expected cancelled task to not delay the
>>> reactor by " + elapsed
>>> +        except Barf:
>>> +            assert False, "expected barf to be cancelled"
>>>
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
>>> For additional commands, e-mail: commits-help@qpid.apache.org
>>>
>>>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


Re: [2/2] qpid-proton git commit: PROTON-928: cancellable tasks

Posted by Robbie Gemmell <ro...@gmail.com>.
Hi Bozzo,

This change also seems to be causing test failures when using the
maven build (if you update things to get past the earlier failures,
caused by the commit mentioned in the other thread on proton@):

proton_tests.reactor.ExceptionTest.test_schedule_cancel ................. fail
Error during test:  Traceback (most recent call last):
    File "/home/gemmellr/workspace/proton/tests/python/proton-test", line 360, in run
      phase()
    File "/home/gemmellr/workspace/proton/tests/python/proton_tests/reactor.py", line 181, in test_schedule_cancel
      now = self.reactor.mark()
    File "/home/gemmellr/workspace/proton/tests/../proton-c/bindings/python/proton/reactor.py", line 118, in mark
      return pn_reactor_mark(self._impl)
  NameError: global name 'pn_reactor_mark' is not defined


Robbie

On 9 July 2015 at 10:55, Bozo Dragojevic <bo...@digiverse.si> wrote:
> Hi Ken,
>
> I've installed python3.4 and tox and friends and tried to reproduce it here
> and I can confirm that some completely unrelated tests fail mysteriously
> with python3.4 and that reverting my change makes that failure go away :)
>
> I've added more task cancellation tests to force the error while still
> in the implicated code, as I suspected it could be some refcounting
> problem but the new tests do not show anything unusual.
>
> What is even weirder, with the new tests even the python3.4 suite passes
> without segfault!
>
> So, I consider this a false positive and have left the change in,
> including the new tests at ca47d72.
>
> Does such a solution work for you?
>
> Bozzo
>
> On 8. 07. 15 16.32, Ken Giusti wrote:
>> Hi Bozzo,
>>
>> Can you please revert this change?
>>
>> It is causing a segfault in the python unit tests when they are run under python3.4.
>>
>> I haven't hit the segfault on python2.7, only on python3.4
>>
>> thanks,
>>
>> -K
>>
>> ----- Original Message -----
>>> From: bozzo@apache.org
>>> To: commits@qpid.apache.org
>>> Sent: Tuesday, July 7, 2015 3:50:16 PM
>>> Subject: [2/2] qpid-proton git commit: PROTON-928: cancellable tasks
>>>
>>> PROTON-928: cancellable tasks
>>>
>>> A scheduled task can be cancelled.
>>> A cancelled task does not prevent reactor from stopping running
>>>
>>>
>>> Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
>>> Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d4d22ee3
>>> Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d4d22ee3
>>> Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d4d22ee3
>>>
>>> Branch: refs/heads/master
>>> Commit: d4d22ee396163babcac19c48845b1f10ca3b5a48
>>> Parents: 09af375
>>> Author: Bozo Dragojevic <bo...@digiverse.si>
>>> Authored: Tue Jul 7 10:17:40 2015 +0200
>>> Committer: Bozo Dragojevic <bo...@digiverse.si>
>>> Committed: Tue Jul 7 21:49:44 2015 +0200
>>>
>>> ----------------------------------------------------------------------
>>>  proton-c/bindings/python/proton/reactor.py      |  5 +++-
>>>  proton-c/include/proton/reactor.h               |  1 +
>>>  proton-c/src/reactor/timer.c                    | 25 +++++++++++++++++++-
>>>  proton-c/src/tests/reactor.c                    | 15 ++++++++++++
>>>  .../org/apache/qpid/proton/reactor/Task.java    |  4 ++++
>>>  .../qpid/proton/reactor/impl/TaskImpl.java      | 10 ++++++++
>>>  .../apache/qpid/proton/reactor/impl/Timer.java  | 19 ++++++++++++---
>>>  proton-j/src/main/resources/creactor.py         |  3 +++
>>>  tests/python/proton_tests/reactor.py            | 14 +++++++++++
>>>  9 files changed, 91 insertions(+), 5 deletions(-)
>>> ----------------------------------------------------------------------
>>>
>>>
>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/bindings/python/proton/reactor.py
>>> ----------------------------------------------------------------------
>>> diff --git a/proton-c/bindings/python/proton/reactor.py
>>> b/proton-c/bindings/python/proton/reactor.py
>>> index c66334b..d019554 100644
>>> --- a/proton-c/bindings/python/proton/reactor.py
>>> +++ b/proton-c/bindings/python/proton/reactor.py
>>> @@ -53,6 +53,9 @@ class Task(Wrapper):
>>>      def _init(self):
>>>          pass
>>>
>>> +    def cancel(self):
>>> +        pn_task_cancel(self._impl)
>>> +
>>>  class Acceptor(Wrapper):
>>>
>>>      def __init__(self, impl):
>>> @@ -112,7 +115,7 @@ class Reactor(Wrapper):
>>>          pn_reactor_yield(self._impl)
>>>
>>>      def mark(self):
>>> -        pn_reactor_mark(self._impl)
>>> +        return pn_reactor_mark(self._impl)
>>>
>>>      def _get_handler(self):
>>>          return WrappedHandler.wrap(pn_reactor_get_handler(self._impl),
>>>          self.on_error)
>>>
>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/include/proton/reactor.h
>>> ----------------------------------------------------------------------
>>> diff --git a/proton-c/include/proton/reactor.h
>>> b/proton-c/include/proton/reactor.h
>>> index 59b2282..6f52d22 100644
>>> --- a/proton-c/include/proton/reactor.h
>>> +++ b/proton-c/include/proton/reactor.h
>>> @@ -96,6 +96,7 @@ PN_EXTERN pn_task_t *pn_timer_schedule(pn_timer_t *timer,
>>> pn_timestamp_t deadlin
>>>  PN_EXTERN int pn_timer_tasks(pn_timer_t *timer);
>>>
>>>  PN_EXTERN pn_record_t *pn_task_attachments(pn_task_t *task);
>>> +PN_EXTERN void pn_task_cancel(pn_task_t *task);
>>>
>>>  PN_EXTERN pn_reactor_t *pn_class_reactor(const pn_class_t *clazz, void
>>>  *object);
>>>  PN_EXTERN pn_reactor_t *pn_object_reactor(void *object);
>>>
>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/reactor/timer.c
>>> ----------------------------------------------------------------------
>>> diff --git a/proton-c/src/reactor/timer.c b/proton-c/src/reactor/timer.c
>>> index 1ad0821..61efd31 100644
>>> --- a/proton-c/src/reactor/timer.c
>>> +++ b/proton-c/src/reactor/timer.c
>>> @@ -27,12 +27,14 @@ struct pn_task_t {
>>>    pn_list_t *pool;
>>>    pn_record_t *attachments;
>>>    pn_timestamp_t deadline;
>>> +  bool cancelled;
>>>  };
>>>
>>>  void pn_task_initialize(pn_task_t *task) {
>>>    task->pool = NULL;
>>>    task->attachments = pn_record();
>>>    task->deadline = 0;
>>> +  task->cancelled = false;
>>>  }
>>>
>>>  void pn_task_finalize(pn_task_t *task) {
>>> @@ -68,6 +70,11 @@ pn_record_t *pn_task_attachments(pn_task_t *task) {
>>>    return task->attachments;
>>>  }
>>>
>>> +void pn_task_cancel(pn_task_t *task) {
>>> +    assert(task);
>>> +    task->cancelled = true;
>>> +}
>>> +
>>>  //
>>>  // timer
>>>  //
>>> @@ -113,8 +120,22 @@ pn_task_t *pn_timer_schedule(pn_timer_t *timer,
>>> pn_timestamp_t deadline) {
>>>    return task;
>>>  }
>>>
>>> +void pni_timer_flush_cancelled(pn_timer_t *timer) {
>>> +    while (pn_list_size(timer->tasks)) {
>>> +        pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0);
>>> +        if (task->cancelled) {
>>> +            pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks);
>>> +            assert(min == task);
>>> +            pn_decref(min);
>>> +        } else {
>>> +            break;
>>> +        }
>>> +    }
>>> +}
>>> +
>>>  pn_timestamp_t pn_timer_deadline(pn_timer_t *timer) {
>>>    assert(timer);
>>> +  pni_timer_flush_cancelled(timer);
>>>    if (pn_list_size(timer->tasks)) {
>>>      pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0);
>>>      return task->deadline;
>>> @@ -130,7 +151,8 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now)
>>> {
>>>      if (now >= task->deadline) {
>>>        pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks);
>>>        assert(min == task);
>>> -      pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK);
>>> +      if (!min->cancelled)
>>> +          pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK);
>>>        pn_decref(min);
>>>      } else {
>>>        break;
>>> @@ -140,5 +162,6 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now)
>>> {
>>>
>>>  int pn_timer_tasks(pn_timer_t *timer) {
>>>    assert(timer);
>>> +  pni_timer_flush_cancelled(timer);
>>>    return pn_list_size(timer->tasks);
>>>  }
>>>
>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/tests/reactor.c
>>> ----------------------------------------------------------------------
>>> diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c
>>> index fe6c769..059d099 100644
>>> --- a/proton-c/src/tests/reactor.c
>>> +++ b/proton-c/src/tests/reactor.c
>>> @@ -440,6 +440,20 @@ static void test_reactor_schedule_handler(void) {
>>>    pn_free(tevents);
>>>  }
>>>
>>> +static void test_reactor_schedule_cancel(void) {
>>> +  pn_reactor_t *reactor = pn_reactor();
>>> +  pn_handler_t *root = pn_reactor_get_handler(reactor);
>>> +  pn_list_t *events = pn_list(PN_VOID, 0);
>>> +  pn_handler_add(root, test_handler(reactor, events));
>>> +  pn_task_t *task = pn_reactor_schedule(reactor, 0, NULL);
>>> +  pn_task_cancel(task);
>>> +  pn_reactor_run(reactor);
>>> +  pn_reactor_free(reactor);
>>> +  expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED,
>>> +         PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
>>> +  pn_free(events);
>>> +}
>>> +
>>>  int main(int argc, char **argv)
>>>  {
>>>    test_reactor();
>>> @@ -461,5 +475,6 @@ int main(int argc, char **argv)
>>>    test_reactor_transfer(4*1024, 1024);
>>>    test_reactor_schedule();
>>>    test_reactor_schedule_handler();
>>> +  test_reactor_schedule_cancel();
>>>    return 0;
>>>  }
>>>
>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>> ----------------------------------------------------------------------
>>> diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>> index 69701ab..7fb5964 100644
>>> --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>> +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
>>> @@ -43,4 +43,8 @@ public interface Task extends Extendable {
>>>      /** @return the reactor that created this task. */
>>>      Reactor getReactor();
>>>
>>> +    /**
>>> +     * Cancel the execution of this task. No-op if invoked after the task
>>> was already executed.
>>> +     */
>>> +    void cancel();
>>>  }
>>>
>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>> ----------------------------------------------------------------------
>>> diff --git
>>> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>> index 00c9a84..11bb6b8 100644
>>> ---
>>> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>> +++
>>> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
>>> @@ -31,6 +31,7 @@ import org.apache.qpid.proton.reactor.Task;
>>>  public class TaskImpl implements Task, Comparable<TaskImpl> {
>>>      private final long deadline;
>>>      private final int counter;
>>> +    private boolean cancelled = false;
>>>      private final AtomicInteger count = new AtomicInteger();
>>>      private Record attachments = new RecordImpl();
>>>      private Reactor reactor;
>>> @@ -58,6 +59,15 @@ public class TaskImpl implements Task,
>>> Comparable<TaskImpl> {
>>>          return deadline;
>>>      }
>>>
>>> +    public boolean isCancelled() {
>>> +        return cancelled;
>>> +    }
>>> +
>>> +    @Override
>>> +    public void cancel() {
>>> +        cancelled = true;
>>> +    }
>>> +
>>>      public void setReactor(Reactor reactor) {
>>>          this.reactor = reactor;
>>>      }
>>>
>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>> ----------------------------------------------------------------------
>>> diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>> index 32bb4f6..b8df19d 100644
>>> --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>> +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
>>> @@ -31,7 +31,7 @@ import org.apache.qpid.proton.reactor.Task;
>>>  public class Timer {
>>>
>>>      private CollectorImpl collector;
>>> -    private PriorityQueue<Task> tasks = new PriorityQueue<Task>();
>>> +    private PriorityQueue<TaskImpl> tasks = new PriorityQueue<TaskImpl>();
>>>
>>>      public Timer(Collector collector) {
>>>          this.collector = (CollectorImpl)collector;
>>> @@ -44,6 +44,7 @@ public class Timer {
>>>      }
>>>
>>>      long deadline() {
>>> +        flushCancelled();
>>>          if (tasks.size() > 0) {
>>>              Task task = tasks.peek();
>>>              return task.deadline();
>>> @@ -52,12 +53,23 @@ public class Timer {
>>>          }
>>>      }
>>>
>>> +    private void flushCancelled() {
>>> +        while (!tasks.isEmpty()) {
>>> +            TaskImpl task = tasks.peek();
>>> +            if (task.isCancelled())
>>> +                tasks.poll();
>>> +            else
>>> +                break;
>>> +        }
>>> +    }
>>> +
>>>      void tick(long now) {
>>>          while(!tasks.isEmpty()) {
>>> -            Task task = tasks.peek();
>>> +            TaskImpl task = tasks.peek();
>>>              if (now >= task.deadline()) {
>>>                  tasks.poll();
>>> -                collector.put(Type.TIMER_TASK, task);
>>> +                if (!task.isCancelled())
>>> +                    collector.put(Type.TIMER_TASK, task);
>>>              } else {
>>>                  break;
>>>              }
>>> @@ -65,6 +77,7 @@ public class Timer {
>>>      }
>>>
>>>      int tasks() {
>>> +        flushCancelled();
>>>          return tasks.size();
>>>      }
>>>  }
>>>
>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/resources/creactor.py
>>> ----------------------------------------------------------------------
>>> diff --git a/proton-j/src/main/resources/creactor.py b/proton-j/src/main/resources/creactor.py
>>> index e179b23..1f8514e 100644
>>> --- a/proton-j/src/main/resources/creactor.py
>>> +++ b/proton-j/src/main/resources/creactor.py
>>> @@ -78,6 +78,9 @@ def pn_selectable_set_fd(s, fd):
>>>  def pn_acceptor_close(a):
>>>      a.close()
>>>
>>> +def pn_task_cancel(t):
>>> +    t.cancel()
>>> +
>>>  def pn_object_reactor(o):
>>>      if hasattr(o, "impl"):
>>>          if hasattr(o.impl, "getSession"):
>>>
>>> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/tests/python/proton_tests/reactor.py
>>> ----------------------------------------------------------------------
>>> diff --git a/tests/python/proton_tests/reactor.py b/tests/python/proton_tests/reactor.py
>>> index 6afee30..067c5c0 100644
>>> --- a/tests/python/proton_tests/reactor.py
>>> +++ b/tests/python/proton_tests/reactor.py
>>> @@ -171,3 +171,17 @@ class ExceptionTest(Test):
>>>              assert False, "expected to barf"
>>>          except Barf:
>>>              pass
>>> +
>>> +    def test_schedule_cancel(self):
>>> +        barf = self.reactor.schedule(10, BarfOnTask())
>>> +        class CancelBarf:
>>> +            def on_timer_task(self, event):
>>> +                barf.cancel()
>>> +        self.reactor.schedule(0, CancelBarf())
>>> +        now = self.reactor.mark()
>>> +        try:
>>> +            self.reactor.run()
>>> +            elapsed = self.reactor.mark() - now
>>> +            assert elapsed < 10, "expected cancelled task to not delay the reactor by " + elapsed
>>> +        except Barf:
>>> +            assert False, "expected barf to be cancelled"
>>>
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
>>> For additional commands, e-mail: commits-help@qpid.apache.org
>>>
>>>
>