Posted to dev@kafka.apache.org by Jun Rao <ju...@gmail.com> on 2013/12/04 19:13:18 UTC

Re: Review Request 15674: new patch

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/
-----------------------------------------------------------

(Updated Dec. 4, 2013, 6:13 p.m.)


Review request for kafka.


Summary (updated)
-----------------

new patch


Bugs: KAFKA-1074
    https://issues.apache.org/jira/browse/KAFKA-1074


Repository: kafka


Description (updated)
-------

kafka-1074; minor fix3


kafka-1074; minor fix2


kafka-1074; minor fix


kafka-1074; better synchronization with log cleaner


kafka-1074; fix 4


kafka-1074; fix 3


kafka-1074; fix 2


kafka-1074


Diffs (updated)
-----

  core/src/main/scala/kafka/cluster/Partition.scala 02ccc17c79b6d44c75f9bb6ca7cda8c51ae6f6fb 
  core/src/main/scala/kafka/common/AllDoneException.scala PRE-CREATION 
  core/src/main/scala/kafka/common/CancelledCleaningException.scala PRE-CREATION 
  core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
  core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
  core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
  core/src/main/scala/kafka/server/ReplicaManager.scala f9c7c29bb96a63d6452269f46a12751daca15b62 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala c30069e837e54fb91bf1d5b75b133282a28dedf8 
  core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 

Diff: https://reviews.apache.org/r/15674/diff/


Testing
-------


Thanks,

Jun Rao


Re: Review Request 15674: address review comments

Posted by Jun Rao <ju...@gmail.com>.

> On Dec. 10, 2013, 1:27 a.m., Jay Kreps wrote:
> > core/src/main/scala/kafka/log/LogManager.scala, line 366
> > <https://reviews.apache.org/r/15674/diff/2/?file=393912#file393912line366>
> >
> >     Another alternative would just be to let the background flush fail and log it rather than shutting down--it is after all a background flush.

Yes, this seems simpler.
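Below is a minimal Scala sketch of the agreed approach: a background flush that fails and logs the error instead of halting. `FlushableLog`, `BackgroundFlusher`, `flushAll` and the `logError` callback are hypothetical names used for illustration. They are not Kafka's LogManager API.

```scala
import java.io.IOException

// Hypothetical stand-in for a flushable log; this is not Kafka's Log class.
trait FlushableLog {
  def name: String
  def flush(): Unit
}

object BackgroundFlusher {
  // Flushes every log once. An IOException is logged and counted instead of
  // halting the process. The assumption, following the thread, is that the
  // next append to the same log will hit the same error and halt there.
  def flushAll(logs: Seq[FlushableLog], logError: String => Unit): Int = {
    var failures = 0
    for (log <- logs) {
      try log.flush()
      catch {
        case e: IOException =>
          failures += 1
          logError(s"Error flushing log ${log.name}: ${e.getMessage}")
      }
    }
    failures
  }
}
```

Because a real disk problem shows up again on the append path, the flusher does not need to stop the broker itself. That also means it does not need to coordinate with log deletion.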


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review30067
-----------------------------------------------------------


On Dec. 16, 2013, 5:43 p.m., Jun Rao wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/15674/
> -----------------------------------------------------------
> 
> (Updated Dec. 16, 2013, 5:43 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1074
>     https://issues.apache.org/jira/browse/KAFKA-1074
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Summary of changes since the previous patch.
> 
> LogCleaner:
> * Added the logic to pause and resume the cleaning of a log. Canceling the cleaning is implemented as pausing, followed by resuming.
> 
> LogManager:
> * Don't halt when the background flush hits an IOException, since the same IOException will be hit during log append, which will halt the broker. This removes the need to synchronize between the flushing and the deleting of the log.
> * Removed OptimisticLockFailureException in LogCleaner. When a log needs to be truncated, first pause log cleaning, then truncate the log, and finally resume log cleaning.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala 5c9307d71641ccc6c09a54b69d5aa2b4bc2a4cde 
>   core/src/main/scala/kafka/common/OptimisticLockFailureException.scala 0e69110e71667ad75a460534f4422a7b6ec1cdc6 
>   core/src/main/scala/kafka/common/PausingCleaningException.scala PRE-CREATION 
>   core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
>   core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
>   core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 715845b167c44268bd7e4b76dfb69199bfb2fad4 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 242c18d47828b7c5e6b1fc219a0f1199fb1f9512 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 52d35a3ccc86a66acb57c1138d4f2fea8f4ca4b0 
>   core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 
> 
> Diff: https://reviews.apache.org/r/15674/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jun Rao
> 
>
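The summary above describes a pause/resume scheme: canceling is pausing followed by resuming, and truncation is wrapped in a pause and a resume. This can be sketched as a small state machine guarded by one lock and one condition. The sketch is illustrative and rests on assumptions. It is not the patch's code. The class `CleaningStateManager`, its methods and the condition name `cleaningStateChanged` are hypothetical. The state names follow the naming suggested later in this thread.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical states, named with the "LogCleaning" prefix suggested in review.
sealed trait LogCleaningState
case object LogCleaningInProgress extends LogCleaningState
case object LogCleaningAborted extends LogCleaningState
case object LogCleaningPaused extends LogCleaningState

// Illustrative sketch only; not the actual LogCleaner/LogCleanerManager code.
class CleaningStateManager {
  private val lock = new ReentrantLock()
  private val cleaningStateChanged = lock.newCondition()
  private val states = mutable.Map[String, LogCleaningState]()

  private def inLock[T](body: => T): T = {
    lock.lock()
    try body finally lock.unlock()
  }

  def stateOf(log: String): Option[LogCleaningState] = inLock { states.get(log) }

  // Cleaner thread: claim a log unless it is already being cleaned or is paused.
  def tryStartCleaning(log: String): Boolean = inLock {
    if (states.contains(log)) false
    else { states(log) = LogCleaningInProgress; true }
  }

  // Cleaner thread: polled between units of work so that an abort request is noticed.
  def isAborted(log: String): Boolean = inLock { states.get(log).contains(LogCleaningAborted) }

  // Cleaner thread: finished, or gave up after an abort. An aborted log becomes paused.
  def doneCleaning(log: String): Unit = inLock {
    states.get(log) match {
      case Some(LogCleaningInProgress) => states -= log
      case Some(LogCleaningAborted) =>
        states(log) = LogCleaningPaused
        cleaningStateChanged.signalAll()
      case other => throw new IllegalStateException(s"Unexpected state $other for $log")
    }
  }

  // Other threads: abort any in-progress cleaning, then block until the log is paused.
  def pauseCleaning(log: String): Unit = inLock {
    states.get(log) match {
      case None => states(log) = LogCleaningPaused
      case Some(LogCleaningInProgress) => states(log) = LogCleaningAborted
      case Some(other) => throw new IllegalStateException(s"Cannot pause $log in state $other")
    }
    while (!states.get(log).contains(LogCleaningPaused))
      cleaningStateChanged.await() // releases the lock while waiting
  }

  def resumeCleaning(log: String): Unit = inLock {
    require(states.get(log).contains(LogCleaningPaused), s"$log is not paused")
    states -= log
  }

  // Canceling, as described above: pause, then resume.
  def cancelCleaning(log: String): Unit = { pauseCleaning(log); resumeCleaning(log) }

  // Truncation pattern: pause cleaning, run the truncation, then resume.
  def withCleaningPaused[T](log: String)(body: => T): T = {
    pauseCleaning(log)
    try body finally resumeCleaning(log)
  }
}
```

A truncation would then read roughly `manager.withCleaningPaused(partition) { log.truncateTo(offset) }`. The `finally` ensures that cleaning resumes even if the truncation throws.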


Re: Review Request 15674: new patch

Posted by Jay Kreps <bo...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review30067
-----------------------------------------------------------



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment57604>

    Maybe this could be included with inProgress?



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment57606>

    Can we rename this cond to something like cleaningsChangedCond, or something else that describes which condition it is?



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment57600>

    Can you include the param name here?



core/src/main/scala/kafka/log/LogManager.scala
<https://reviews.apache.org/r/15674/#comment57599>

    Another alternative would just be to let the background flush fail and log it rather than shutting down--it is after all a background flush. 



core/src/main/scala/kafka/log/LogManager.scala
<https://reviews.apache.org/r/15674/#comment57597>

    Maybe this could be simplified into just a lock?
    
    


- Jay Kreps


On Dec. 4, 2013, 6:15 p.m., Jun Rao wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/15674/
> -----------------------------------------------------------
> 
> (Updated Dec. 4, 2013, 6:15 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1074
>     https://issues.apache.org/jira/browse/KAFKA-1074
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Summary of the new patch:
> LogCleaner:
> * added the logic to cancel an in-progress cleaning task
> * made the cleaner thread a shutdownable but not interruptible thread. Interrupting the cleaner may cause the file channel to be closed, which will fail other operations like log flushing during shutdown.
> 
> LogManager:
> * added the logic to wait until an in-progress flushing process completes
> * to delete a log:
>   (1) take the log to be deleted out of the log lists so that the log cleaner and log flusher won't see it in the future
>   (2) cancel any in-progress cleaning task
>   (3) wait until any in-progress flushing process completes
>   (4) at this point, we know the log won't be touched by the cleaner or the flusher anymore, and we can delete the whole directory synchronously.
> 
> Todos:
> * If the overall logic looks good, we can get rid of OptimisticLockFailureException in LogCleaner too by canceling any cleaner task before truncating the log. This can be done in a follow-up jira.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala 02ccc17c79b6d44c75f9bb6ca7cda8c51ae6f6fb 
>   core/src/main/scala/kafka/common/AllDoneException.scala PRE-CREATION 
>   core/src/main/scala/kafka/common/CancelledCleaningException.scala PRE-CREATION 
>   core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
>   core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
>   core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9c7c29bb96a63d6452269f46a12751daca15b62 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala c30069e837e54fb91bf1d5b75b133282a28dedf8 
>   core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 
> 
> Diff: https://reviews.apache.org/r/15674/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jun Rao
> 
>
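The four-step deletion in the Dec. 4 summary can be sketched as follows. The sketch assumes that the flusher holds a lock for a whole flush round, so acquiring and releasing that lock waits out any round already in progress. `LogRegistry`, `flushRound`, `deleteLog` and the two callbacks are hypothetical. Partitions and directories are plain strings. The Dec. 16 revision later dropped the flush synchronization by letting background flushes fail instead.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.concurrent.TrieMap

// Hypothetical registry; names and signatures are illustrative, not Kafka's LogManager API.
class LogRegistry(cancelCleaning: String => Unit, deleteDirectory: String => Unit) {
  private val logs = TrieMap[String, String]()      // partition -> log directory
  private val flushInProgress = new ReentrantLock() // held for an entire flush round

  def addLog(partition: String, dir: String): Unit = logs.update(partition, dir)
  def liveLogs: Set[String] = logs.keySet.toSet

  // Background flusher: snapshots the current logs while holding the flush lock.
  def flushRound(flush: String => Unit): Unit = {
    flushInProgress.lock()
    try logs.readOnlySnapshot().keys.foreach(flush)
    finally flushInProgress.unlock()
  }

  def deleteLog(partition: String): Boolean = logs.remove(partition) match {
    case None => false
    case Some(dir) =>
      // (1) done by the remove above: later flush/clean rounds no longer see the log
      cancelCleaning(partition) // (2) cancel any in-progress cleaning
      flushInProgress.lock()    // (3) wait for a flush round that may still include the log
      flushInProgress.unlock()
      deleteDirectory(dir)      // (4) nothing else touches the log now
      true
  }
}
```

Step (1) runs before step (3). Any flush round that acquires the lock after the removal therefore takes a snapshot that no longer contains the log, so one wait is enough.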


Re: Review Request 15674: address review comments

Posted by Jun Rao <ju...@gmail.com>.

> On Dec. 5, 2013, 1:54 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, line 205
> > <https://reviews.apache.org/r/15674/diff/2/?file=393911#file393911line205>
> >
> >     Are we going to have other checkDone implementations for this case? If not, could we avoid passing it as a parameter?

checkDone() has to be defined in CleanerThread. Since Cleaner is not an inner class of CleanerThread, it won't have access to checkDone unless it's passed in.
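The following sketch illustrates the point. The cleaner is not nested inside its thread, so it receives the abort check as a function value. `SegmentCleaner`, `clean` and `CleaningAbortedException` are hypothetical names, and segments are modeled as strings.

```scala
// Hypothetical cleaner that is NOT an inner class of its thread, so the thread's
// abort check must be handed to it as a function value.
class SegmentCleaner(checkDone: String => Unit) {
  // Cleans segments in order and calls checkDone before each one. checkDone is
  // expected to throw if the cleaning of `log` should be abandoned.
  def clean(log: String, segments: Seq[String]): List[String] = {
    val cleaned = List.newBuilder[String]
    for (segment <- segments) {
      checkDone(log)
      cleaned += s"$segment.cleaned"
    }
    cleaned.result()
  }
}

// Hypothetical exception signalling an aborted cleaning.
class CleaningAbortedException(log: String) extends RuntimeException(s"Cleaning of $log aborted")
```

Passing a function keeps Cleaner independent of the thread class. Unit tests can also supply a trivial check such as `_ => ()`.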


> On Dec. 5, 2013, 1:54 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, line 209
> > <https://reviews.apache.org/r/15674/diff/2/?file=393911#file393911line209>
> >
> >     Where should this exception be captured?

We don't need to catch this exception explicitly. Any exception will terminate a ShutdownableThread, and ShutdownableThread.run() swallows the exception if the thread is being shut down normally.
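Here is a simplified sketch of the thread behavior described above. It assumes a flag-based shutdown that never calls `interrupt()`, so an interrupt cannot close a file channel mid-I/O. It also assumes that `run()` records an exception only if the thread was still meant to be running. `SimpleShutdownableThread`, `doWork`, `isRunning` and `failure` are hypothetical. This is not the actual `kafka.utils.ShutdownableThread`.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical simplified shutdownable thread; not Kafka's ShutdownableThread.
abstract class SimpleShutdownableThread(threadName: String) extends Thread(threadName) {
  private val running = new AtomicBoolean(true)
  private val shutdownLatch = new CountDownLatch(1)
  @volatile var failure: Option[Throwable] = None

  def doWork(): Unit

  def isRunning: Boolean = running.get

  // Non-interrupting shutdown: only flips the flag, then waits for run() to exit.
  // Must only be called after start(), otherwise the latch is never released.
  def shutdown(): Unit = {
    running.set(false)
    shutdownLatch.await()
  }

  override def run(): Unit = {
    try {
      while (isRunning) doWork()
    } catch {
      case e: Throwable =>
        // An exception raised because we are shutting down is expected and swallowed.
        if (isRunning) failure = Some(e)
    } finally {
      shutdownLatch.countDown()
    }
  }
}
```

A cleaner's `checkDone` can then simply throw when `isRunning` is false. The exception ends `run()` and is swallowed, with no explicit catch at the call site.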


> On Dec. 5, 2013, 1:54 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, line 524
> > <https://reviews.apache.org/r/15674/diff/2/?file=393911#file393911line524>
> >
> >     Do we have to keep two buildOffsetMap functions here?

The inner buildOffsetMap() is per segment. So, we can probably rename it to buildOffsetMapForSegment().
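The split under discussion can be sketched as a whole-log `buildOffsetMap` that delegates to a per-segment `buildOffsetMapForSegment`. A later offset for a key overwrites an earlier one. The following are assumptions for illustration: the `KeyedMessage` type, the use of a plain mutable map, and the return value (the last offset scanned).

```scala
import scala.collection.mutable

// Hypothetical message representation; real segments store records, not case classes.
final case class KeyedMessage(key: String, offset: Long)

object OffsetMapBuilder {
  // Whole-log pass: visits segments in offset order and returns the last offset seen.
  def buildOffsetMap(segments: Seq[Seq[KeyedMessage]], map: mutable.Map[String, Long]): Long =
    segments.foldLeft(-1L)((last, segment) => math.max(last, buildOffsetMapForSegment(segment, map)))

  // Per-segment pass: a later offset for the same key overwrites an earlier one,
  // so the map ends up holding the latest offset of every key.
  def buildOffsetMapForSegment(segment: Seq[KeyedMessage], map: mutable.Map[String, Long]): Long = {
    var last = -1L
    for (m <- segment) {
      map(m.key) = m.offset
      last = m.offset
    }
    last
  }
}
```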


> On Dec. 5, 2013, 1:54 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/log/LogManager.scala, line 278
> > <https://reviews.apache.org/r/15674/diff/2/?file=393912#file393912line278>
> >
> >     Is there still a risk that, after waitUntilInProgressFlusherIsDone returns, another flushing procedure starts?

No, because the next flusher will not see the deleted log since it's already removed from the log list.


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review29783
-----------------------------------------------------------


On Dec. 16, 2013, 5:41 p.m., Jun Rao wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/15674/
> -----------------------------------------------------------
> 
> (Updated Dec. 16, 2013, 5:41 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1074
>     https://issues.apache.org/jira/browse/KAFKA-1074
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> minor fix 2
> 
> 
> minor fix
> 
> 
> remove unused var and exception
> 
> 
> support pause/resume log clean and remove OptimisticLockFailureException
> 
> 
> kafka-1074; minor fix3
> 
> 
> kafka-1074; minor fix2
> 
> 
> kafka-1074; minor fix
> 
> 
> kafka-1074; better synchronization with log cleaner
> 
> 
> kafka-1074; fix 4
> 
> 
> kafka-1074; fix 3
> 
> 
> kafka-1074; fix 2
> 
> 
> kafka-1074
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala 5c9307d71641ccc6c09a54b69d5aa2b4bc2a4cde 
>   core/src/main/scala/kafka/common/OptimisticLockFailureException.scala 0e69110e71667ad75a460534f4422a7b6ec1cdc6 
>   core/src/main/scala/kafka/common/PausingCleaningException.scala PRE-CREATION 
>   core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
>   core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
>   core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 715845b167c44268bd7e4b76dfb69199bfb2fad4 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 242c18d47828b7c5e6b1fc219a0f1199fb1f9512 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 52d35a3ccc86a66acb57c1138d4f2fea8f4ca4b0 
>   core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 
> 
> Diff: https://reviews.apache.org/r/15674/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jun Rao
> 
>


Re: Review Request 15674: new patch

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review29783
-----------------------------------------------------------



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment57268>

    Are we going to have other checkDone implementations for this case? If not, could we avoid passing it as a parameter?



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment57267>

    Where should this exception be captured?



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment57266>

    Do we have to keep two buildOffsetMap functions here?



core/src/main/scala/kafka/log/LogManager.scala
<https://reviews.apache.org/r/15674/#comment57269>

    Is there still a risk that, after waitUntilInProgressFlusherIsDone returns, another flushing procedure starts?


- Guozhang Wang




Re: Review Request 15674: address review comments

Posted by Neha Narkhede <ne...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review30456
-----------------------------------------------------------



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/15674/#comment58264>

    I'm guessing this change shouldn't be in this patch.



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/15674/#comment58265>

    same here



core/src/main/scala/kafka/common/AllDoneException.scala
<https://reviews.apache.org/r/15674/#comment58266>

    can we please rename AllDone to something meaningful?



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment58268>

    Can we rename inProgress to cleaningInProgress?



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment58269>

    I think these case objects should be named more descriptively to communicate that they are related to log cleaning. For example, InProgress -> LogCleaningInProgress. This makes the code much more readable. Similarly, PausingLogCleaning and LogCleaningPaused.



core/src/main/scala/kafka/common/PausingCleaningException.scala
<https://reviews.apache.org/r/15674/#comment58267>

    PausingCleaningException => PausedCleaningException, along the lines of CancelledCleaningException


- Neha Narkhede




Re: Review Request 15674: add unit test for aborting during cleaning

Posted by Jun Rao <ju...@gmail.com>.

> On Jan. 3, 2014, 4:22 p.m., Jay Kreps wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, line 262
> > <https://reviews.apache.org/r/15674/diff/4/?file=413443#file413443line262>
> >
> >     Would these be better as just InProgress, Aborted, and Paused?

My feeling is that in the future there could be similar states in other components such as the controller. So, having a longer name will make it clear which state it really is.


> On Jan. 3, 2014, 4:22 p.m., Jay Kreps wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, line 298
> > <https://reviews.apache.org/r/15674/diff/4/?file=413443#file413443line298>
> >
> >     It seems like the logic for choosing logs is not part of the state management; is there any way to separate this out?

That's a bit hard. grabFilthiestLog() accesses checkpoints and inProgress, both of which need to be protected by the lock.
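The sketch below shows why selection and state are hard to separate. Choosing the filthiest log both reads the in-progress set and claims the chosen log, so both steps must happen under the same lock. `CleanCandidate`, its byte counts (a stand-in for the checkpoint lookup), the dirty-ratio formula, `minDirtyRatio` and the class name are hypothetical.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical per-log summary; the byte counts stand in for what the patch
// derives from the cleaner checkpoints.
final case class CleanCandidate(name: String, cleanBytes: Long, dirtyBytes: Long) {
  def dirtyRatio: Double = dirtyBytes.toDouble / math.max(cleanBytes + dirtyBytes, 1L)
}

class CleanerStateWithSelection(minDirtyRatio: Double) {
  private val lock = new ReentrantLock()
  private val inProgress = mutable.Set[String]()

  // Reads inProgress to filter and writes it to claim the chosen log in one
  // critical section; splitting the two steps would let two cleaners pick the same log.
  def grabFilthiestLog(candidates: Seq[CleanCandidate]): Option[CleanCandidate] = {
    lock.lock()
    try {
      val eligible = candidates.filter(c => !inProgress.contains(c.name) && c.dirtyRatio > minDirtyRatio)
      if (eligible.isEmpty) None
      else {
        val filthiest = eligible.maxBy(_.dirtyRatio)
        inProgress += filthiest.name
        Some(filthiest)
      }
    } finally lock.unlock()
  }

  def doneCleaning(name: String): Unit = {
    lock.lock()
    try { inProgress -= name; () }
    finally lock.unlock()
  }
}
```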


> On Jan. 3, 2014, 4:22 p.m., Jay Kreps wrote:
> > core/src/test/scala/unit/kafka/log/CleanerTest.scala, line 98
> > <https://reviews.apache.org/r/15674/diff/4/?file=413448#file413448line98>
> >
> >     Do we have coverage for cleaning + truncation simultaneously?

Added a unit test for aborting during log cleaning.


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review31134
-----------------------------------------------------------


On Jan. 3, 2014, 8:27 p.m., Jun Rao wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/15674/
> -----------------------------------------------------------
> 
> (Updated Jan. 3, 2014, 8:27 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1074
>     https://issues.apache.org/jira/browse/KAFKA-1074
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> add test for aborting during cleaning
> 
> 
> checkpoint recovery points in truncateFullyAndStartAt()
> 
> 
> move cleaning states and locks to a separate class
> 
> 
> minor fix 2
> 
> 
> minor fix
> 
> 
> remove unused var and exception
> 
> 
> support pause/resume log clean and remove OptimisticLockFailureException
> 
> 
> kafka-1074; minor fix3
> 
> 
> kafka-1074; minor fix2
> 
> 
> kafka-1074; minor fix
> 
> 
> kafka-1074; better synchronization with log cleaner
> 
> 
> kafka-1074; fix 4
> 
> 
> kafka-1074; fix 3
> 
> 
> kafka-1074; fix 2
> 
> 
> kafka-1074
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala 5c9307d71641ccc6c09a54b69d5aa2b4bc2a4cde 
>   core/src/main/scala/kafka/common/OptimisticLockFailureException.scala 0e69110e71667ad75a460534f4422a7b6ec1cdc6 
>   core/src/main/scala/kafka/common/ThreadShutdownException.scala PRE-CREATION 
>   core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
>   core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
>   core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 715845b167c44268bd7e4b76dfb69199bfb2fad4 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 242c18d47828b7c5e6b1fc219a0f1199fb1f9512 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 52d35a3ccc86a66acb57c1138d4f2fea8f4ca4b0 
>   core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 
> 
> Diff: https://reviews.apache.org/r/15674/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jun Rao
> 
>


Re: Review Request 15674: address more review comments

Posted by Jay Kreps <bo...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review31134
-----------------------------------------------------------



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment59482>

    Would these be better as just InProgress, Aborted, and Paused?



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment59483>

    It seems like the logic for choosing logs is not part of the state management; is there any way to separate this out?



core/src/test/scala/unit/kafka/log/CleanerTest.scala
<https://reviews.apache.org/r/15674/#comment59481>

    Do we have coverage for cleaning + truncation simultaneously?


- Jay Kreps


On Jan. 2, 2014, 4:33 p.m., Jun Rao wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/15674/
> -----------------------------------------------------------
> 
> (Updated Jan. 2, 2014, 4:33 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1074
>     https://issues.apache.org/jira/browse/KAFKA-1074
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> checkpoint recovery points in truncateFullyAndStartAt()
> 
> 
> move cleaning states and locks to a separate class
> 
> 
> minor fix 2
> 
> 
> minor fix
> 
> 
> remove unused var and exception
> 
> 
> support pause/resume log clean and remove OptimisticLockFailureException
> 
> 
> kafka-1074; minor fix3
> 
> 
> kafka-1074; minor fix2
> 
> 
> kafka-1074; minor fix
> 
> 
> kafka-1074; better synchronization with log cleaner
> 
> 
> kafka-1074; fix 4
> 
> 
> kafka-1074; fix 3
> 
> 
> kafka-1074; fix 2
> 
> 
> kafka-1074
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala 5c9307d71641ccc6c09a54b69d5aa2b4bc2a4cde 
>   core/src/main/scala/kafka/common/OptimisticLockFailureException.scala 0e69110e71667ad75a460534f4422a7b6ec1cdc6 
>   core/src/main/scala/kafka/common/ThreadShutdownException.scala PRE-CREATION 
>   core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
>   core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
>   core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 715845b167c44268bd7e4b76dfb69199bfb2fad4 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 242c18d47828b7c5e6b1fc219a0f1199fb1f9512 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 52d35a3ccc86a66acb57c1138d4f2fea8f4ca4b0 
>   core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 
> 
> Diff: https://reviews.apache.org/r/15674/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jun Rao
> 
>


Re: Review Request 15674: move LogCleanerManager to a separate file and improve unit test

Posted by Jun Rao <ju...@gmail.com>.

> On Jan. 6, 2014, 9:18 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, line 161
> > <https://reviews.apache.org/r/15674/diff/6/?file=415674#file415674line161>
> >
> >     The method description says it should block until the cleaner has processed up to the given offset, but it doesn't look like the offset parameter is used in any way?

This is an existing problem. Will file a separate jira to have it addressed.
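For reference, here is a sketch of the behavior the method description promises: block until a log has been cleaned up to at least a given offset, with a timeout. `CleanedOffsetTracker`, `markCleaned` and `awaitCleaned` are hypothetical names. The sketch does not reflect how the cleaner actually tracks progress.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical tracker of the offset up to which each log has been cleaned.
class CleanedOffsetTracker {
  private val lock = new ReentrantLock()
  private val advanced = lock.newCondition()
  private val cleanedUpTo = mutable.Map[String, Long]()

  // Cleaner side: record progress (never moving backwards) and wake any waiters.
  def markCleaned(log: String, offset: Long): Unit = {
    lock.lock()
    try {
      cleanedUpTo(log) = math.max(offset, cleanedUpTo.getOrElse(log, -1L))
      advanced.signalAll()
    } finally lock.unlock()
  }

  // Caller side: actually uses `offset`; returns false if the timeout elapses first.
  def awaitCleaned(log: String, offset: Long, timeoutMs: Long): Boolean = {
    lock.lock()
    try {
      var remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs)
      while (cleanedUpTo.getOrElse(log, -1L) < offset && remainingNanos > 0)
        remainingNanos = advanced.awaitNanos(remainingNanos)
      cleanedUpTo.getOrElse(log, -1L) >= offset
    } finally lock.unlock()
  }
}
```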


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review31247
-----------------------------------------------------------


On Jan. 7, 2014, 6:51 p.m., Jun Rao wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/15674/
> -----------------------------------------------------------
> 
> (Updated Jan. 7, 2014, 6:51 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1074
>     https://issues.apache.org/jira/browse/KAFKA-1074
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> move LogCleanerManager to a separate file and improve unit test
> 
> 
> rename LogCleanerStates to LogCleanerManagers; optimize truncateTo to only pause cleaning if the truncated offset is not on the active segment
> 
> 
> add test for aborting during cleaning
> 
> 
> checkpoint recovery points in truncateFullyAndStartAt()
> 
> 
> move cleaning states and locks to a separate class
> 
> 
> minor fix 2
> 
> 
> minor fix
> 
> 
> remove unused var and exception
> 
> 
> support pause/resume log clean and remove OptimisticLockFailureException
> 
> 
> kafka-1074; minor fix3
> 
> 
> kafka-1074; minor fix2
> 
> 
> kafka-1074; minor fix
> 
> 
> kafka-1074; better synchronization with log cleaner
> 
> 
> kafka-1074; fix 4
> 
> 
> kafka-1074; fix 3
> 
> 
> kafka-1074; fix 2
> 
> 
> kafka-1074
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala 5c9307d71641ccc6c09a54b69d5aa2b4bc2a4cde 
>   core/src/main/scala/kafka/common/LogCleaningAbortedException.scala PRE-CREATION 
>   core/src/main/scala/kafka/common/OptimisticLockFailureException.scala 0e69110e71667ad75a460534f4422a7b6ec1cdc6 
>   core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
>   core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
>   core/src/main/scala/kafka/log/LogCleanerManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 715845b167c44268bd7e4b76dfb69199bfb2fad4 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 242c18d47828b7c5e6b1fc219a0f1199fb1f9512 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 52d35a3ccc86a66acb57c1138d4f2fea8f4ca4b0 
>   core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 
> 
> Diff: https://reviews.apache.org/r/15674/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jun Rao
> 
>
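The truncateTo optimization in the description above can be sketched as a check against the active segment's base offset. The sketch assumes the cleaner never touches the active segment, so a truncation confined to that segment needs no pause. `TruncationPolicy`, `needsCleaningPause` and the callback-based `truncateTo` are hypothetical and model only the decision.

```scala
// Hypothetical helpers; segment base offsets are assumed sorted ascending, with
// the last one belonging to the active segment, which the cleaner does not clean.
object TruncationPolicy {
  def needsCleaningPause(segmentBaseOffsets: Seq[Long], targetOffset: Long): Boolean =
    segmentBaseOffsets.lastOption match {
      case Some(activeBase) => targetOffset < activeBase // truncation reaches older segments
      case None => false
    }

  // Truncates to targetOffset, pausing and resuming cleaning only when needed.
  def truncateTo(segmentBaseOffsets: Seq[Long], targetOffset: Long,
                 pause: () => Unit, resume: () => Unit, truncate: Long => Unit): Unit = {
    if (needsCleaningPause(segmentBaseOffsets, targetOffset)) {
      pause()
      try truncate(targetOffset) finally resume()
    } else truncate(targetOffset)
  }
}
```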


Re: Review Request 15674: rename LogCleanerStates to LogCleanerManagers; optimize truncateTo to only pause cleaning if the truncated offset is not on the active segment

Posted by Neha Narkhede <ne...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review31247
-----------------------------------------------------------


Overall, looks good. A few questions and minor comments.


core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment59648>

    The method description says it should block until the cleaner has processed up to the given offset, but it doesn't look like the offset parameter is used in any way?



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment59650>

    It is worth placing the LogCleanerManager and the Cleaner in separate class files. That makes the code easier to read.



core/src/test/scala/unit/kafka/admin/AdminTest.scala
<https://reviews.apache.org/r/15674/#comment59646>

    This check is good. In addition to this, let's also validate that the only brokers that have the partition directory are the brokers in the reassigned replica list. 
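Here is a sketch of the additional check suggested above, written as a pure helper. Given which partition directories each broker holds, it verifies that exactly the reassigned replicas hold the partition. `ReassignmentCheck` and `dirsByBroker` are hypothetical. In a real test the sets would come from each broker's log directory, which this sketch assumes contains directories named `<topic>-<partition>`.

```scala
// Hypothetical helper for a reassignment test; `dirsByBroker` stands in for the
// directory names found under each broker's log directory.
object ReassignmentCheck {
  def brokersHoldingPartition(dirsByBroker: Map[Int, Set[String]], topic: String, partition: Int): Set[Int] = {
    val dirName = s"$topic-$partition"
    dirsByBroker.collect { case (broker, dirs) if dirs.contains(dirName) => broker }.toSet
  }

  // True only if exactly the brokers in the reassigned replica list hold the partition.
  def onlyReassignedReplicasHoldPartition(dirsByBroker: Map[Int, Set[String]], topic: String,
                                          partition: Int, reassignedReplicas: Seq[Int]): Boolean =
    brokersHoldingPartition(dirsByBroker, topic, partition) == reassignedReplicas.toSet
}
```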


- Neha Narkhede


On Jan. 4, 2014, 4:37 p.m., Jun Rao wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/15674/
> -----------------------------------------------------------
> 
> (Updated Jan. 4, 2014, 4:37 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1074
>     https://issues.apache.org/jira/browse/KAFKA-1074
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> rename LogCleanerStates to LogCleanerManagers; optimize truncateTo to only pause cleaning if the truncated offset is not on the active segment
> 
> 
> add test for aborting during cleaning
> 
> 
> checkpoint recovery points in truncateFullyAndStartAt()
> 
> 
> move cleaning states and locks to a separate class
> 
> 
> minor fix 2
> 
> 
> minor fix
> 
> 
> remove unused var and exception
> 
> 
> support pause/resume log clean and remove OptimisticLockFailureException
> 
> 
> kafka-1074; minor fix3
> 
> 
> kafka-1074; minor fix2
> 
> 
> kafka-1074; minor fix
> 
> 
> kafka-1074; better synchronization with log cleaner
> 
> 
> kafka-1074; fix 4
> 
> 
> kafka-1074; fix 3
> 
> 
> kafka-1074; fix 2
> 
> 
> kafka-1074
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala 5c9307d71641ccc6c09a54b69d5aa2b4bc2a4cde 
>   core/src/main/scala/kafka/common/OptimisticLockFailureException.scala 0e69110e71667ad75a460534f4422a7b6ec1cdc6 
>   core/src/main/scala/kafka/common/ThreadShutdownException.scala PRE-CREATION 
>   core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
>   core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
>   core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 715845b167c44268bd7e4b76dfb69199bfb2fad4 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 242c18d47828b7c5e6b1fc219a0f1199fb1f9512 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 52d35a3ccc86a66acb57c1138d4f2fea8f4ca4b0 
>   core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 
> 
> Diff: https://reviews.apache.org/r/15674/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jun Rao
> 
>


Re: Review Request 15674: move LogCleanerManager to a separate file and improve unit test

Posted by Jun Rao <ju...@gmail.com>.

> On Jan. 7, 2014, 11:42 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/common/OptimisticLockFailureException.scala, line 23
> > <https://reviews.apache.org/r/15674/diff/6-7/?file=415671#file415671line23>
> >
> >     Is this change intended?

This is a diff between v6 and v7; the change is not in v7 itself.


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review31322
-----------------------------------------------------------


On Jan. 7, 2014, 6:51 p.m., Jun Rao wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/15674/
> -----------------------------------------------------------
> 
> (Updated Jan. 7, 2014, 6:51 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1074
>     https://issues.apache.org/jira/browse/KAFKA-1074
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> move LogCleanerManager to a separate file and improve unit test
> 
> 
> rename LogCleanerStates to LogCleanerManagers; optimize truncateTo to only pause cleaning if the truncated offset is not on the active segment
> 
> 
> add test for aborting during cleaning
> 
> 
> checkpoint recovery points in truncateFullyAndStartAt()
> 
> 
> move cleaning states and locks to a separate class
> 
> 
> minor fix 2
> 
> 
> minor fix
> 
> 
> remove unused var and exception
> 
> 
> support pause/resume log clean and remove OptimisticLockFailureException
> 
> 
> kafka-1074; minor fix3
> 
> 
> kafka-1074; minor fix2
> 
> 
> kafka-1074; minor fix
> 
> 
> kafka-1074; better synchronization with log cleaner
> 
> 
> kafka-1074; fix 4
> 
> 
> kafka-1074; fix 3
> 
> 
> kafka-1074; fix 2
> 
> 
> kafka-1074
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala 5c9307d71641ccc6c09a54b69d5aa2b4bc2a4cde 
>   core/src/main/scala/kafka/common/LogCleaningAbortedException.scala PRE-CREATION 
>   core/src/main/scala/kafka/common/OptimisticLockFailureException.scala 0e69110e71667ad75a460534f4422a7b6ec1cdc6 
>   core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
>   core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
>   core/src/main/scala/kafka/log/LogCleanerManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 715845b167c44268bd7e4b76dfb69199bfb2fad4 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 242c18d47828b7c5e6b1fc219a0f1199fb1f9512 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 52d35a3ccc86a66acb57c1138d4f2fea8f4ca4b0 
>   core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 
> 
> Diff: https://reviews.apache.org/r/15674/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jun Rao
> 
>


Re: Review Request 15674: move LogCleanerManager to a separate file and improve unit test

Posted by Neha Narkhede <ne...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review31322
-----------------------------------------------------------

Ship it!


Other than the question below, I think this patch looks good.


core/src/main/scala/kafka/common/OptimisticLockFailureException.scala
<https://reviews.apache.org/r/15674/#comment59786>

    Is this change intended?


- Neha Narkhede




Re: Review Request 15674: move LogCleanerManager to a separate file and improve unit test

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/
-----------------------------------------------------------

(Updated Jan. 7, 2014, 6:51 p.m.)


Review request for kafka.


Summary (updated)
-----------------

move LogCleanerManager to a separate file and improve unit test


Bugs: KAFKA-1074
    https://issues.apache.org/jira/browse/KAFKA-1074


Repository: kafka


Description (updated)
-------

move LogCleanerManager to a separate file and improve unit test


rename LogCleanerStates to LogCleanerManagers; optimize truncateTo to only pause cleaning if the truncated offset is not on the active segment


add test for aborting during cleaning


checkpoint recovery points in truncateFullyAndStartAt()


move cleaning states and locks to a separate class


minor fix 2


minor fix


remove unused var and exception


support pause/resume log clean and remove OptimisticLockFailureException


kafka-1074; minor fix3


kafka-1074; minor fix2


kafka-1074; minor fix


kafka-1074; better synchronization with log cleaner


kafka-1074; fix 4


kafka-1074; fix 3


kafka-1074; fix 2


kafka-1074


Diffs (updated)
-----

  core/src/main/scala/kafka/cluster/Partition.scala 5c9307d71641ccc6c09a54b69d5aa2b4bc2a4cde 
  core/src/main/scala/kafka/common/LogCleaningAbortedException.scala PRE-CREATION 
  core/src/main/scala/kafka/common/OptimisticLockFailureException.scala 0e69110e71667ad75a460534f4422a7b6ec1cdc6 
  core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
  core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
  core/src/main/scala/kafka/log/LogCleanerManager.scala PRE-CREATION 
  core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 715845b167c44268bd7e4b76dfb69199bfb2fad4 
  core/src/main/scala/kafka/server/ReplicaManager.scala 242c18d47828b7c5e6b1fc219a0f1199fb1f9512 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 52d35a3ccc86a66acb57c1138d4f2fea8f4ca4b0 
  core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 

Diff: https://reviews.apache.org/r/15674/diff/


Testing
-------


Thanks,

Jun Rao


Re: Review Request 15674: rename LogCleanerStates to LogCleanerManagers; optimize truncateTo to only pause cleaning if the truncated offset is not on the active segment

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/
-----------------------------------------------------------

(Updated Jan. 4, 2014, 4:37 p.m.)


Review request for kafka.


Summary (updated)
-----------------

rename LogCleanerStates to LogCleanerManagers; optimize truncateTo to only pause cleaning if the truncated offset is not on the active segment


Bugs: KAFKA-1074
    https://issues.apache.org/jira/browse/KAFKA-1074


Repository: kafka


Description (updated)
-------

rename LogCleanerStates to LogCleanerManagers; optimize truncateTo to only pause cleaning if the truncated offset is not on the active segment
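The truncateTo optimization rests on one observation: the cleaner never touches the active segment, so cleaning only needs to be paused when the truncation point falls below the active segment's base offset. A minimal sketch of that decision, assuming the caller supplies the active segment's base offset and pause/resume callbacks; `TruncateSketch` and its parameters are hypothetical names, not the patch's code.

```scala
object TruncateSketch {
  /** Pausing is needed only when truncation reaches below the active segment,
    * because the cleaner never works on the active segment. */
  def needsCleaningPause(targetOffset: Long, activeSegmentBaseOffset: Long): Boolean =
    targetOffset < activeSegmentBaseOffset

  /** Runs `truncate`, bracketing it with pause/resume only when required. */
  def truncateTo(targetOffset: Long, activeSegmentBaseOffset: Long,
                 pause: () => Unit, resume: () => Unit)(truncate: Long => Unit): Unit = {
    if (needsCleaningPause(targetOffset, activeSegmentBaseOffset)) {
      pause()
      try truncate(targetOffset)
      finally resume() // resume even if truncation fails
    } else {
      truncate(targetOffset)
    }
  }
}
```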


add test for aborting during cleaning


checkpoint recovery points in truncateFullyAndStartAt()


move cleaning states and locks to a separate class


minor fix 2


minor fix


remove unused var and exception


support pause/resume log clean and remove OptimisticLockFailureException


kafka-1074; minor fix3


kafka-1074; minor fix2


kafka-1074; minor fix


kafka-1074; better synchronization with log cleaner


kafka-1074; fix 4


kafka-1074; fix 3


kafka-1074; fix 2


kafka-1074


Diffs (updated)
-----

  core/src/main/scala/kafka/cluster/Partition.scala 5c9307d71641ccc6c09a54b69d5aa2b4bc2a4cde 
  core/src/main/scala/kafka/common/OptimisticLockFailureException.scala 0e69110e71667ad75a460534f4422a7b6ec1cdc6 
  core/src/main/scala/kafka/common/ThreadShutdownException.scala PRE-CREATION 
  core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
  core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
  core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 715845b167c44268bd7e4b76dfb69199bfb2fad4 
  core/src/main/scala/kafka/server/ReplicaManager.scala 242c18d47828b7c5e6b1fc219a0f1199fb1f9512 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 52d35a3ccc86a66acb57c1138d4f2fea8f4ca4b0 
  core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 

Diff: https://reviews.apache.org/r/15674/diff/


Testing
-------


Thanks,

Jun Rao


Re: Review Request 15674: add unit test for aborting during cleaning

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/
-----------------------------------------------------------

(Updated Jan. 3, 2014, 8:27 p.m.)


Review request for kafka.


Summary (updated)
-----------------

add unit test for aborting during cleaning


Bugs: KAFKA-1074
    https://issues.apache.org/jira/browse/KAFKA-1074


Repository: kafka


Description (updated)
-------

add test for aborting during cleaning


checkpoint recovery points in truncateFullyAndStartAt()


move cleaning states and locks to a separate class


minor fix 2


minor fix


remove unused var and exception


support pause/resume log clean and remove OptimisticLockFailureException


kafka-1074; minor fix3


kafka-1074; minor fix2


kafka-1074; minor fix


kafka-1074; better synchronization with log cleaner


kafka-1074; fix 4


kafka-1074; fix 3


kafka-1074; fix 2


kafka-1074


Diffs (updated)
-----

  core/src/main/scala/kafka/cluster/Partition.scala 5c9307d71641ccc6c09a54b69d5aa2b4bc2a4cde 
  core/src/main/scala/kafka/common/OptimisticLockFailureException.scala 0e69110e71667ad75a460534f4422a7b6ec1cdc6 
  core/src/main/scala/kafka/common/ThreadShutdownException.scala PRE-CREATION 
  core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
  core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
  core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 715845b167c44268bd7e4b76dfb69199bfb2fad4 
  core/src/main/scala/kafka/server/ReplicaManager.scala 242c18d47828b7c5e6b1fc219a0f1199fb1f9512 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 52d35a3ccc86a66acb57c1138d4f2fea8f4ca4b0 
  core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 

Diff: https://reviews.apache.org/r/15674/diff/


Testing
-------


Thanks,

Jun Rao


Re: Review Request 15674: address more review comments

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/
-----------------------------------------------------------

(Updated Jan. 2, 2014, 4:33 p.m.)


Review request for kafka.


Summary (updated)
-----------------

address more review comments


Bugs: KAFKA-1074
    https://issues.apache.org/jira/browse/KAFKA-1074


Repository: kafka


Description (updated)
-------

checkpoint recovery points in truncateFullyAndStartAt()


move cleaning states and locks to a separate class


minor fix 2


minor fix


remove unused var and exception


support pause/resume log clean and remove OptimisticLockFailureException


kafka-1074; minor fix3


kafka-1074; minor fix2


kafka-1074; minor fix


kafka-1074; better synchronization with log cleaner


kafka-1074; fix 4


kafka-1074; fix 3


kafka-1074; fix 2


kafka-1074


Diffs (updated)
-----

  core/src/main/scala/kafka/cluster/Partition.scala 5c9307d71641ccc6c09a54b69d5aa2b4bc2a4cde 
  core/src/main/scala/kafka/common/OptimisticLockFailureException.scala 0e69110e71667ad75a460534f4422a7b6ec1cdc6 
  core/src/main/scala/kafka/common/ThreadShutdownException.scala PRE-CREATION 
  core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
  core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
  core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 715845b167c44268bd7e4b76dfb69199bfb2fad4 
  core/src/main/scala/kafka/server/ReplicaManager.scala 242c18d47828b7c5e6b1fc219a0f1199fb1f9512 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 52d35a3ccc86a66acb57c1138d4f2fea8f4ca4b0 
  core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 

Diff: https://reviews.apache.org/r/15674/diff/


Testing
-------


Thanks,

Jun Rao


Re: Review Request 15674: address review comments

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/
-----------------------------------------------------------

(Updated Dec. 16, 2013, 5:43 p.m.)


Review request for kafka.


Bugs: KAFKA-1074
    https://issues.apache.org/jira/browse/KAFKA-1074


Repository: kafka


Description (updated)
-------

Summary of changes since the previous patch.

LogCleaner:
* Added the logic to pause and resume the cleaning of a log. Canceling the cleaning is implemented as pausing, followed by resuming.
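A minimal sketch of how pause, resume and cancel (pause followed by resume) could be tracked per log with a small state machine guarded by one lock. The state names and the `CleaningStates` class are hypothetical and only loosely modeled on the description above.

```scala
import scala.collection.mutable

// Hypothetical per-log cleaning states; a log with no entry is idle.
sealed trait CleaningState
case object InProgress extends CleaningState
case object Aborted extends CleaningState
case object Paused extends CleaningState

class CleaningStates {
  private val lock = new Object
  private val states = mutable.Map[String, CleaningState]()

  /** Called by the cleaner thread before it starts on a log; refuses paused or busy logs. */
  def tryStartCleaning(log: String): Boolean = lock.synchronized {
    if (states.contains(log)) false else { states(log) = InProgress; true }
  }

  /** Called by the cleaner when it finishes, or gives up on, a log. */
  def doneCleaning(log: String): Unit = lock.synchronized {
    states.get(log) match {
      case Some(InProgress) => states -= log
      case Some(Aborted)    => states(log) = Paused; lock.notifyAll()
      case _                => ()
    }
  }

  /** The cleaner checks this periodically and stops work early when it returns true. */
  def isAborted(log: String): Boolean = lock.synchronized { states.get(log).contains(Aborted) }

  /** Blocks until no cleaning of `log` is in progress, then leaves it paused. */
  def pauseCleaning(log: String): Unit = lock.synchronized {
    states.get(log) match {
      case None             => states(log) = Paused
      case Some(InProgress) => states(log) = Aborted
      case _                => ()
    }
    while (!states.get(log).contains(Paused)) lock.wait()
  }

  def resumeCleaning(log: String): Unit = lock.synchronized {
    if (states.get(log).contains(Paused)) states -= log
  }

  /** Cancelling is a pause followed by a resume. */
  def abortCleaning(log: String): Unit = { pauseCleaning(log); resumeCleaning(log) }
}
```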

LogManager:
* Don't halt when a background flush hits an IOException, since the same IOException will be hit during log append, which will halt the broker. This removes the need to synchronize between flushing and deleting a log.
* Removed OptimisticLockFailureException in LogCleaner. When a log needs to be truncated, first pause log cleaning, then truncate the log, and finally resume log cleaning.
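The first LogManager change can be sketched as a flush pass that reports an IOException and moves on instead of halting, on the assumption stated above that the next append to the same log will hit the error and halt the broker there. `FlushSketch` and its callbacks are hypothetical names.

```scala
import java.io.IOException

object FlushSketch {
  /** Flushes each log in turn. An IOException is passed to `onError` rather than
    * halting the broker; returns the number of logs flushed successfully. */
  def flushDirtyLogs(logs: Seq[String], flush: String => Unit,
                     onError: (String, IOException) => Unit): Int = {
    var flushed = 0
    for (log <- logs) {
      try { flush(log); flushed += 1 }
      catch { case e: IOException => onError(log, e) }
    }
    flushed
  }
}
```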


Diffs
-----

  core/src/main/scala/kafka/cluster/Partition.scala 5c9307d71641ccc6c09a54b69d5aa2b4bc2a4cde 
  core/src/main/scala/kafka/common/OptimisticLockFailureException.scala 0e69110e71667ad75a460534f4422a7b6ec1cdc6 
  core/src/main/scala/kafka/common/PausingCleaningException.scala PRE-CREATION 
  core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
  core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
  core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 715845b167c44268bd7e4b76dfb69199bfb2fad4 
  core/src/main/scala/kafka/server/ReplicaManager.scala 242c18d47828b7c5e6b1fc219a0f1199fb1f9512 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 52d35a3ccc86a66acb57c1138d4f2fea8f4ca4b0 
  core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 

Diff: https://reviews.apache.org/r/15674/diff/


Testing
-------


Thanks,

Jun Rao


Re: Review Request 15674: address review comments

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/
-----------------------------------------------------------

(Updated Dec. 16, 2013, 5:41 p.m.)


Review request for kafka.


Summary (updated)
-----------------

address review comments


Bugs: KAFKA-1074
    https://issues.apache.org/jira/browse/KAFKA-1074


Repository: kafka


Description (updated)
-------

minor fix 2


minor fix


remove unused var and exception


support pause/resume log clean and remove OptimisticLockFailureException


kafka-1074; minor fix3


kafka-1074; minor fix2


kafka-1074; minor fix


kafka-1074; better synchronization with log cleaner


kafka-1074; fix 4


kafka-1074; fix 3


kafka-1074; fix 2


kafka-1074


Diffs (updated)
-----

  core/src/main/scala/kafka/cluster/Partition.scala 5c9307d71641ccc6c09a54b69d5aa2b4bc2a4cde 
  core/src/main/scala/kafka/common/OptimisticLockFailureException.scala 0e69110e71667ad75a460534f4422a7b6ec1cdc6 
  core/src/main/scala/kafka/common/PausingCleaningException.scala PRE-CREATION 
  core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
  core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
  core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 715845b167c44268bd7e4b76dfb69199bfb2fad4 
  core/src/main/scala/kafka/server/ReplicaManager.scala 242c18d47828b7c5e6b1fc219a0f1199fb1f9512 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 52d35a3ccc86a66acb57c1138d4f2fea8f4ca4b0 
  core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 

Diff: https://reviews.apache.org/r/15674/diff/


Testing
-------


Thanks,

Jun Rao


Re: Review Request 15674: new patch

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/
-----------------------------------------------------------

(Updated Dec. 4, 2013, 6:15 p.m.)


Review request for kafka.


Bugs: KAFKA-1074
    https://issues.apache.org/jira/browse/KAFKA-1074


Repository: kafka


Description (updated)
-------

Summary of the new patch:
LogCleaner:
* Added the logic to cancel an in-progress cleaning task.
* Made the cleaner thread shutdownable but not interruptible. Interrupting the cleaner may close the underlying file channel, which would fail other operations, such as log flushing, during shutdown.
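A minimal sketch of a thread that is stopped with a flag and a latch instead of `Thread.interrupt()`. This matters because `FileChannel` is an interruptible channel: interrupting a thread during channel I/O closes the channel for every user. The class name is hypothetical, and `shutdown()` assumes the thread has already been started.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical worker thread that shuts down cooperatively and never interrupts itself.
abstract class NonInterruptibleShutdownableThread(name: String) extends Thread(name) {
  private val running = new AtomicBoolean(true)
  private val shutdownLatch = new CountDownLatch(1)

  /** One unit of work; should return promptly so the running flag is re-checked. */
  def doWork(): Unit

  def isRunning: Boolean = running.get

  override def run(): Unit = {
    try {
      while (running.get) doWork()
    } finally {
      shutdownLatch.countDown()
    }
  }

  /** Requests shutdown and waits for the current doWork() to finish.
    * Must be called only after start(), otherwise it blocks forever. */
  def shutdown(): Unit = {
    running.set(false)
    shutdownLatch.await()
  }
}
```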

LogManager:
* Added the logic to wait until an in-progress flush completes.
* To delete a log:
  (1) remove the log from the log list so that the log cleaner and log flusher won't see it in the future;
  (2) cancel any in-progress cleaning task;
  (3) wait until any in-progress flush completes;
  (4) at this point, the log won't be touched by the cleaner or the flusher anymore, so the whole directory can be deleted synchronously.
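The four deletion steps can be sketched as follows, assuming the flusher holds a lock for the duration of each flush pass and that cancelling cleaning is a blocking callback. `LogRegistry` and its methods are hypothetical names, not the patch's LogManager.

```scala
import java.io.File
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock

// Hypothetical registry of logs by name; `cancelCleaning` is assumed to block
// until any in-progress cleaning of that log has stopped.
class LogRegistry(cancelCleaning: String => Unit) {
  private val logs = new ConcurrentHashMap[String, File]()
  // Held by the flusher for the duration of a flush pass.
  private val flushLock = new ReentrantLock()

  def add(name: String, dir: File): Unit = logs.put(name, dir)

  def flushAll(flush: File => Unit): Unit = {
    flushLock.lock()
    try {
      val it = logs.values.iterator
      while (it.hasNext) flush(it.next())
    } finally flushLock.unlock()
  }

  def deleteLog(name: String): Boolean = {
    // (1) Remove the log so later cleaner and flusher passes never see it.
    val dir = logs.remove(name)
    if (dir == null) false
    else {
      // (2) Cancel any in-progress cleaning of this log.
      cancelCleaning(name)
      // (3) Acquiring the flush lock blocks until any in-progress flush pass finishes.
      flushLock.lock()
      flushLock.unlock()
      // (4) Nothing can touch the log anymore, so delete its directory synchronously.
      deleteRecursively(dir)
      true
    }
  }

  private def deleteRecursively(f: File): Unit = {
    val children = f.listFiles()
    if (children != null) children.foreach(deleteRecursively)
    f.delete()
  }
}
```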

Todos:
* If the overall logic looks good, we can also get rid of OptimisticLockFailureException in LogCleaner by canceling any cleaning task before truncating the log. This can be done in a follow-up JIRA.


Diffs
-----

  core/src/main/scala/kafka/cluster/Partition.scala 02ccc17c79b6d44c75f9bb6ca7cda8c51ae6f6fb 
  core/src/main/scala/kafka/common/AllDoneException.scala PRE-CREATION 
  core/src/main/scala/kafka/common/CancelledCleaningException.scala PRE-CREATION 
  core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
  core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
  core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
  core/src/main/scala/kafka/server/ReplicaManager.scala f9c7c29bb96a63d6452269f46a12751daca15b62 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala c30069e837e54fb91bf1d5b75b133282a28dedf8 
  core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 

Diff: https://reviews.apache.org/r/15674/diff/


Testing
-------


Thanks,

Jun Rao