Posted to dev@kafka.apache.org by Yasuhiro Matsuda <ya...@gmail.com> on 2015/02/28 01:14:04 UTC

Review Request 31568: Patch for KAFKA-1989

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/
-----------------------------------------------------------

Review request for kafka.


Bugs: KAFKA-1989
    https://issues.apache.org/jira/browse/KAFKA-1989


Repository: kafka


Description
-------

new purgatory implementation


Diffs
-----

  core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
  core/src/main/scala/kafka/server/ReplicaManager.scala 586cf4caa95f58d5b2e6c7429732d25d2b3635c8 
  core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 7a37617395b9e4226853913b8989f42e7301de7c 
  core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/31568/diff/


Testing
-------


Thanks,

Yasuhiro Matsuda


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Yasuhiro Matsuda <ya...@gmail.com>.

> On March 18, 2015, 4:14 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 373
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line373>
> >
> >     Trying to purge every watcher list on each doWork call will likely increase CPU usage a lot; we can reduce it by purging only when watched() is too large.

A list scan happens only when the unreachable count is greater than 100. The unreachable count is incremented by polling the reference queue. So, it is driven by GC cycles in some sense. If there is nothing to purge, the scan won't happen. And it is set to zero when tryCompleteWatched is called. So, I expect that the unreachable count is low most of the time.

There is a problem in using watched(). It is the count of watched requests. On a busy server, watched() may exceed the threshold more often, even when there are not many entries to purge.
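
The counting scheme described above can be sketched roughly as follows. This is a minimal, single-threaded illustration and not the code from the patch: the class name WeakWatchList, its methods, and the purgeThreshold parameter are hypothetical; only java.lang.ref.WeakReference and ReferenceQueue are real JDK classes.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}

// Hypothetical sketch, not thread-safe: entries are held weakly, and a scan of
// the list happens only after enough of them were reported dead by the queue.
class WeakWatchList[T <: AnyRef](purgeThreshold: Int = 100) {
  private val queue = new ReferenceQueue[T]()
  private var refs = List.empty[WeakReference[T]]
  private var unreachable = 0

  def watch(item: T): WeakReference[T] = {
    val ref = new WeakReference[T](item, queue)
    refs = ref :: refs
    ref
  }

  def size: Int = refs.size

  // Drain the reference queue; each polled reference is one more dead entry.
  private def pollUnreachable(): Unit =
    while (queue.poll() != null) unreachable += 1

  // Scan the list only when the unreachable count exceeds the threshold.
  // Returns true if a scan happened.
  def maybePurge(): Boolean = {
    pollUnreachable()
    if (unreachable > purgeThreshold) {
      refs = refs.filter(_.get() != null)
      unreachable = 0
      true
    } else false
  }
}
```

In normal operation the garbage collector enqueues the cleared references, so the scan frequency follows GC cycles rather than the raw number of watched operations.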


- Yasuhiro


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76899
-----------------------------------------------------------


On Feb. 28, 2015, 12:14 a.m., Yasuhiro Matsuda wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31568/
> -----------------------------------------------------------
> 
> (Updated Feb. 28, 2015, 12:14 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1989
>     https://issues.apache.org/jira/browse/KAFKA-1989
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> new purgatory implementation
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 586cf4caa95f58d5b2e6c7429732d25d2b3635c8 
>   core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 7a37617395b9e4226853913b8989f42e7301de7c 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/31568/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Yasuhiro Matsuda
> 
>


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76899
-----------------------------------------------------------



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment124592>

    This function is no longer triggered in the new purgatory; we need to add it back since it is instantiated in produce / fetch for recording metrics, etc.



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment124591>

    Trying to purge every watcher list on each doWork call will likely increase CPU usage a lot; we can reduce it by purging only when watched() is too large.


- Guozhang Wang


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Yasuhiro Matsuda <ya...@gmail.com>.

> On April 1, 2015, 6:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 126
> > <https://reviews.apache.org/r/31568/diff/2/?file=901425#file901425line126>
> >
> >     timeoutTimer is an implementation detail in ExpiredOperationReaper. Could we hide it inside ExpiredOperationReaper and expose needed apis?

I feel it is more natural to have timeoutTimer in DelayedOperationPurgatory just like watchersForKey belongs to it. I'd like to make ExpiredOperationReaper just a simple thread to make it easy to understand. To make it clearer I will remove timeoutTimer from the constructor argument of ExpiredOperationReaper.


> On April 1, 2015, 6:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, lines 199-202
> > <https://reviews.apache.org/r/31568/diff/2/?file=901425#file901425line199>
> >
> >     Do we need to do that or could we just let the expirationReaper handle it?

There is a race condition. If an operation is completed just before it is added to the timer, the operation stays in the timer until it times out, because it missed the chance to call cancel(). It is better to remove a completed operation right away.
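
A rough sketch of that race and the re-check that closes it. Op, SimpleTimer and Purgatory.watch are hypothetical stand-ins for illustration, not the classes in the patch.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

// Hypothetical stand-in for a delayed operation.
class Op {
  private val completed = new AtomicBoolean(false)
  @volatile private var timer: SimpleTimer = null

  def isCompleted: Boolean = completed.get()
  def attach(t: SimpleTimer): Unit = { timer = t }

  // cancel() can only remove the operation if it is already in a timer.
  def cancel(): Unit = { val t = timer; if (t != null) t.remove(this) }

  def complete(): Boolean =
    if (completed.compareAndSet(false, true)) { cancel(); true } else false
}

// Hypothetical stand-in for the timer: just a set of pending operations.
class SimpleTimer {
  private val tasks = mutable.Set.empty[Op]
  def add(op: Op): Unit = synchronized { tasks += op; op.attach(this) }
  def remove(op: Op): Unit = synchronized { tasks -= op }
  def size: Int = synchronized(tasks.size)
}

object Purgatory {
  def watch(op: Op, timer: SimpleTimer): Unit = {
    timer.add(op)
    // Re-check after the add: an operation completed just before it was added
    // never had a timer to cancel itself from, so remove it here.
    if (op.isCompleted) op.cancel()
  }
}
```

Without the final check in watch, the already completed operation would sit in the timer until its timeout.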


> On April 1, 2015, 6:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/Timer.scala, line 26
> > <https://reviews.apache.org/r/31568/diff/2/?file=901427#file901427line26>
> >
> >     Do you think the defaults for tickMs and wheelSize are good enough that we don't need to expose them as configs?

The current defaults seem good, but I want to keep these parameters. Timer is used only by the purgatory right now, but the Timer implementation is generic and can be used for other purposes. Some uses may need different parameters.


> On April 1, 2015, 6:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/Timer.scala, lines 44-45
> > <https://reviews.apache.org/r/31568/diff/2/?file=901427#file901427line44>
> >
> >     Could we use inLock() where applicable?

I'd rather not have closure overhead here. A Scala closure creates a new closure object instance on every call.
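
For comparison, the two styles under discussion look roughly like this. The inLock helper below is a local definition written in the style of the one used elsewhere in the code base, and LockStyles is a hypothetical name. Whether the closure allocation matters in practice depends on the JIT and the call frequency; this sketch does not measure it.

```scala
import java.util.concurrent.locks.{Lock, ReentrantLock}

object LockStyles {
  // Local helper in the style of inLock(). The by-name argument is compiled to
  // a function object, which may be allocated on each call when it captures state.
  def inLock[T](lock: Lock)(body: => T): T = {
    lock.lock()
    try body finally lock.unlock()
  }

  private val lock = new ReentrantLock()
  private var counter = 0

  // Helper form: concise, but goes through a closure.
  def incrementWithHelper(): Int = inLock(lock) { counter += 1; counter }

  // Explicit form: more verbose, no closure involved.
  def incrementExplicit(): Int = {
    lock.lock()
    try { counter += 1; counter } finally lock.unlock()
  }
}
```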


> On April 1, 2015, 6:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/Timer.scala, lines 54-55
> > <https://reviews.apache.org/r/31568/diff/2/?file=901427#file901427line54>
> >
> >     Do we need to run this in a separate thread? Could we just run this in the expireReaper thread as before?

In a timer implementation, this is a common practice. It ensures timely expirations regardless of the size of the timer tasks. To be as generic as possible, I would like to use a supplied executor service. If it is really necessary to execute the operations in expirationReaper, we can write a custom ExecutorService that runs the task in the caller thread. But I don't think it is necessary. It is more advantageous to have a separate thread on a multi-core machine.
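
The caller-thread alternative mentioned above could look roughly like the following. CallerThreadExecutorService is a hypothetical name; the sketch relies only on java.util.concurrent.AbstractExecutorService from the JDK and keeps the shutdown handling deliberately minimal.

```scala
import java.util.Collections
import java.util.concurrent.{AbstractExecutorService, TimeUnit}

// Hypothetical sketch: an ExecutorService that runs every task synchronously
// in the thread that submits it.
class CallerThreadExecutorService extends AbstractExecutorService {
  @volatile private var shut = false

  // No hand-off to another thread: run the task right here.
  override def execute(command: Runnable): Unit = command.run()

  override def shutdown(): Unit = { shut = true }
  override def shutdownNow(): java.util.List[Runnable] = {
    shut = true
    Collections.emptyList[Runnable]()
  }
  override def isShutdown(): Boolean = shut
  override def isTerminated(): Boolean = shut
  override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = shut
}
```

With such an executor, a slow task delays the thread that advances the clock, which is the trade-off the reply argues against.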


> On April 1, 2015, 6:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/Timer.scala, line 61
> > <https://reviews.apache.org/r/31568/diff/2/?file=901427#file901427line61>
> >
> >     Do we need to pass in the timeoutMs or could we just hardcode the 200ms in poll()? The timeout is just so that we can shut down the expiration reaper thread since we don't interrupt it.

Again, I want to keep this parameter to make it useful for other purposes, not just the purgatory.


> On April 1, 2015, 6:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/Timer.scala, lines 66-67
> > <https://reviews.apache.org/r/31568/diff/2/?file=901427#file901427line66>
> >
> >     Would it be simpler to just handle one poll item and return? The outer loop will call this method again on the next item. Also, poll() can block. So the expiration reaper thread may not be able to shut down if the queue is empty.

advanceClock() should process all expired buckets while holding writeLock. Otherwise, I don't think it can maintain the consistency of the data structure.
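
One way to reconcile the two points (bounded blocking, and draining all expired buckets under the write lock) is sketched below. Bucket and SketchTimer are hypothetical names, and the per-bucket work is reduced to updating a clock field; DelayQueue and ReentrantReadWriteLock are the real JDK classes.

```scala
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}
import java.util.concurrent.locks.ReentrantReadWriteLock

// Hypothetical bucket that expires at an absolute wall-clock time.
class Bucket(val expirationMs: Long) extends Delayed {
  override def getDelay(unit: TimeUnit): Long =
    unit.convert(expirationMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS)
  override def compareTo(other: Delayed): Int = other match {
    case b: Bucket => java.lang.Long.compare(expirationMs, b.expirationMs)
    case _ => java.lang.Long.compare(
      getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS))
  }
}

class SketchTimer {
  private val queue = new DelayQueue[Bucket]()
  private val lock = new ReentrantReadWriteLock()
  @volatile var currentTimeMs: Long = 0L

  // Adders share the read lock, so they exclude only a running advanceClock.
  def add(b: Bucket): Boolean = {
    lock.readLock().lock()
    try queue.offer(b) finally lock.readLock().unlock()
  }

  // Returns the number of expired buckets processed.
  def advanceClock(timeoutMs: Long): Int = {
    // The only blocking call is bounded by timeoutMs and happens outside the lock.
    var bucket = queue.poll(timeoutMs, TimeUnit.MILLISECONDS)
    var processed = 0
    if (bucket != null) {
      lock.writeLock().lock()
      try {
        // Drain everything that has already expired in one critical section.
        while (bucket != null) {
          currentTimeMs = bucket.expirationMs
          processed += 1
          bucket = queue.poll() // non-blocking
        }
      } finally lock.writeLock().unlock()
    }
    processed
  }
}
```

Because the timed poll returns after at most timeoutMs, a caller loop can check a shutdown flag between calls even when the queue is empty.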


> On April 1, 2015, 6:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/TimingWheel.scala, line 72
> > <https://reviews.apache.org/r/31568/diff/2/?file=901430#file901430line72>
> >
> >     It seems that all wheels at different hierarchies always have the same startMs. Does that mean that the first bucket in the coarser level wheel is never used?

At the beginning, yes. But it will be used later because a wheel is a circular buffer.
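
A small illustration of the circular-buffer point, assuming the usual modular mapping from time to slot. WheelIndex and bucketIndex are hypothetical names and the formula is an assumption about the general technique, not a quote from the patch.

```scala
object WheelIndex {
  // Slot of a timestamp in a wheel with the given tick length and slot count.
  def bucketIndex(timeMs: Long, tickMs: Long, wheelSize: Int): Int =
    ((timeMs / tickMs) % wheelSize).toInt
}
```

For a coarser wheel with tickMs = 20 and wheelSize = 20, times 0 to 19 map to slot 0, which the finer wheel already covers at the start. Once the clock has moved one full turn, time 400 maps to slot 0 again, and at that point the slot is in use.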


> On April 1, 2015, 6:46 p.m., Jun Rao wrote:
> > core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala, lines 55-59
> > <https://reviews.apache.org/r/31568/diff/2/?file=901432#file901432line55>
> >
> >     Not quite sure what this is testing. It's not clear to me why the sharedCounter won't increase after add. Perhaps, we can add some comments.

It is testing that reinserting the existing tasks doesn't change the task count. I will add comments.


- Yasuhiro


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review78444
-----------------------------------------------------------


On March 20, 2015, 3:45 p.m., Yasuhiro Matsuda wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31568/
> -----------------------------------------------------------
> 
> (Updated March 20, 2015, 3:45 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1989
>     https://issues.apache.org/jira/browse/KAFKA-1989
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> new purgatory implementation
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
>   core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
>   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 7a37617395b9e4226853913b8989f42e7301de7c 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/31568/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Yasuhiro Matsuda
> 
>


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Jun Rao <ju...@gmail.com>.

> On April 1, 2015, 6:46 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/Timer.scala, lines 44-45
> > <https://reviews.apache.org/r/31568/diff/2/?file=901427#file901427line44>
> >
> >     Could we use inLock() where applicable?
> 
> Yasuhiro Matsuda wrote:
>     I'd rather not have closure overhead here. A Scala closure creates a new closure object instance on every call.

How much is the closure overhead? We use inLock() in quite a few other places. It would be good to keep the usage consistent. If it's too much overhead, perhaps we can remove all inLock() usage in a separate jira?


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review78444
-----------------------------------------------------------


On April 1, 2015, 8:50 p.m., Yasuhiro Matsuda wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31568/
> -----------------------------------------------------------
> 
> (Updated April 1, 2015, 8:50 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1989
>     https://issues.apache.org/jira/browse/KAFKA-1989
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> new purgatory implementation
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 44f0026e6676d8d764dd59dbcc6bb7bb727a3ba6 
>   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 7a37617395b9e4226853913b8989f42e7301de7c 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/31568/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Yasuhiro Matsuda
> 
>


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review78444
-----------------------------------------------------------


Thanks for the patch! A few comments below.


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment127361>

    timeoutTimer is an implementation detail in ExpiredOperationReaper. Could we hide it inside ExpiredOperationReaper and expose needed apis?



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment127363>

    Do we need to do that or could we just let the expirationReaper handle it?



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment127368>

    Our convention is to only use () for methods that have side effects. So, next will use (), but hasNext will not.



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment127369>

    Is the null check necessary? If hasNext is true, the next item will always be available, right?



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment127373>

    Reworded the description a bit. Is that clearer?
    
    Trigger a purge if the number of completed but still being watched operations is larger than the purge threshold. That number is computed by the difference btw the estimated total number of operations and the number of pending delayed operations.



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment127374>

    are be completed => are completed



core/src/main/scala/kafka/utils/timer/Timer.scala
<https://reviews.apache.org/r/31568/#comment127429>

    Do you think the defaults for tickMs and wheelSize are good enough that we don't need to expose them as configs?



core/src/main/scala/kafka/utils/timer/Timer.scala
<https://reviews.apache.org/r/31568/#comment127167>

    Could we use inLock() where applicable?



core/src/main/scala/kafka/utils/timer/Timer.scala
<https://reviews.apache.org/r/31568/#comment127225>

    Do we need to run this in a separate thread? Could we just run this in the expireReaper thread as before?



core/src/main/scala/kafka/utils/timer/Timer.scala
<https://reviews.apache.org/r/31568/#comment127423>

    Do we need to pass in the timeoutMs or could we just hardcode the 200ms in poll()? The timeout is just so that we can shut down the expiration reaper thread since we don't interrupt it.



core/src/main/scala/kafka/utils/timer/Timer.scala
<https://reviews.apache.org/r/31568/#comment127228>

    Would it be simpler to just handle one poll item and return? The outer loop will call this method again on the next item. Also, poll() can block. So the expiration reaper thread may not be able to shut down if the queue is empty.



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment127198>

    Could we explain how TimingWheel works in the comments?



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment127205>

    tickSizeMs => tickMs



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment127210>

    Put in its own bucket?



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment127224>

    It seems that all wheels at different hierarchies always have the same startMs. Does that mean that the first bucket in the coarser level wheel is never used?



core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
<https://reviews.apache.org/r/31568/#comment127380>

    The message string is incorrect. We expect a total of 2 delayed operations. Also, the value after "instead of" is missing. Ditto to other assertEquals in this test.



core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
<https://reviews.apache.org/r/31568/#comment127383>

    Do we need both checks? It seems that one of them is enough.



core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
<https://reviews.apache.org/r/31568/#comment127415>

    Not quite sure what this is testing. It's not clear to me why the sharedCounter won't increase after add. Perhaps, we can add some comments.



core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
<https://reviews.apache.org/r/31568/#comment127417>

    Should we define expirationMs as override to make it clear?



core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
<https://reviews.apache.org/r/31568/#comment127421>

    map() probably should be changed to foreach().


- Jun Rao


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Yasuhiro Matsuda <ya...@gmail.com>.

> On April 6, 2015, 10:35 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, lines 104-105
> > <https://reviews.apache.org/r/31568/diff/3/?file=912738#file912738line104>
> >
> >     We probably should call forceComplete() first and only if it returns true, run onExpiration().
> 
> Yasuhiro Matsuda wrote:
>     This came from the original ExpiredOperationReaper.expireNext(). Also the comment on onExpiration says, "Call-back to execute when a delayed operation expires, but before completion." So, I cannot call forceComplete before onExpiration. I think we can do a little refactoring to clean this up later.
> 
> Guozhang Wang wrote:
>     Yeah I think this is actually an old bug rather than introduced by this patch: if the task is already completed before it is timed out we should not mark it as "expired". We should change the comment of onExpiration to "Call-back to execute when a delayed operation gets expired and hence forced to complete."

We are changing the behaviour as Guozhang suggested.
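
The agreed ordering can be sketched as follows. SketchDelayedOperation is a hypothetical, simplified class written for illustration; only the order of the two calls in run() reflects the discussion above.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical, simplified delayed operation.
abstract class SketchDelayedOperation extends Runnable {
  private val completed = new AtomicBoolean(false)

  def onComplete(): Unit
  def onExpiration(): Unit

  // Only the first caller completes the operation; later calls return false.
  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) { onComplete(); true } else false

  // Invoked on timeout: the operation counts as expired only if the timeout
  // itself forced the completion.
  override def run(): Unit = if (forceComplete()) onExpiration()
}
```

An operation that was completed normally before its timeout fires no longer triggers onExpiration().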


- Yasuhiro


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review78668
-----------------------------------------------------------


On April 7, 2015, 9:59 p.m., Yasuhiro Matsuda wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31568/
> -----------------------------------------------------------
> 
> (Updated April 7, 2015, 9:59 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1989
>     https://issues.apache.org/jira/browse/KAFKA-1989
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> new purgatory implementation
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
>   core/src/main/scala/kafka/server/ReplicaManager.scala b06f00bc10acb90083714edb5815306d1f646ddc 
>   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 7a37617395b9e4226853913b8989f42e7301de7c 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/31568/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Yasuhiro Matsuda
> 
>


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Yasuhiro Matsuda <ya...@gmail.com>.

> On April 6, 2015, 10:35 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, lines 104-105
> > <https://reviews.apache.org/r/31568/diff/3/?file=912738#file912738line104>
> >
> >     We probably should call forceComplete() first and only if it returns true, run onExpiration().

This came from the original ExpiredOperationReaper.expireNext(). Also the comment on onExpiration says, "Call-back to execute when a delayed operation expires, but before completion." So, I cannot call forceComplete before onExpiration. I think we can do a little refactoring to clean this up later.


> On April 6, 2015, 10:35 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/Timer.scala, line 26
> > <https://reviews.apache.org/r/31568/diff/3/?file=912740#file912740line26>
> >
> >     Is Timer too general a name? Should we rename it to something like DelayedOperationTimer?

Timer is intended to be a general timer facility.


> On April 6, 2015, 10:35 p.m., Jun Rao wrote:
> > core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala, lines 55-57
> > <https://reviews.apache.org/r/31568/diff/3/?file=912745#file912745line55>
> >
> >     Actually, where is the logic to not increment the counter on reinserting existing tasks? TimerTaskList.add() seems to always increment the counter.

The constructor call, new TimerTaskEntry(task), removes an existing entry for this timer task (if any), and decrements the counter.
I will add comments.
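
A simplified illustration of that mechanism. Task, Entry and TaskList are hypothetical stand-ins for the timer classes, with a plain immutable list in place of the real list structure.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a timer task: remembers its current entry.
class Task { var entry: Entry = null }

// Hypothetical stand-in for a task list that shares a counter.
class TaskList(val counter: AtomicInteger) {
  private var entries = List.empty[Entry]

  def add(e: Entry): Unit = {
    counter.incrementAndGet()
    entries = e :: entries
    e.list = this
  }

  def remove(e: Entry): Unit =
    if (e.list eq this) {
      counter.decrementAndGet()
      entries = entries.filterNot(_ eq e)
      e.list = null
    }
}

class Entry(val task: Task) {
  var list: TaskList = null

  // Constructing a new entry first detaches the task's previous entry (if any),
  // which decrements the counter of the list that held it.
  if (task.entry != null) task.entry.remove()
  task.entry = this

  def remove(): Unit = { val l = list; if (l != null) l.remove(this) }
}
```

So add() always increments, but a reinsert is preceded by a decrement from the constructor, and the net count stays the same.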


> On April 6, 2015, 10:35 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/TimingWheel.scala, line 77
> > <https://reviews.apache.org/r/31568/diff/3/?file=912743#file912743line77>
> >
> >     I was initially puzzled about how we synchronize between add() and advanceClock(). Then I realized that the synchronization is actually done in the caller in Timer. I was thinking of how to make this part clearer. One way is to add the documentation here. Another way is to move the read/write lock from Timer to here, and pass in needed data structures like delayQueue.

Yeah, it is a little puzzling. I went through similar thoughts. The tricky part is that there can be multiple wheels in a timer. Locking at wheel level is also confusing and not good in terms of performance. So I concluded that the locking should be done at a higher level, i.e., Timer. I will add more comments.


> On April 6, 2015, 10:35 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/TimingWheel.scala, line 52
> > <https://reviews.apache.org/r/31568/diff/3/?file=912743#file912743line52>
> >
> >     It would be useful to add that the timing wheel implementation is used to optimize the common case when operations are completed before they time out.

Yes. It is true that hierarchical timing wheels work especially well when operations are completed before they time out. But even when everything times out, they are still advantageous if there are many items in the timer. The insert cost (including reinsert) and delete cost are O(m) and O(1) respectively (m is the number of wheels), while priority-queue-based timers take O(log N) for both insert and delete, where N is the number of items in the queue. Anyway, I will add more comments.
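
The O(m) insert can be illustrated with a stripped-down wheel that only handles insertion. Wheel is a hypothetical class; clock advancement, expiry handling and locking are omitted, and the buckets are plain sets rather than linked lists.

```scala
import scala.collection.mutable

// Hypothetical, single-threaded sketch of hierarchical insertion only.
class Wheel(tickMs: Long, wheelSize: Int, startMs: Long) {
  private val interval = tickMs * wheelSize
  private val currentTime = startMs - (startMs % tickMs)
  private val buckets = Array.fill(wheelSize)(mutable.Set.empty[Long])
  private var overflow: Option[Wheel] = None

  // Returns the 1-based level where the expiration was stored, or 0 if it is
  // already expired relative to this wheel's current time.
  def add(expirationMs: Long): Int =
    if (expirationMs < currentTime + tickMs) 0
    else if (expirationMs < currentTime + interval) {
      // Fits in this wheel: constant-time slot lookup.
      buckets(((expirationMs / tickMs) % wheelSize).toInt) += expirationMs
      1
    } else {
      // Too far out: hand over to a coarser wheel whose tick is this wheel's interval.
      if (overflow.isEmpty) overflow = Some(new Wheel(interval, wheelSize, currentTime))
      1 + overflow.get.add(expirationMs)
    }
}
```

Each insert does constant work per level and visits at most one wheel per level, so its cost is bounded by the number of wheels m rather than by the number of items.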


> On April 6, 2015, 10:35 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/TimingWheel.scala, lines 40-44
> > <https://reviews.apache.org/r/31568/diff/3/?file=912743#file912743line40>
> >
> >     For the hierarchical part, let's say that u is 1 and n is 2. If current time is c, then the buckets at different levels are:
> >     
> >     level    buckets
> >     1        [c,c]   [c+1,c+1]
> >     2        [c,c+1] [c+2,c+3]
> >     3        [c,c+3] [c+4,c+7]
> >     
> >     So, at any given point of time, [c,c+1] at level 2 and [c,c+3] at level 3 will never be used since those buckets are already covered in the lower level.
> >     
> >     This seems a bit wasteful. To remove that waste, we could choose to start the 2nd level at c+2 and the 3rd level at c+6, etc. Do we choose to use the same current time as the start time at all levels for simplicity? If so, this is probably fine since the larger the n, the less the waste. However, it's probably worth documenting that the buckets at different levels can overlap?

Yes, the code is simpler this way. I'll add more comments.
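
The bucket table in the quoted comment can be reproduced with a few lines, assuming every level starts at the same time c and level k has a tick of u * n^(k-1). WheelLevels and buckets are hypothetical names, and the ranges are offsets from c.

```scala
object WheelLevels {
  // Inclusive (start, end) offsets from the current time c for each of the
  // n buckets at the given 1-based level.
  def buckets(level: Int, u: Long, n: Int): Seq[(Long, Long)] = {
    val tick = u * math.pow(n, level - 1).toLong
    (0 until n).map(i => (i * tick, (i + 1) * tick - 1))
  }
}
```

For u = 1 and n = 2, level 2 yields (0,1) and (2,3), and level 3 yields (0,3) and (4,7). The first bucket of each coarser level overlaps what the finer levels already cover.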


> On April 6, 2015, 10:35 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 369
> > <https://reviews.apache.org/r/31568/diff/3/?file=912738#file912738line369>
> >
> >     Perhaps we should trigger a purge if the number of total purgeable operations (instead of the number of unique purgeable operations) is more than the purgeInterval. This can be estimated as watched/estimatedTotalOperations.get * (estimatedTotalOperations.get - delayed).

Would you explain why that is better? It will trigger a lot more purge calls. And the frequency of calls depends on how many keys each request has. When the average number of keys per operation is large, it is possible to have a case that the total number of watchers exceeds the threshold, but there are only a few distinct operations to remove. It is hard to tune.
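
A worked example of the two estimates, using made-up numbers. PurgeEstimates and its method names are hypothetical; the second formula is the one from the quoted comment.

```scala
object PurgeEstimates {
  // Distinct completed operations that are still being watched.
  def uniquePurgeable(estimatedTotal: Int, delayed: Int): Int =
    estimatedTotal - delayed

  // Estimated watcher entries that belong to those operations:
  // average watchers per operation times the unique purgeable count.
  def totalPurgeable(watched: Int, estimatedTotal: Int, delayed: Int): Double =
    watched.toDouble / estimatedTotal * (estimatedTotal - delayed)
}
```

With 1,000 operations in total, 990 still pending, and 200 keys per operation (200,000 watchers), the unique estimate is 10 while the total estimate is 2,000. A threshold of 1,000 would be exceeded by the second estimate even though only 10 distinct operations can be removed.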


- Yasuhiro


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review78668
-----------------------------------------------------------


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Guozhang Wang <wa...@gmail.com>.

> On April 6, 2015, 10:35 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, lines 104-105
> > <https://reviews.apache.org/r/31568/diff/3/?file=912738#file912738line104>
> >
> >     We probably should call forceComplete() first and only if it returns true, run onExpiration().
> 
> Yasuhiro Matsuda wrote:
>     This came from the original ExpiredOperationReaper.expireNext(). Also the comment on onExpiration says, "Call-back to execute when a delayed operation expires, but before completion." So, I cannot call forceComplete before onExpiration. I think we can do a little refactoring to clean this up later.

Yeah, I think this is actually an old bug rather than one introduced by this patch: if the task is already completed before it times out, we should not mark it as "expired". We should change the comment on onExpiration to "Call-back to execute when a delayed operation gets expired and hence forced to complete."
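
The ordering being proposed can be sketched as follows. This is only an illustration under assumptions: SketchOperation and runOnTimeout are hypothetical stand-ins, not the DelayedOperation class in the patch.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object ExpirationOrderSketch {
  // Hypothetical stand-in for a delayed operation; not the class in the patch.
  class SketchOperation {
    private val completed = new AtomicBoolean(false)
    var expiredCalls = 0

    // Returns true only for the caller that actually flips the state.
    def forceComplete(): Boolean = completed.compareAndSet(false, true)

    // Counts how often the operation was reported as expired.
    def onExpiration(): Unit = expiredCalls += 1

    // Timeout path: report expiration only if the timeout did the completing.
    def runOnTimeout(): Unit = if (forceComplete()) onExpiration()
  }
}
```

With this ordering, an operation that was completed before its timeout fires is never counted as expired.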


> On April 6, 2015, 10:35 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/TimingWheel.scala, lines 40-44
> > <https://reviews.apache.org/r/31568/diff/3/?file=912743#file912743line40>
> >
> >     For the hierarchical part, let's say that u is 1 and n is 2. If current time is c, then the buckets at different levels are:
> >     
> >     level    buckets
> >     1        [c,c]   [c+1,c+1]
> >     2        [c,c+1] [c+2,c+3]
> >     3        [c,c+3] [c+4,c+7]
> >     
> >     So, at any given point of time, [c,c+1] at level 2 and [c,c+3] at level 3 will never be used since those buckets are already covered in the lower level.
> >     
> >     This seems a bit wasteful. To remove that waste, we could choose to start the 2nd level at c+2 and the 3rd level at c+6, etc. Do we choose to use the same current time as the start time at all levels for simplicity? If so, this is probably fine since the larger the n, the less the waste. However, it's probably worth documenting that the buckets at different levels can overlap?
> 
> Yasuhiro Matsuda wrote:
>     Yes, the code is simpler this way. I'll add more comments.

I think the levels in this case should be:

level    buckets
1        [c,c+1) [c+1,c+2)
2        [c,c+2) [c+2,c+4)
3        [c,c+4) [c+4,c+8)
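
The table above can be reproduced with a small sketch. It assumes that every level starts at the current time c, that each wheel has n buckets, and that the tick at level k is u * n^(k-1); Bucket and buckets are hypothetical names, and intervals are given as offsets from c.

```scala
object WheelLevelsSketch {
  // Half-open interval [start, end), expressed as offsets from the current time c.
  final case class Bucket(start: Long, end: Long)

  // Buckets of one level, assuming the level starts at c and its tick
  // is u * n^(level - 1).
  def buckets(level: Int, u: Long, n: Int): Seq[Bucket] = {
    val tick = u * math.pow(n, level - 1).toLong
    (0 until n).map(i => Bucket(i * tick, (i + 1) * tick))
  }
}
```

For u = 1 and n = 2, buckets(2, 1L, 2) gives the offsets [0,2) and [2,4), i.e. [c,c+2) and [c+2,c+4).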


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review78668
-----------------------------------------------------------


On April 7, 2015, 9:59 p.m., Yasuhiro Matsuda wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31568/
> -----------------------------------------------------------
> 
> (Updated April 7, 2015, 9:59 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1989
>     https://issues.apache.org/jira/browse/KAFKA-1989
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> new purgatory implementation
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
>   core/src/main/scala/kafka/server/ReplicaManager.scala b06f00bc10acb90083714edb5815306d1f646ddc 
>   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 7a37617395b9e4226853913b8989f42e7301de7c 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/31568/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Yasuhiro Matsuda
> 
>


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review78668
-----------------------------------------------------------


Thanks for the new patch. Now that I understood the logic better, I think this is a really smart implementation. A few more comments below.


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment128118>

    We probably should call forceComplete() first and only if it returns true, run onExpiration().



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment128145>

    Java executor service by default eats all unhandled exceptions. For easier debugging, we will need to add an UncaughtExceptionHandler and log the exception as an error. See KafkaScheduler.startup() and Utils.newThread().
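
A minimal sketch of the idea, assuming a single-thread executor and a caller-supplied error callback in place of real logging. The helper newExecutor is hypothetical and is not the code in KafkaScheduler or Utils.

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}

object LoggingExecutorSketch {
  // Hypothetical helper: builds a single-thread executor whose worker thread
  // reports uncaught exceptions through the supplied callback.
  def newExecutor(name: String, onError: Throwable => Unit): ExecutorService =
    Executors.newFixedThreadPool(1, new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, name)
        t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
          override def uncaughtException(th: Thread, e: Throwable): Unit = onError(e)
        })
        t
      }
    })
}
```

Note that the handler only sees exceptions from tasks passed to execute(); tasks passed to submit() have their exceptions captured in the returned Future instead.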



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment128129>

    Perhaps we should trigger a purge if the total number of purgeable operations (instead of the number of unique purgeable operations) is more than the purgeInterval. This can be estimated as watched/estimatedTotalOperations.get * (estimatedTotalOperations.get - delayed).
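
The estimate can be written out as a small function. This is a sketch under assumptions: the parameter meanings (watched as the total number of watcher entries, estimatedTotal as the estimated number of operations in the watcher lists, delayed as the number of operations still pending in the timer) are an interpretation of the formula above, and the function name is hypothetical.

```scala
object PurgeEstimateSketch {
  // Average watcher entries per operation, multiplied by the estimated number
  // of operations that are already completed but still in the watcher lists.
  def estimatedPurgeableWatchers(watched: Int, estimatedTotal: Int, delayed: Int): Double =
    if (estimatedTotal <= 0) 0.0
    else watched.toDouble / estimatedTotal * (estimatedTotal - delayed)
}
```

For example, with 1500 watcher entries, 10 estimated operations and 7 still delayed, the estimate is 1500 / 10 * 3 = 450 purgeable watcher entries.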



core/src/main/scala/kafka/utils/timer/Timer.scala
<https://reviews.apache.org/r/31568/#comment128139>

    Is Timer too general a name? Should we rename it to something like DelayedOperationTimer?



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment128109>

    For the hierarchical part, let's say that u is 1 and n is 2. If current time is c, then the buckets at different levels are:
    
    level    buckets
    1        [c,c]   [c+1,c+1]
    2        [c,c+1] [c+2,c+3]
    3        [c,c+3] [c+4,c+7]
    
    So, at any given point of time, [c,c+1] at level 2 and [c,c+3] at level 3 will never be used since those buckets are already covered in the lower level.
    
    This seems a bit wasteful. To remove that waste, we could choose to start the 2nd level at c+2 and the 3rd level at c+6, etc. Do we choose to use the same current time as the start time at all levels for simplicity? If so, this is probably fine since the larger the n, the less the waste. However, it's probably worth documenting that the buckets at different levels can overlap?



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment127579>

    A overflow => An overflow
    
    in a overflow => in an overflow



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment127580>

    moved the finer => moved to the finer



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment128112>

    It would be useful to add that the timing wheel implementation is used to optimize the common case when operations are completed before they time out.



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment128117>

    I was initially puzzled about how we synchronize between add() and advanceClock(). Then I realized that the synchronization is actually done by the caller, in Timer. I was thinking of how to make this part clearer. One way is to add the documentation here. Another way is to move the read/write lock from Timer to here, and pass in the needed data structures like delayQueue.
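
A miniature of the caller-side locking might look like the sketch below. It assumes that add() takes the read lock and advanceClock() takes the write lock; SketchTimer and its fields are hypothetical and much simpler than the Timer in the patch.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.locks.ReentrantReadWriteLock

object TimerLockSketch {
  // Hypothetical miniature of the locking scheme; not the Timer in the patch.
  class SketchTimer {
    private val lock = new ReentrantReadWriteLock()
    private var currentTime = 0L
    private val pending = new ConcurrentLinkedQueue[Long]()

    // Many threads may add concurrently: they only need the clock not to move.
    def add(expirationMs: Long): Boolean = {
      lock.readLock().lock()
      try {
        if (expirationMs < currentTime) {
          false // already expired; the caller would run it right away
        } else {
          pending.add(expirationMs)
          true
        }
      } finally lock.readLock().unlock()
    }

    // Advancing the clock excludes all adders.
    def advanceClock(timeMs: Long): Unit = {
      lock.writeLock().lock()
      try {
        if (timeMs > currentTime) currentTime = timeMs
      } finally lock.writeLock().unlock()
    }
  }
}
```

Because the wheel itself holds no lock in this arrangement, a comment in TimingWheel stating that callers must hold the Timer lock would document the contract.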



core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
<https://reviews.apache.org/r/31568/#comment128146>

    should have 5 => should have 6



core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
<https://reviews.apache.org/r/31568/#comment128148>

    Actually, where is the logic to not increment the counter on reinserting existing tasks? TimerTaskList.add() seems to always increment the counter.


- Jun Rao


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Yasuhiro Matsuda <ya...@gmail.com>.

> On April 8, 2015, 7:15 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, lines 123-126
> > <https://reviews.apache.org/r/31568/diff/4/?file=920086#file920086line123>
> >
> >     We do not need to address it in this patch, but we may want to think about it moving forward: for now, delayed produce / fetch's onComplete() only writes some data to the socket, while in the future other delayed operations may do more work in onComplete() and hence take more time; we may want to extend it to multiple executor threads in the pool.

I agree!


- Yasuhiro


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review79408
-----------------------------------------------------------


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review79408
-----------------------------------------------------------

Ship it!


Looks great to me. Just a minor comment below.


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment128725>

    We do not need to address it in this patch, but we may want to think about it moving forward: for now, delayed produce / fetch's onComplete() only writes some data to the socket, while in the future other delayed operations may do more work in onComplete() and hence take more time; we may want to extend it to multiple executor threads in the pool.


- Guozhang Wang


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review79263
-----------------------------------------------------------


- Guozhang Wang


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Yasuhiro Matsuda <ya...@gmail.com>.

> On April 7, 2015, 10:17 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, lines 178-188
> > <https://reviews.apache.org/r/31568/diff/3/?file=912738#file912738line178>
> >
> >     What is the usage of watchCreated here? It seems it only gets initialized as false, and if the function does not return at line 182, it will always execute line 188. So would that be equivalent to just moving line 188 to after line 190?

It is used to increment estimatedTotalOperations only once per operation that actually creates a watcher. After watchCreated is set to true, the subsequent iterations won't increment estimatedTotalOperations.
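
The role of the flag can be sketched as follows. This is an illustration only: watchAll, its parameters and the completion check are hypothetical, and the real method does more work per key.

```scala
import java.util.concurrent.atomic.AtomicInteger

object WatchCreatedSketch {
  // Registers one operation under several keys and returns how many watches
  // were created. isCompleted is polled before each key, mirroring an early
  // exit when the operation finishes midway through the loop.
  def watchAll(keys: Seq[String], isCompleted: () => Boolean, counter: AtomicInteger): Int = {
    var watchCreated = false
    var watches = 0
    val it = keys.iterator
    while (it.hasNext && !isCompleted()) {
      it.next() // stands in for adding the operation to this key's watcher list
      watches += 1
      if (!watchCreated) {
        // Count the operation once, and only if at least one watch exists.
        watchCreated = true
        counter.incrementAndGet()
      }
    }
    watches
  }
}
```

An operation that completes before its first key is watched leaves the counter untouched, which a single unconditional increment after the loop would not guarantee.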


> On April 7, 2015, 10:17 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/TimingWheel.scala, line 101
> > <https://reviews.apache.org/r/31568/diff/3/?file=912743#file912743line101>
> >
> >     We already check (overflowWheel == null) inside the function, hence it can be removed here.

This is an optimization to avoid synchronizing over and over. Once overflowWheel is created, the check is reduced to a single null comparison (no method call, no synchronization), which is the cheapest option.
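
A minimal sketch of this pattern, assuming a volatile field and a synchronized creation method. The Wheel class and the creations counter are hypothetical and exist only to make the behavior observable.

```scala
object OverflowWheelSketch {
  class Wheel {
    @volatile private var overflowWheel: Wheel = null
    var creations = 0 // only modified inside the synchronized block

    private def addOverflowWheel(): Unit = synchronized {
      if (overflowWheel == null) { // re-check under the lock
        overflowWheel = new Wheel
        creations += 1
      }
    }

    def overflow(): Wheel = {
      // Fast path: after creation this is a single volatile read and compare.
      if (overflowWheel == null) addOverflowWheel()
      overflowWheel
    }
  }
}
```

The inner check keeps creation safe when two threads race past the outer check; the outer check keeps the common case free of locking.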


- Yasuhiro


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review79257
-----------------------------------------------------------


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review79257
-----------------------------------------------------------



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment128493>

    What is the usage of watchCreated here? It seems it only gets initialized as false, and if the function does not return at line 182, it will always execute line 188. So would that be equivalent to just moving line 188 to after line 190?



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment128498>

    We already check (overflowWheel == null) inside the function, hence it can be removed here.


- Guozhang Wang


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Jun Rao <ju...@gmail.com>.

> On April 8, 2015, 6:44 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/TimingWheel.scala, line 96
> > <https://reviews.apache.org/r/31568/diff/4/?file=920091#file920091line96>
> >
> >     Does this need to be volatile since all accesses to it are synchronized?
> 
> Yasuhiro Matsuda wrote:
>     Actually, they are not synchronized. Accesses are protected by read/write locks in Timer. I think it is necessary to have volatile.

The read/write lock should create the memory barrier, right?


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review79269
-----------------------------------------------------------


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Yasuhiro Matsuda <ya...@gmail.com>.

> On April 8, 2015, 6:44 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/TimingWheel.scala, line 59
> > <https://reviews.apache.org/r/31568/diff/3-4/?file=912743#file912743line59>
> >
> >     Each bucket here needs to cover a window of size 3, right?

Sorry, I messed up the example.


> On April 8, 2015, 6:44 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/TimingWheel.scala, lines 67-68
> > <https://reviews.apache.org/r/31568/diff/3-4/?file=912743#file912743line67>
> >
> >     Aren't these two levels starting at c+1?

Level 2 and 3 do not have buckets starting at c+1.


> On April 8, 2015, 6:44 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/TimingWheel.scala, line 96
> > <https://reviews.apache.org/r/31568/diff/4/?file=920091#file920091line96>
> >
> >     Does this need to be volatile since all accesses to it are synchronized?

Actually, they are not synchronized. Accesses are protected by read/write locks in Timer. I think it is necessary to have volatile.


- Yasuhiro


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review79269
-----------------------------------------------------------


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review79269
-----------------------------------------------------------


Thanks for the latest patch. Looks good. Just a few more minor comments.


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment128508>

    Now that we have added the uncaught exception handler in executor, it seems that we don't need to log the error here again.



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment128525>

    Each bucket here needs to cover a window of size 3, right?



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment128526>

    Each bucket here needs to cover a window of size 9, right?



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment128530>

    Aren't these two levels starting at c+1?



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment128679>

    This thread probably shouldn't be daemon since we want the thread to exit when shutting down the executor.



core/src/main/scala/kafka/utils/timer/Timer.scala
<https://reviews.apache.org/r/31568/#comment128683>

    Perhaps we can add a comment on timeoutMs. On the surface, it may appear that we are advancing the clock to timeoutMs, which is not the case.
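
One way to read the distinction is sketched below, assuming that timeoutMs only bounds how long the timer waits on a java.util.concurrent.DelayQueue and that the clock moves to the expiration time of whichever bucket was polled. SketchBucket and SketchTimer are hypothetical and not the classes in the patch.

```scala
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

object AdvanceClockSketch {
  // A bucket that becomes available once its expiration time has passed.
  class SketchBucket(val expirationMs: Long) extends Delayed {
    override def getDelay(unit: TimeUnit): Long =
      unit.convert(expirationMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS)
    override def compareTo(o: Delayed): Int =
      java.lang.Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS))
  }

  class SketchTimer {
    val queue = new DelayQueue[SketchBucket]()
    var currentTime = 0L

    // timeoutMs bounds the wait for a bucket to expire; the clock only moves
    // to the expiration time of a bucket that was actually polled.
    def advanceClock(timeoutMs: Long): Boolean = {
      val bucket = queue.poll(timeoutMs, TimeUnit.MILLISECONDS)
      if (bucket == null) {
        false
      } else {
        currentTime = bucket.expirationMs
        true
      }
    }
  }
}
```

If nothing expires within timeoutMs, the call returns false and the clock stays where it was.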



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment128540>

    ant => any



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment128676>

    Does this need to be volatile since all accesses to it are synchronized?


- Jun Rao


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Yasuhiro Matsuda <ya...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/
-----------------------------------------------------------

(Updated April 8, 2015, 9:30 p.m.)


Review request for kafka.


Bugs: KAFKA-1989
    https://issues.apache.org/jira/browse/KAFKA-1989


Repository: kafka


Description
-------

new purgatory implementation


Diffs (updated)
-----

  core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
  core/src/main/scala/kafka/server/ReplicaManager.scala b06f00bc10acb90083714edb5815306d1f646ddc 
  core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 7a37617395b9e4226853913b8989f42e7301de7c 
  core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/31568/diff/


Testing
-------


Thanks,

Yasuhiro Matsuda


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Yasuhiro Matsuda <ya...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/
-----------------------------------------------------------

(Updated April 8, 2015, 8:28 p.m.)


Review request for kafka.


Bugs: KAFKA-1989
    https://issues.apache.org/jira/browse/KAFKA-1989


Repository: kafka


Description
-------

new purgatory implementation


Diffs (updated)
-----

  core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
  core/src/main/scala/kafka/server/ReplicaManager.scala b06f00bc10acb90083714edb5815306d1f646ddc 
  core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 7a37617395b9e4226853913b8989f42e7301de7c 
  core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/31568/diff/


Testing
-------


Thanks,

Yasuhiro Matsuda


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Yasuhiro Matsuda <ya...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/
-----------------------------------------------------------

(Updated April 7, 2015, 9:59 p.m.)


Review request for kafka.


Bugs: KAFKA-1989
    https://issues.apache.org/jira/browse/KAFKA-1989


Repository: kafka


Description
-------

new purgatory implementation


Diffs (updated)
-----

  core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
  core/src/main/scala/kafka/server/ReplicaManager.scala b06f00bc10acb90083714edb5815306d1f646ddc 
  core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 7a37617395b9e4226853913b8989f42e7301de7c 
  core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/31568/diff/


Testing
-------


Thanks,

Yasuhiro Matsuda


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Yasuhiro Matsuda <ya...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/
-----------------------------------------------------------

(Updated April 1, 2015, 8:50 p.m.)


Review request for kafka.


Bugs: KAFKA-1989
    https://issues.apache.org/jira/browse/KAFKA-1989


Repository: kafka


Description
-------

new purgatory implementation


Diffs (updated)
-----

  core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
  core/src/main/scala/kafka/server/ReplicaManager.scala 44f0026e6676d8d764dd59dbcc6bb7bb727a3ba6 
  core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 7a37617395b9e4226853913b8989f42e7301de7c 
  core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/31568/diff/


Testing
-------


Thanks,

Yasuhiro Matsuda


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Yasuhiro Matsuda <ya...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/
-----------------------------------------------------------

(Updated March 20, 2015, 3:45 p.m.)


Review request for kafka.


Bugs: KAFKA-1989
    https://issues.apache.org/jira/browse/KAFKA-1989


Repository: kafka


Description
-------

new purgatory implementation


Diffs (updated)
-----

  core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
  core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 7a37617395b9e4226853913b8989f42e7301de7c 
  core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/31568/diff/


Testing
-------


Thanks,

Yasuhiro Matsuda


Re: Review Request 31568: Patch for KAFKA-1989

Posted by Yasuhiro Matsuda <ya...@gmail.com>.

> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/Timer.scala, line 68
> > <https://reviews.apache.org/r/31568/diff/1/?file=881356#file881356line68>
> >
> >     I think bucket.flush(reinsert) will always fail on all the items since their expiration time will always be < bucket expiration + tickMs, i.e. the returned bucket from the delayed queue has already expired all its items. In this case, could we just call foreach(submit) on all of them instead of trying to reinsert them?

It is true only for the lowest wheel. Reinsert is necessary to make timing wheels work. A bucket from a higher wheel may contain tasks not expired (a tick time is longer in a higher wheel).


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 116
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line116>
> >
> >     We need to make tickMs and wheelSize configurable.

What is the motivation? I don't think it is a good idea to allow users to configure them.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 253
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line253>
> >
> >     Does it need to sync on refQueue as well?

I don't think so.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 288
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line288>
> >
> >     It may be useful to return #.purged items?

What is the use?


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala, line 67
> > <https://reviews.apache.org/r/31568/diff/1/?file=881362#file881362line67>
> >
> >     latch.await(0, TimeUnit.SECONDS)?

This is to avoid a race condition. In Timer, tasks are run by a thread pool. "0" makes it more vulnerable. "3" makes it pretty safe.
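
To illustrate the point, here is a minimal sketch (not the code in TimerTest; `LatchAwaitSketch` and `completedWithin` are hypothetical names) of waiting on a latch that is released by a pool thread. With a zero wait, the result depends on whether the pool thread has already run; with a generous wait, the check is effectively deterministic.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object LatchAwaitSketch {
  // Hands a trivial task to a pool thread and reports whether the latch
  // was seen as released within `waitSeconds`.
  def completedWithin(waitSeconds: Long): Boolean = {
    val pool = Executors.newFixedThreadPool(1)
    val latch = new CountDownLatch(1)
    try {
      pool.execute(new Runnable { def run(): Unit = latch.countDown() })
      // await returns true if the count reached zero before the wait elapsed;
      // a wait of 0 does not block at all, so it races with the pool thread.
      latch.await(waitSeconds, TimeUnit.SECONDS)
    } finally pool.shutdown()
  }
}
```

Calling `LatchAwaitSketch.completedWithin(3L)` should return true on any reasonably loaded machine, whereas `completedWithin(0L)` may return either value.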


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 245
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line245>
> >
> >     It seems we do not need to keep this as a class member variable, but just compute the value in purge() on-the-fly every time.

This is a shared counter updated at multiple places. We need to have this to avoid unnecessary purge calls.
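
As a rough sketch of the idea (not the patch itself; `Gate`, `recordCompletion`, `tryPurge` and the `purgeInterval` threshold are hypothetical names), a shared counter lets the purge run only when enough completed operations have accumulated:

```scala
import java.util.concurrent.atomic.AtomicInteger

object PurgeGateSketch {
  final class Gate(purgeInterval: Int) {
    // Estimate of completed operations still sitting in watcher lists.
    private val estimatedCompleted = new AtomicInteger(0)

    // Called from any place that completes an operation.
    def recordCompletion(): Unit = {
      estimatedCompleted.incrementAndGet()
      ()
    }

    // Runs `purge` only when the estimate exceeds the threshold.
    // The count is only an estimate, so the check-then-reset is not atomic.
    def tryPurge(purge: () => Unit): Boolean =
      if (estimatedCompleted.get > purgeInterval) {
        estimatedCompleted.set(0)
        purge()
        true
      } else false
  }
}
```

Recomputing the value inside purge() would require walking the watcher lists, which is the cost the counter is meant to avoid.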


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, line 72
> > <https://reviews.apache.org/r/31568/diff/1/?file=881358#file881358line72>
> >
> >     It seems the task entry of the task will only be set once throughout its lifetime; even when the task entry gets reinserted its correspondence to the task will not change, right?
> >     
> >     If that is true we can just set the entry for the task in the constructor of the task entry.

This sets the TimerTaskEntry on the TimerTask. A TimerTask is created independently of a Timer, then enqueued to a Timer.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala, lines 29-33
> > <https://reviews.apache.org/r/31568/diff/1/?file=881361#file881361line29>
> >
> >     Could we just add an atomic integer recording the list size and size() function to TimerTaskList?

We size the list only in this test. Adding a counter to an individual list is unnecessary overhead.


- Yasuhiro


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76459
-----------------------------------------------------------




Re: Review Request 31568: Patch for KAFKA-1989

Posted by Yasuhiro Matsuda <ya...@gmail.com>.

> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 116
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line116>
> >
> >     We need to make tickMs and wheelSize configurable.
> 
> Yasuhiro Matsuda wrote:
>     What is the motivation? I don't think it is a good idea to allow users to configure them.
> 
> Guozhang Wang wrote:
>     I am not concerned about user-configurability. The purgatory is used by multiple request types: produce, fetch and, in the future, rebalance, heartbeat and join group. Different request types may need to set tickMs and wheelSize differently.

It is easy to add parameters to DelayedOperationPurgatory since Timer already has them. But I don't see any compelling reason to do it now. The hierarchical timing wheel is, by design, very robust to varying timeout requests.
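
A small sketch of why the hierarchy copes with a wide range of timeouts, assuming each higher wheel ticks wheelSize times slower than the one below it. The object and method names are hypothetical, and tickMs = 1 with wheelSize = 20 are illustrative values only:

```scala
object WheelSpanSketch {
  // Total span in ms covered by wheel `level` (0 = lowest wheel).
  def spanMs(level: Int, tickMs: Long, wheelSize: Int): Long =
    (0 to level).foldLeft(tickMs)((span, _) => span * wheelSize)

  // Number of wheel levels needed before a timeout fits in the top wheel.
  def levelsNeeded(timeoutMs: Long, tickMs: Long, wheelSize: Int): Int = {
    var level = 0
    while (timeoutMs >= spanMs(level, tickMs, wheelSize)) level += 1
    level + 1
  }
}
```

With those illustrative values the spans are 20 ms, 400 ms, 8,000 ms and 160,000 ms, so a 10 ms timeout fits in one wheel and a 30-second timeout needs four. The span grows geometrically, which is why a single setting can serve request types with very different timeouts.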


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, lines 187-192
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line187>
> >
> >     TBD
> 
> Guozhang Wang wrote:
>     Replace the TBD here: we can let Timer.addTimerTaskEntry and Timer.add return Boolean instead of Unit, indicating whether the task has not expired and was successfully added to the timer. Then we can change the above to
>     
>     if (!operation.isComplete()) {
>       if (!timeoutTimer.add(operation)) {
>         operation.cancel()
>       }
>     }

An "expired" task is always successfully added to the timer and executed immediately. Do you mean "completed" instead of "expired"? TimerTask does not have a notion of "completion". I kept the "completion" concept out of the Timer implementation since it is not essential to Timer functionality.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/Timer.scala, line 68
> > <https://reviews.apache.org/r/31568/diff/1/?file=881356#file881356line68>
> >
> >     I think bucket.flush(reinsert) will always fail on all the items since their expiration time will always be < bucket expiration + tickMs, i.e. the returned bucket from the delayed queue has already expired all its items. In this case, could we just call foreach(submit) on all of them instead of trying to reinsert them?
> 
> Yasuhiro Matsuda wrote:
>     It is true only for the lowest wheel. Reinsert is necessary to make timing wheels work. A bucket from a higher wheel may contain tasks not expired (a tick time is longer in a higher wheel).
> 
> Guozhang Wang wrote:
>     OK, I may be missing something here, but this is my reasoning:
>     
>     The bucket is only returned from the delayed queue in line 62 if its expiration time has passed currentTime. After that, at least the lowest wheel will advance to its expiration time, and hence the add call within reinsert is doomed to fail as task.expirationTime < wheel's time + tickMs.

If the expired bucket is from the lowest wheel, all tasks in the bucket are expired. "reinsert" submits the tasks to a thread pool for execution.
If the expired bucket is from a higher wheel, tasks are either expired or not expired. "reinsert" submits the expired tasks to a thread pool and moves unexpired tasks to lower wheels.
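
The two cases above can be sketched roughly as follows. This is a simplified illustration, not the code in Timer.scala: `Task`, `Outcome` and this `reinsert` signature are hypothetical, and the wheel geometry (each level ticking wheelSize times slower than the one below) is an assumption.

```scala
object ReinsertSketch {
  // Hypothetical, simplified task: only an absolute expiration time in ms.
  final case class Task(name: String, expirationMs: Long)

  // What happens to one task taken out of an expired bucket.
  sealed trait Outcome
  case object Submitted extends Outcome                      // expired: run it now
  final case class MovedToWheel(level: Int) extends Outcome  // still pending

  def reinsert(task: Task, nowMs: Long, tickMs: Long, wheelSize: Int): Outcome = {
    val remaining = task.expirationMs - nowMs
    if (remaining < tickMs) Submitted // less than one lowest-level tick left
    else {
      // Find the finest wheel whose span still covers the remaining delay.
      var level = 0
      var interval = tickMs * wheelSize
      while (remaining >= interval) { level += 1; interval *= wheelSize }
      MovedToWheel(level)
    }
  }
}
```

For example, with tickMs = 1 and wheelSize = 20, a second-level bucket expiring at time 1000 may hold a task due at 1000 (submitted immediately) and a task due at 1015 (moved down to the lowest wheel).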


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, line 72
> > <https://reviews.apache.org/r/31568/diff/1/?file=881358#file881358line72>
> >
> >     It seems the task entry of the task will only be set once throughout its lifetime; even when the task entry gets reinserted its correspondence to the task will not change, right?
> >     
> >     If that is true we can just set the entry for the task in the constructor of the task entry.
> 
> Yasuhiro Matsuda wrote:
>     This sets the TimerTaskEntry on the TimerTask. A TimerTask is created independently of a Timer, then enqueued to a Timer.
> 
> Guozhang Wang wrote:
>     Yes, but can we move this line to line 119 of TimerTaskList.scala? Then in line 46 of Timer when we create the TimerTaskEntry with the passed in TimerTask its entry field will be set automatically.

If a task already enqueued to a timer is enqueued again, intentionally or unintentionally (=bug), what happens?
My intention here is to keep the data structure consistent in such a case. setTimerTaskEntry removes the old entry if one exists.


- Yasuhiro


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76459
-----------------------------------------------------------




Re: Review Request 31568: Patch for KAFKA-1989

Posted by Guozhang Wang <wa...@gmail.com>.

> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, lines 187-192
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line187>
> >
> >     TBD
> 
> Guozhang Wang wrote:
>     Replace the TBD here: we can let Timer.addTimerTaskEntry and Timer.add return Boolean instead of Unit, indicating whether the task has not expired and was successfully added to the timer. Then we can change the above to
>     
>     if (!operation.isComplete()) {
>       if (!timeoutTimer.add(operation)) {
>         operation.cancel()
>       }
>     }
> 
> Yasuhiro Matsuda wrote:
>     An "expired" task is always successfully added to the timer and executed immediately. Do you mean "completed" instead of "expired"? TimerTask does not have a notion of "completion". I kept the "completion" concept out of the Timer implementation since it is not essential to Timer functionality.

That makes sense. I guess I was just trying to avoid calling operation.isComplete consecutively, which may actually not be a bad thing.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/Timer.scala, line 68
> > <https://reviews.apache.org/r/31568/diff/1/?file=881356#file881356line68>
> >
> >     I think bucket.flush(reinsert) will always fail on all the items since their expiration time will always be < bucket expiration + tickMs, i.e. the returned bucket from the delayed queue has already expired all its items. In this case, could we just call foreach(submit) on all of them instead of trying to reinsert them?
> 
> Yasuhiro Matsuda wrote:
>     It is true only for the lowest wheel. Reinsert is necessary to make timing wheels work. A bucket from a higher wheel may contain tasks not expired (a tick time is longer in a higher wheel).
> 
> Guozhang Wang wrote:
>     OK, I may be missing something here, but this is my reasoning:
>     
>     The bucket is only returned from the delayed queue in line 62 if its expiration time has passed currentTime. After that, at least the lowest wheel will advance to its expiration time, and hence the add call within reinsert is doomed to fail as task.expirationTime < wheel's time + tickMs.
> 
> Yasuhiro Matsuda wrote:
>     If the expired bucket is from the lowest wheel, all tasks in the bucket are expired. "reinsert" submits the tasks to a thread pool for execution.
>     If the expired bucket is from a higher wheel, tasks are either expired or not expired. "reinsert" submits the expired tasks to a thread pool and moves unexpired tasks to lower wheels.

Got it.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, line 72
> > <https://reviews.apache.org/r/31568/diff/1/?file=881358#file881358line72>
> >
> >     It seems the task entry of the task will only be set once throughout its lifetime; even when the task entry gets reinserted its correspondence to the task will not change, right?
> >     
> >     If that is true we can just set the entry for the task in the constructor of the task entry.
> 
> Yasuhiro Matsuda wrote:
>     This sets the TimerTaskEntry on the TimerTask. A TimerTask is created independently of a Timer, then enqueued to a Timer.
> 
> Guozhang Wang wrote:
>     Yes, but can we move this line to line 119 of TimerTaskList.scala? Then in line 46 of Timer when we create the TimerTaskEntry with the passed in TimerTask its entry field will be set automatically.
> 
> Yasuhiro Matsuda wrote:
>     If a task already enqueued to a timer is enqueued again, intentionally or unintentionally (=bug), what happens?
>     My intention here is to keep the data structure consistent in such a case. setTimerTaskEntry removes the old entry if one exists.

This is true, I was originally confused about whether we ever need to re-enqueue, but the previous comment made it clear to me now.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76459
-----------------------------------------------------------




Re: Review Request 31568: Patch for KAFKA-1989

Posted by Guozhang Wang <wa...@gmail.com>.

> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 116
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line116>
> >
> >     We need to make tickMs and wheelSize configurable.
> 
> Yasuhiro Matsuda wrote:
>     What is the motivation? I don't think it is a good idea to allow users to configure them.

I am not concerned about user-configurability. The purgatory is used by multiple request types: produce, fetch and, in the future, rebalance, heartbeat and join group. Different request types may need to set tickMs and wheelSize differently.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, lines 187-192
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line187>
> >
> >     TBD

Replace the TBD here: we can let Timer.addTimerTaskEntry and Timer.add return Boolean instead of Unit, indicating whether the task has not expired and was successfully added to the timer. Then we can change the above to

if (!operation.isComplete()) {
  if (!timeoutTimer.add(operation)) {
    operation.cancel()
  }
}


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 288
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line288>
> >
> >     It may be useful to return #.purged items?
> 
> Yasuhiro Matsuda wrote:
>     What is the use?

At line 316 / 317 we could log on trace level whether the clock advance expired any tasks and the #.purged items.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/Timer.scala, line 68
> > <https://reviews.apache.org/r/31568/diff/1/?file=881356#file881356line68>
> >
> >     I think bucket.flush(reinsert) will always fail on all the items since their expiration time will always be < bucket expiration + tickMs, i.e. the returned bucket from the delayed queue has already expired all its items. In this case, could we just call foreach(submit) on all of them instead of trying to reinsert them?
> 
> Yasuhiro Matsuda wrote:
>     It is true only for the lowest wheel. Reinsert is necessary to make timing wheels work. A bucket from a higher wheel may contain tasks not expired (a tick time is longer in a higher wheel).

OK, I may be missing something here, but this is my reasoning:

The bucket is only returned from the delayed queue in line 62 if its expiration time has passed currentTime. After that, at least the lowest wheel will advance to its expiration time, and hence the add call within reinsert is doomed to fail as task.expirationTime < wheel's time + tickMs.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, line 72
> > <https://reviews.apache.org/r/31568/diff/1/?file=881358#file881358line72>
> >
> >     It seems the task entry of the task will only be set once throughout its lifetime; even when the task entry gets reinserted its correspondence to the task will not change, right?
> >     
> >     If that is true we can just set the entry for the task in the constructor of the task entry.
> 
> Yasuhiro Matsuda wrote:
>     This sets the TimerTaskEntry on the TimerTask. A TimerTask is created independently of a Timer, then enqueued to a Timer.

Yes, but can we move this line to line 119 of TimerTaskList.scala? Then in line 46 of Timer when we create the TimerTaskEntry with the passed in TimerTask its entry field will be set automatically.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76459
-----------------------------------------------------------




Re: Review Request 31568: Patch for KAFKA-1989

Posted by Sriharsha Chintalapani <ha...@hortonworks.com>.

> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > We can probably remove DelayedItem if it is not referenced by anyone any more.

I am using DelayedItem for KAFKA-1461.


- Sriharsha


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76459
-----------------------------------------------------------




Re: Review Request 31568: Patch for KAFKA-1989

Posted by Yasuhiro Matsuda <ya...@gmail.com>.

> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, line 72
> > <https://reviews.apache.org/r/31568/diff/1/?file=881358#file881358line72>
> >
> >     It seems the task entry of the task will only be set once throughout its lifetime; even when the task entry gets reinserted its correspondence to the task will not change, right?
> >     
> >     If that is true we can just set the entry for the task in the constructor of the task entry.
> 
> Yasuhiro Matsuda wrote:
>     This sets the TimerTaskEntry on the TimerTask. A TimerTask is created independently of a Timer, then enqueued to a Timer.
> 
> Guozhang Wang wrote:
>     Yes, but can we move this line to line 119 of TimerTaskList.scala? Then in line 46 of Timer when we create the TimerTaskEntry with the passed in TimerTask its entry field will be set automatically.
> 
> Yasuhiro Matsuda wrote:
>     If a task already enqueued to a timer is enqueued again, intentionally or unintentionally (=bug), what happens?
>     My intention here is to keep the data structure consistent in such a case. setTimerTaskEntry removes the old entry if one exists.
> 
> Guozhang Wang wrote:
>     This is true, I was originally confused about whether we ever need to re-enqueue, but the previous comment made it clear to me now.

On second thought, I think you are right. We need to call setTimerTaskEntry only in the TimerTaskEntry constructor. That will handle re-enqueue cases and is also cleaner. Calling it from TimerTaskList.add() was a bad idea. It can cause a deadlock. :(
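
Roughly, the shape I have in mind is sketched below. This is a simplified illustration rather than the actual TimerTask and TimerTaskEntry classes; `Task`, `Entry`, `setEntry` and `removed` are stand-in names. The entry registers itself with its task when it is constructed, and the task detaches any entry left over from an earlier enqueue.

```scala
object ReEnqueueSketch {
  final class Task {
    private var entry: Entry = null

    // Detaches an entry left over from an earlier enqueue, then records the new one.
    def setEntry(e: Entry): Unit = synchronized {
      if (entry != null && (entry ne e)) entry.remove()
      entry = e
    }
  }

  final class Entry(val task: Task) {
    @volatile var removed = false

    // Stand-in for unlinking this entry from whatever list holds it.
    def remove(): Unit = { removed = true }

    // Wiring happens in the constructor, so no list code has to call setEntry.
    task.setEntry(this)
  }
}
```

Creating a second `Entry` for the same `Task` marks the first one as removed, so a re-enqueue leaves only one live entry per task.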


- Yasuhiro


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76459
-----------------------------------------------------------




Re: Review Request 31568: Patch for KAFKA-1989

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76459
-----------------------------------------------------------


We can probably remove DelayedItem if it is not referenced by anyone any more.


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment123994>

    Add the "override" keyword to indicate that it is inherited from Runnable. Also, the comment above it seems to refer to a variable, not a function, which is a bit misleading.



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment124005>

    We need to make tickMs and wheelSize configurable.



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment124007>

    TBD



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment124136>

    It seems we do not need to keep this as a class member variable; we can just compute the value in purge() on the fly every time.



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment124140>

    Does this need to synchronize on refQueue as well?



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment124134>

    It may be useful to return the number of purged items?



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment124135>

    The threshold should be configurable.



core/src/main/scala/kafka/utils/timer/Timer.scala
<https://reviews.apache.org/r/31568/#comment124361>

    I think bucket.flush(reinsert) will always fail on all the items, since their expiration time will always be < bucket expiration + tickMs, i.e. the bucket returned from the delayed queue has already expired all its items. In this case, could we just call foreach(submit) on all of them instead of trying to reinsert them?



core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
<https://reviews.apache.org/r/31568/#comment124401>

    It seems the task entry of the task will only be set once throughout its lifetime; even when the task entry gets reinserted, its correspondence to the task will not change, right?
    
    If that is true we can just set the entry for the task in the constructor of the task entry.



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment124359>

    How about changing this comment to:
    
    We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced one cycle and the previous bucket gets reused; further calls to set the expiration within the same wheel cycle will pass in the same value and hence return false, thus the bucket with the same expiration will not be enqueued multiple times.
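
One way to read the proposed comment, sketched under assumptions: SketchBucket is a hypothetical class, and the use of AtomicLong.getAndSet is an illustrative choice rather than a statement about the patch. The setter reports whether the stored expiration actually changed, and the caller enqueues the bucket only on true.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical bucket that remembers its current expiration time.
class SketchBucket {
  private val expiration = new AtomicLong(-1L)

  // Atomically stores the new value and returns true only if it
  // differs from the previous one, i.e. the bucket is being reused
  // for a new wheel cycle and needs to be enqueued again.
  def setExpiration(expirationMs: Long): Boolean =
    expiration.getAndSet(expirationMs) != expirationMs

  def getExpiration: Long = expiration.get
}
```

Repeated calls with the same value within one wheel cycle return false, so a caller that enqueues only on true adds the bucket to the delay queue once per cycle.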



core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
<https://reviews.apache.org/r/31568/#comment124462>

    Could we just add an atomic integer recording the list size and a size() function to TimerTaskList?
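
A rough sketch of what such a counter could look like. CountedList is a hypothetical class backed by a ListBuffer, not the linked structure TimerTaskList uses; only the counting idea is meant to carry over.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ListBuffer

// Hypothetical list that tracks its size in an atomic counter,
// so size can be read without taking the list's lock.
class CountedList[A] {
  private val items = ListBuffer.empty[A]
  private val counter = new AtomicInteger(0)

  def add(item: A): Unit = synchronized {
    items += item
    counter.incrementAndGet()
  }

  def remove(item: A): Unit = synchronized {
    val idx = items.indexOf(item)
    // Decrement only when something was actually removed.
    if (idx >= 0) {
      items.remove(idx)
      counter.decrementAndGet()
    }
  }

  def size: Int = counter.get
}
```

With this in place a test can assert on size directly instead of walking the list to count its elements.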



core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
<https://reviews.apache.org/r/31568/#comment124459>

    latch.await(0, TimeUnit.SECONDS)?
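
For reference, CountDownLatch.await with a zero timeout does not block: it returns true if the count has already reached zero and false otherwise. A small sketch, where LatchCheck and hasFired are hypothetical names:

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object LatchCheck {
  // Non-blocking check: a timeout of zero means "do not wait at all".
  def hasFired(latch: CountDownLatch): Boolean =
    latch.await(0, TimeUnit.SECONDS)
}
```

This makes it usable as a plain boolean check in a test, both before and after the expected countDown().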


Since this is a rather complicated patch (even after reading the wiki page I took quite some time to get through the code), I would suggest adding more comments on the functions and member variables of each class.

- Guozhang Wang

