Posted to dev@kafka.apache.org by Jun Rao <ju...@gmail.com> on 2013/11/19 17:28:58 UTC

Review Request 15674: Reassign partitions should delete the old replicas from disk

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/
-----------------------------------------------------------

Review request for kafka.


Bugs: KAFKA-1074
    https://issues.apache.org/jira/browse/KAFKA-1074


Repository: kafka


Description
-------

kafka-1074; fix 3


kafka-1074; fix 2


kafka-1074


Diffs
-----

  core/src/main/scala/kafka/cluster/Partition.scala 02ccc17c79b6d44c75f9bb6ca7cda8c51ae6f6fb 
  core/src/main/scala/kafka/log/Log.scala 1883a53de112ad08449dc73a2ca08208c11a2537 
  core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
  core/src/main/scala/kafka/server/ReplicaManager.scala 161f58134f20f9335dbd2bee6ac3f71897cbef7c 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala c30069e837e54fb91bf1d5b75b133282a28dedf8 

Diff: https://reviews.apache.org/r/15674/diff/


Testing
-------


Thanks,

Jun Rao


Re: Review Request 15674: Reassign partitions should delete the old replicas from disk

Posted by Neha Narkhede <ne...@gmail.com>.

> On Nov. 19, 2013, 6:40 p.m., Jay Kreps wrote:
> > What happens if I am doing a read or write concurrently with a delete?
> > 
> > Would it be simpler to have the log delete work like the segment delete, where, rather than trying to lock, we remove it from the segment list and then just enqueue a delete in 60 seconds? My concern is just that reasoning about the various locking strategies in the log is getting increasingly difficult.
> 
> Jun Rao wrote:
>     Yes, we could try deleting the log asynchronously. The issues there are:
>     
>     1. The same partition could be moved back to this broker during the delayed window.
>     2. It's not clear if 60 secs (or any value) is good enough since the time that an ongoing scheduled flush takes is unbounded.
>     
>     The following is how this patch handles outstanding reads/writes on the deleted data.
>     
>     1. All read operations are ok since we already handle unexpected exceptions in KafkaApi. The caller will get an error.
>     2. Currently, if the producer request handler, the replica fetcher, or the log flusher hits an IOException while writing to the log, we halt the broker. We need to make sure that deleting a log doesn't cause such a halt. This is achieved by preventing those operations on the log once it's deleted.
>     2.1 For producer requests, the delete partition operation will synchronize on the leaderAndIsrUpdate lock.
>     2.2 For replica fetcher, this is already handled since the fetcher is removed before the log is deleted.
>     2.3 For log flusher, the flush and the delete will now synchronize on a delete lock.
>     
>     I agree that this approach uses more locks, which potentially makes the code harder to understand. However, my feeling is that this is probably a less hacky approach than the async delete one.

At least until the various locks are cleaned up, the approach used in the patch seems safer than an async delete. Will take a closer look at the patch sometime today.


- Neha


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review29123
-----------------------------------------------------------


On Nov. 19, 2013, 4:28 p.m., Jun Rao wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/15674/
> -----------------------------------------------------------
> 
> (Updated Nov. 19, 2013, 4:28 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1074
>     https://issues.apache.org/jira/browse/KAFKA-1074
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> kafka-1074; fix 3
> 
> 
> kafka-1074; fix 2
> 
> 
> kafka-1074
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala 02ccc17c79b6d44c75f9bb6ca7cda8c51ae6f6fb 
>   core/src/main/scala/kafka/log/Log.scala 1883a53de112ad08449dc73a2ca08208c11a2537 
>   core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
>   core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 161f58134f20f9335dbd2bee6ac3f71897cbef7c 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala c30069e837e54fb91bf1d5b75b133282a28dedf8 
> 
> Diff: https://reviews.apache.org/r/15674/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jun Rao
> 
>


Re: Review Request 15674: Reassign partitions should delete the old replicas from disk

Posted by Jun Rao <ju...@gmail.com>.

> On Nov. 19, 2013, 6:40 p.m., Jay Kreps wrote:
> > What happens if I am doing a read or write concurrently with a delete?
> > 
> > Would it be simpler to have the log delete work like the segment delete, where, rather than trying to lock, we remove it from the segment list and then just enqueue a delete in 60 seconds? My concern is just that reasoning about the various locking strategies in the log is getting increasingly difficult.

Yes, we could try deleting the log asynchronously. The issues there are:

1. The same partition could be moved back to this broker during the delayed window.
2. It's not clear if 60 secs (or any value) is good enough since the time that an ongoing scheduled flush takes is unbounded.

The following is how this patch handles outstanding reads/writes on the deleted data.

1. All read operations are ok since we already handle unexpected exceptions in KafkaApi. The caller will get an error.
2. Currently, if the producer request handler, the replica fetcher, or the log flusher hits an IOException while writing to the log, we halt the broker. We need to make sure that deleting a log doesn't cause such a halt. This is achieved by preventing those operations on the log once it's deleted.
2.1 For producer requests, the delete partition operation will synchronize on the leaderAndIsrUpdate lock.
2.2 For replica fetcher, this is already handled since the fetcher is removed before the log is deleted.
2.3 For log flusher, the flush and the delete will now synchronize on a delete lock.

I agree that this approach uses more locks, which potentially makes the code harder to understand. However, my feeling is that this is probably a less hacky approach than the async delete one.
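For illustration, item 2.3 (flush and delete synchronizing on a shared delete lock) could look roughly like the sketch below. SketchLog, deleteLock and the deleted flag are hypothetical names, not the patch's code; the point is that a flush which acquires the lock after the delete sees the flag and skips the files instead of failing with an IOException.

```scala
import java.io.File
import java.nio.file.Files

// Hypothetical stand-in for a Log: flush and delete serialize on one lock,
// so a flush can never run against files that are being (or have been) removed.
class SketchLog(val dir: File) {
  private val deleteLock = new Object
  private var deleted = false // guarded by deleteLock

  /** Flushes unless the log was deleted; returns true if a flush actually ran. */
  def flush(): Boolean = deleteLock.synchronized {
    if (deleted) false
    else {
      // a real log would fsync its active segment here
      true
    }
  }

  /** Deletes the log directory while no flush can be in progress. */
  def delete(): Unit = deleteLock.synchronized {
    deleted = true
    Option(dir.listFiles()).getOrElse(Array.empty[File]).foreach(_.delete())
    dir.delete()
  }
}
```

Per the Dec. 16 update later in this thread, the patch eventually dropped this synchronization and instead stopped halting the broker when a background flush hits an IOException.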


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review29123
-----------------------------------------------------------


Re: Review Request 15674: Reassign partitions should delete the old replicas from disk

Posted by Jay Kreps <bo...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review29123
-----------------------------------------------------------


What happens if I am doing a read or write concurrently with a delete?

Would it be simpler to have the log delete work like the segment delete, where, rather than trying to lock, we remove it from the segment list and then just enqueue a delete in 60 seconds? My concern is just that reasoning about the various locking strategies in the log is getting increasingly difficult.
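The asynchronous alternative suggested here can be sketched as follows, assuming a hypothetical registry AsyncLogDeleter keyed by topic-partition name (not Kafka's LogManager API). The log disappears from the registry immediately and its files are removed after delayMs on a scheduler thread. The sketch also makes the first concern raised in this thread concrete: nothing stops the same partition from being re-created in that directory before the timer fires.

```scala
import java.io.File
import java.nio.file.Files
import java.util.concurrent.{ConcurrentHashMap, Executors, ScheduledFuture, TimeUnit}

// Hypothetical registry illustrating the async-delete alternative: the log is unlinked
// from `logs` right away and its files are removed `delayMs` later on a scheduler thread.
class AsyncLogDeleter(delayMs: Long) {
  val logs = new ConcurrentHashMap[String, File]()
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  /** Unlinks the log now and schedules removal of its directory; None if unknown. */
  def asyncDelete(topicPartition: String): Option[ScheduledFuture[_]] =
    Option(logs.remove(topicPartition)).map { dir =>
      scheduler.schedule(new Runnable {
        def run(): Unit = {
          // Caveat: if the partition was moved back and re-created in the same
          // directory during the delay, this would delete the new data too.
          Option(dir.listFiles()).getOrElse(Array.empty[File]).foreach(_.delete())
          dir.delete()
        }
      }, delayMs, TimeUnit.MILLISECONDS)
    }

  def shutdown(): Unit = scheduler.shutdown()
}
```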

- Jay Kreps


Re: Review Request 15674: Reassign partitions should delete the old replicas from disk

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review29118
-----------------------------------------------------------



core/src/main/scala/kafka/log/LogSegment.scala
<https://reviews.apache.org/r/15674/#comment56244>

    Perhaps we could throw an exception if log.delete fails, so that we still have the index for troubleshooting?
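A sketch of the suggested ordering, using plain java.io.File calls and a hypothetical helper name (SegmentDeletion.deleteSegmentFiles is not the patch's code): delete the log file first, and throw before touching the index if that fails, so the index survives for troubleshooting.

```scala
import java.io.{File, IOException}
import java.nio.file.Files

object SegmentDeletion {
  /** Hypothetical helper: delete the log file first; if that fails, throw and keep the index. */
  def deleteSegmentFiles(logFile: File, indexFile: File): Unit = {
    if (logFile.exists() && !logFile.delete())
      throw new IOException(s"Failed to delete ${logFile.getAbsolutePath}; keeping ${indexFile.getName} for troubleshooting")
    if (indexFile.exists() && !indexFile.delete())
      throw new IOException(s"Failed to delete ${indexFile.getAbsolutePath}")
  }
}
```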


Currently, the locking hierarchy is:

ReplicaManager : replicaStateChangeLock
|
Partition : leaderIsrUpdateLock
|
LogManager : logCreationOrDeletionLock
|
Log : this.lock
|
LogSegment : this.lock

Could we drop some of the lower-level synchronization where a higher-level lock already covers it?
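One way to keep such a hierarchy honest is to give each lock a level and reject out-of-order acquisition at runtime. The sketch below is a hypothetical debugging aid (OrderedLock is not part of the Kafka code base); the levels simply follow the order listed above, with ReplicaManager outermost (0) and LogSegment innermost (4). It is deliberately strict: re-acquiring a lock the thread already holds also counts as a violation.

```scala
import java.util.concurrent.locks.ReentrantLock

// Hypothetical lock-ordering checker: lower level = outer lock. A thread may only
// acquire a lock at a strictly deeper level than the innermost lock it already holds.
final class OrderedLock(val name: String, val level: Int) {
  private val lock = new ReentrantLock()

  def inLock[T](body: => T): T = {
    val held = OrderedLock.heldLevels.get()
    if (held.nonEmpty && level <= held.head)
      throw new IllegalStateException(s"lock-order violation: $name (level $level) after level ${held.head}")
    lock.lock()
    OrderedLock.heldLevels.set(level :: held)
    try body
    finally {
      OrderedLock.heldLevels.set(held)
      lock.unlock()
    }
  }
}

object OrderedLock {
  // Levels of the locks held by the current thread, innermost first.
  private val heldLevels = new ThreadLocal[List[Int]] {
    override def initialValue(): List[Int] = Nil
  }
}
```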

- Guozhang Wang


Re: Review Request 15674: address review comments

Posted by Jun Rao <ju...@gmail.com>.

> On Dec. 10, 2013, 1:27 a.m., Jay Kreps wrote:
> > core/src/main/scala/kafka/log/LogManager.scala, line 366
> > <https://reviews.apache.org/r/15674/diff/2/?file=393912#file393912line366>
> >
> >     Another alternative would just be to let the background flush fail and log it rather than shutting down; it is, after all, a background flush.

Yes, this seems simpler.
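A minimal sketch of that simpler behavior, with a hypothetical BackgroundFlusher and logs modelled as named flush callbacks: an IOException from one log's flush is logged and the pass continues with the remaining logs. As the Dec. 16 description notes, a real disk problem would still surface on the next append, which halts the broker.

```scala
import java.io.IOException
import java.util.logging.Logger

// Hypothetical flusher: a failed background flush is logged and skipped, not fatal.
object BackgroundFlusher {
  private val logger = Logger.getLogger("BackgroundFlusher")

  /** Flushes every log and returns the names of the logs whose flush failed. */
  def flushAll(logs: Seq[(String, () => Unit)]): Seq[String] =
    logs.flatMap { case (name, flush) =>
      try { flush(); None }
      catch {
        case e: IOException =>
          logger.warning(s"Background flush of $name failed: ${e.getMessage}")
          Some(name)
      }
    }
}
```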


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review30067
-----------------------------------------------------------


On Dec. 16, 2013, 5:43 p.m., Jun Rao wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/15674/
> -----------------------------------------------------------
> 
> (Updated Dec. 16, 2013, 5:43 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1074
>     https://issues.apache.org/jira/browse/KAFKA-1074
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Summary of changes since the previous patch.
> 
> LogCleaner:
> * Added the logic to pause and resume the cleaning of a log. Canceling the cleaning is implemented as pausing, followed by resuming.
> 
> LogManager:
> * Don't halt when a background flush hits an IOException, since the same IOException will be hit during log append, which will halt the broker. This removes the need to synchronize between flushing and deleting the log.
> * Removed OptimisticLockFailureException in LogCleaner. When a log needs to be truncated, first pause log cleaning, then truncate the log, and finally resume log cleaning.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala 5c9307d71641ccc6c09a54b69d5aa2b4bc2a4cde 
>   core/src/main/scala/kafka/common/OptimisticLockFailureException.scala 0e69110e71667ad75a460534f4422a7b6ec1cdc6 
>   core/src/main/scala/kafka/common/PausingCleaningException.scala PRE-CREATION 
>   core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
>   core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
>   core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 715845b167c44268bd7e4b76dfb69199bfb2fad4 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 242c18d47828b7c5e6b1fc219a0f1199fb1f9512 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 52d35a3ccc86a66acb57c1138d4f2fea8f4ca4b0 
>   core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 
> 
> Diff: https://reviews.apache.org/r/15674/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jun Rao
> 
>
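The pause/resume protocol described in this update could be modelled roughly as below. This is a hedged sketch, not the patch's LogCleaner: the class, method and state names (CleaningStateManager, LogCleaningInProgress, LogCleaningAborted, LogCleaningPaused) are illustrative, and it assumes at most one caller pauses a given log at a time. Pausing an idle log marks it paused right away; pausing a log that is being cleaned marks it aborted and blocks until the cleaner thread acknowledges.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Illustrative state names (hypothetical, not the patch's exact identifiers).
sealed trait LogCleaningState
case object LogCleaningInProgress extends LogCleaningState
case object LogCleaningAborted extends LogCleaningState
case object LogCleaningPaused extends LogCleaningState

// Hypothetical sketch: a log with no entry in `states` is idle and may be cleaned.
class CleaningStateManager {
  private val lock = new ReentrantLock()
  private val cleaningStateChanged = lock.newCondition()
  private val states = mutable.Map.empty[String, LogCleaningState]

  private def withLock[T](body: => T): T = {
    lock.lock()
    try body finally lock.unlock()
  }

  /** Cleaner thread: claim an idle log; false if it is paused or already being cleaned. */
  def tryStartCleaning(tp: String): Boolean = withLock {
    if (states.contains(tp)) false
    else { states(tp) = LogCleaningInProgress; true }
  }

  /** Cleaner thread: polled between segments; true means stop cleaning tp. */
  def isAborted(tp: String): Boolean = withLock(states.get(tp).contains(LogCleaningAborted))

  /** Cleaner thread: finished or gave up; an aborted log becomes paused. */
  def doneCleaning(tp: String): Unit = withLock {
    states.get(tp) match {
      case Some(LogCleaningInProgress) => states -= tp
      case Some(LogCleaningAborted)    => states(tp) = LogCleaningPaused
      case _                           => ()
    }
    cleaningStateChanged.signalAll()
  }

  /** Stop cleaning of tp, blocking until any in-progress cleaning has acknowledged the abort. */
  def pauseCleaning(tp: String): Unit = withLock {
    states.get(tp) match {
      case None                        => states(tp) = LogCleaningPaused
      case Some(LogCleaningInProgress) => states(tp) = LogCleaningAborted
      case Some(_)                     => () // already aborting or paused
    }
    while (!states.get(tp).contains(LogCleaningPaused)) cleaningStateChanged.await()
  }

  def resumeCleaning(tp: String): Unit = withLock {
    require(states.get(tp).contains(LogCleaningPaused), s"$tp is not paused")
    states -= tp
  }

  /** Aborting is implemented as a pause followed by a resume. */
  def abortCleaning(tp: String): Unit = { pauseCleaning(tp); resumeCleaning(tp) }
}
```

In this model, truncation would call pauseCleaning(tp), truncate, then resumeCleaning(tp), while the cleaner thread calls tryStartCleaning, polls isAborted between segments, and always finishes with doneCleaning.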


Re: Review Request 15674: new patch

Posted by Jay Kreps <bo...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review30067
-----------------------------------------------------------



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment57604>

    Maybe this could be included with inProgress?



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment57606>

    Can we rename this cond to something like cleaningsChangedCond, or something else that describes the condition?



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment57600>

    Can you include the param name here.



core/src/main/scala/kafka/log/LogManager.scala
<https://reviews.apache.org/r/15674/#comment57599>

    Another alternative would just be to let the background flush fail and log it rather than shutting down; it is, after all, a background flush.



core/src/main/scala/kafka/log/LogManager.scala
<https://reviews.apache.org/r/15674/#comment57597>

    Maybe this could be simplified into just a lock?
    
    


- Jay Kreps


On Dec. 4, 2013, 6:15 p.m., Jun Rao wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/15674/
> -----------------------------------------------------------
> 
> (Updated Dec. 4, 2013, 6:15 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1074
>     https://issues.apache.org/jira/browse/KAFKA-1074
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Summary of the new patch:
> LogCleaner:
> * Added the logic to cancel an in-progress cleaning task.
> * Made the cleaner thread shutdownable but not interruptible. Interrupting the cleaner may close the file channel, which would fail other operations such as log flushing during shutdown.
> 
> LogManager:
> * Added the logic to wait until an in-progress flush completes.
> * To delete a log:
>   (1) remove the log from the log lists so that the log cleaner and log flusher won't see it in the future;
>   (2) cancel any in-progress cleaning task;
>   (3) wait until any in-progress flush completes;
>   (4) at this point the log won't be touched by the cleaner or the flusher anymore, so we can delete the whole directory synchronously.
> 
> Todos:
> * If the overall logic looks good, we can also get rid of OptimisticLockFailureException in LogCleaner by canceling any cleaner task before truncating the log. This can be done in a follow-up JIRA.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala 02ccc17c79b6d44c75f9bb6ca7cda8c51ae6f6fb 
>   core/src/main/scala/kafka/common/AllDoneException.scala PRE-CREATION 
>   core/src/main/scala/kafka/common/CancelledCleaningException.scala PRE-CREATION 
>   core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
>   core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
>   core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9c7c29bb96a63d6452269f46a12751daca15b62 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala c30069e837e54fb91bf1d5b75b133282a28dedf8 
>   core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 
> 
> Diff: https://reviews.apache.org/r/15674/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jun Rao
> 
>
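The file-channel problem mentioned for the cleaner thread is standard JDK behavior: FileChannel is an InterruptibleChannel, so an I/O call made by an interrupted thread closes the channel for all of its users and throws ClosedByInterruptException. The small, self-contained demonstration below (the object name is made up) shows the effect on a temp file.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{ClosedByInterruptException, FileChannel}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, StandardOpenOption}
import java.util.concurrent.atomic.AtomicBoolean

object InterruptClosesChannelDemo {
  /** Returns (write threw ClosedByInterruptException, channel still open afterwards). */
  def run(): (Boolean, Boolean) = {
    val path = Files.createTempFile("sketch-segment", ".log")
    val channel = FileChannel.open(path, StandardOpenOption.WRITE)
    val writeFailed = new AtomicBoolean(false)
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        // Stands in for shutdown code interrupting a cleaner thread that is doing I/O.
        Thread.currentThread().interrupt()
        try channel.write(ByteBuffer.wrap("x".getBytes(StandardCharsets.UTF_8)))
        catch { case _: ClosedByInterruptException => writeFailed.set(true) }
      }
    })
    worker.start()
    worker.join()
    // The channel is now closed for every thread, e.g. a flusher sharing it.
    val stillOpen = channel.isOpen
    channel.close()
    Files.deleteIfExists(path)
    (writeFailed.get, stillOpen)
  }
}
```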


Re: Review Request 15674: address review comments

Posted by Jun Rao <ju...@gmail.com>.

> On Dec. 5, 2013, 1:54 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, line 205
> > <https://reviews.apache.org/r/15674/diff/2/?file=393911#file393911line205>
> >
> >     Are we going to have other checkDone implementations for this case? If not, could we avoid passing it as a parameter?

CheckDone() has to be defined in CleanerThread. Since Cleaner is not an inner class of CleanerThread, it won't have access to CheckDone unless it's passed in.
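A sketch of what passing the check in looks like, with hypothetical stand-ins SketchCleaner and SketchCleanerThread and an abort set invented for illustration: the cleaner only knows a String => Unit callback, and the object that owns the state supplies it.

```scala
// Hypothetical stand-ins: the cleaner is a top-level class, so the owning thread's
// check has to be handed to it explicitly, here as a function value.
class SketchCleaner(checkDone: String => Unit) {
  /** "Cleans" each segment, letting the owner stop the work between segments. */
  def clean(topicPartition: String, segments: Seq[String]): Seq[String] = {
    val cleaned = Seq.newBuilder[String]
    for (segment <- segments) {
      checkDone(topicPartition) // throws if the owner wants this log abandoned
      cleaned += segment
    }
    cleaned.result()
  }
}

class SketchCleanerThread {
  @volatile var aborted: Set[String] = Set.empty
  private def checkDone(topicPartition: String): Unit =
    if (aborted.contains(topicPartition))
      throw new IllegalStateException(s"cleaning of $topicPartition was aborted")
  val cleaner = new SketchCleaner(tp => checkDone(tp))
}
```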


> On Dec. 5, 2013, 1:54 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, line 209
> > <https://reviews.apache.org/r/15674/diff/2/?file=393911#file393911line209>
> >
> >     Where should this exception be captured?

We don't need to catch this exception explicitly. Any exception will kill a ShutdownableThread. In ShutdownableThread.run(), we eat any exception if the thread is shut down normally.
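The pattern described here, where any exception ends the thread but is only reported if it happens while the thread is still supposed to be running, can be sketched as follows. SketchShutdownableThread is a hypothetical simplification, not Kafka's ShutdownableThread; in keeping with the Dec. 4 change, shutdown() clears the running flag and waits instead of interrupting the thread.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical simplification of the ShutdownableThread pattern (not Kafka's class).
abstract class SketchShutdownableThread(name: String) extends Thread(name) {
  val isRunning = new AtomicBoolean(true)
  private val shutdownComplete = new CountDownLatch(1)
  @volatile var unexpectedError: Option[Throwable] = None

  /** One unit of work; any exception thrown here ends the thread. */
  def doWork(): Unit

  /** Clears the running flag and waits for run() to exit; never interrupts. Assumes start() was called. */
  def shutdown(): Unit = {
    isRunning.set(false)
    shutdownComplete.await()
  }

  override def run(): Unit =
    try {
      while (isRunning.get()) doWork()
    } catch {
      // An exception after shutdown began is expected and swallowed;
      // one thrown during normal operation is recorded.
      case e: Throwable => if (isRunning.get()) unexpectedError = Some(e)
    } finally shutdownComplete.countDown()
}
```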


> On Dec. 5, 2013, 1:54 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, line 524
> > <https://reviews.apache.org/r/15674/diff/2/?file=393911#file393911line524>
> >
> >     Do we have to keep two buildOffsetMap functions here?

The inner buildOffsetMap() is per segment. So, we can probably rename it to buildOffsetMapForSegment().


> On Dec. 5, 2013, 1:54 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/log/LogManager.scala, line 278
> > <https://reviews.apache.org/r/15674/diff/2/?file=393912#file393912line278>
> >
> >     Is there still a risk that another flush starts after waitUntilInProgressFlusherIsDone returns?

No, because the next flusher will not see the deleted log since it's already removed from the log list.
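The reasoning can be sketched with a hypothetical registry (SketchLogRegistry, with illustrative names and signatures) that follows the four delete steps from the Dec. 4 description. Each flush pass holds flushPassLock and iterates over a ConcurrentHashMap; an iterator created after an entry is removed never returns that entry, so the deleter only has to wait out the pass that may already be running.

```scala
import java.io.File
import java.nio.file.Files
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock

// Hypothetical registry following the four delete steps of the Dec. 4 description.
class SketchLogRegistry {
  val logs = new ConcurrentHashMap[String, File]()
  private val flushPassLock = new ReentrantLock()

  /** One background flush pass over the logs registered when the pass starts. */
  def flushAll(flush: File => Unit): Unit = {
    flushPassLock.lock()
    try {
      val it = logs.values().iterator()
      while (it.hasNext) flush(it.next())
    } finally flushPassLock.unlock()
  }

  def deleteLog(topicPartition: String, abortCleaning: String => Unit): Boolean =
    Option(logs.remove(topicPartition)) match {
      case None => false
      case Some(dir) =>
        // (1) done by remove() above: passes started from now on cannot see this log
        abortCleaning(topicPartition)                // (2) cancel in-progress cleaning
        flushPassLock.lock(); flushPassLock.unlock() // (3) wait out a pass already running
        Option(dir.listFiles()).getOrElse(Array.empty[File]).foreach(_.delete()) // (4)
        dir.delete()
    }
}
```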


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review29783
-----------------------------------------------------------


On Dec. 16, 2013, 5:41 p.m., Jun Rao wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/15674/
> -----------------------------------------------------------
> 
> (Updated Dec. 16, 2013, 5:41 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1074
>     https://issues.apache.org/jira/browse/KAFKA-1074
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> minor fix 2
> 
> 
> minor fix
> 
> 
> remove unused var and exception
> 
> 
> support pause/resume log clean and remove OptimisticLockFailureException
> 
> 
> kafka-1074; minor fix3
> 
> 
> kafka-1074; minor fix2
> 
> 
> kafka-1074; minor fix
> 
> 
> kafka-1074; better synchronization with log cleaner
> 
> 
> kafka-1074; fix 4
> 
> 
> kafka-1074; fix 3
> 
> 
> kafka-1074; fix 2
> 
> 
> kafka-1074
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala 5c9307d71641ccc6c09a54b69d5aa2b4bc2a4cde 
>   core/src/main/scala/kafka/common/OptimisticLockFailureException.scala 0e69110e71667ad75a460534f4422a7b6ec1cdc6 
>   core/src/main/scala/kafka/common/PausingCleaningException.scala PRE-CREATION 
>   core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
>   core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
>   core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 715845b167c44268bd7e4b76dfb69199bfb2fad4 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 242c18d47828b7c5e6b1fc219a0f1199fb1f9512 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 52d35a3ccc86a66acb57c1138d4f2fea8f4ca4b0 
>   core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 
> 
> Diff: https://reviews.apache.org/r/15674/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jun Rao
> 
>


Re: Review Request 15674: new patch

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review29783
-----------------------------------------------------------



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment57268>

    Are we going to have other checkDone implementations for this case? If not, could we avoid passing it as a parameter?



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment57267>

    Where should this exception be captured?



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment57266>

    Do we have to keep two buildOffsetMap functions here?



core/src/main/scala/kafka/log/LogManager.scala
<https://reviews.apache.org/r/15674/#comment57269>

    Is there still a risk that another flush starts after waitUntilInProgressFlusherIsDone returns?


- Guozhang Wang


Re: Review Request 15674: address review comments

Posted by Neha Narkhede <ne...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review30456
-----------------------------------------------------------



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/15674/#comment58264>

    I'm guessing this change shouldn't be in this patch



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/15674/#comment58265>

    same here



core/src/main/scala/kafka/common/AllDoneException.scala
<https://reviews.apache.org/r/15674/#comment58266>

    Can we please rename AllDone to something more meaningful?



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment58268>

    Can we rename inProgress to cleaningInProgress?



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment58269>

    I think these case objects should be named more descriptively to show that they relate to log cleaning. For example, InProgress -> LogCleaningInProgress. That makes the code much more readable. Similarly, PausingLogCleaning and LogCleaningPaused.



core/src/main/scala/kafka/common/PausingCleaningException.scala
<https://reviews.apache.org/r/15674/#comment58267>

    PausingCleaningException => PausedCleaningException, along the lines of CancelledCleaningException


- Neha Narkhede


Re: Review Request 15674: add unit test for aborting during cleaning

Posted by Jun Rao <ju...@gmail.com>.

> On Jan. 3, 2014, 4:22 p.m., Jay Kreps wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, line 262
> > <https://reviews.apache.org/r/15674/diff/4/?file=413443#file413443line262>
> >
> >     Would these be better as just InProgress, Aborted, and Paused?

My feeling is that in the future there could be similar states in other components such as the controller. So, having a longer name will make it clear which state it really is.


> On Jan. 3, 2014, 4:22 p.m., Jay Kreps wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, line 298
> > <https://reviews.apache.org/r/15674/diff/4/?file=413443#file413443line298>
> >
> >     It seems like the logic for choosing logs is not part of the state management. Is there any way to separate it out?

That's a bit hard. grabFilthestLog() accesses checkpoints and inProgress, both of which need to be protected by the lock.
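Why choosing a log and tracking in-progress state are hard to separate can be seen in a sketch: picking the dirtiest eligible log and marking it in progress must happen in one critical section, or two cleaner threads could pick the same log. LogStats, FilthiestLogSelector and the dirty-ratio threshold are hypothetical; the real cleaner derives the dirty portion of each log from its checkpoints rather than from a precomputed byte count.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical per-log statistics: bytes already cleaned vs. bytes still dirty.
final case class LogStats(topicPartition: String, cleanBytes: Long, dirtyBytes: Long) {
  def dirtyRatio: Double =
    if (cleanBytes + dirtyBytes == 0) 0.0 else dirtyBytes.toDouble / (cleanBytes + dirtyBytes)
}

class FilthiestLogSelector(minDirtyRatio: Double) {
  private val lock = new ReentrantLock()
  private val inProgress = mutable.Set.empty[String]

  /** Picks the dirtiest eligible log and marks it in progress, in one critical section. */
  def grab(candidates: Seq[LogStats]): Option[LogStats] = {
    lock.lock()
    try {
      val eligible = candidates.filter(s => !inProgress.contains(s.topicPartition) && s.dirtyRatio >= minDirtyRatio)
      if (eligible.isEmpty) None
      else {
        val chosen = eligible.maxBy(_.dirtyRatio)
        inProgress += chosen.topicPartition
        Some(chosen)
      }
    } finally lock.unlock()
  }

  /** Releases a log once its cleaning has finished. */
  def done(topicPartition: String): Unit = {
    lock.lock()
    try inProgress -= topicPartition
    finally lock.unlock()
  }
}
```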


> On Jan. 3, 2014, 4:22 p.m., Jay Kreps wrote:
> > core/src/test/scala/unit/kafka/log/CleanerTest.scala, line 98
> > <https://reviews.apache.org/r/15674/diff/4/?file=413448#file413448line98>
> >
> >     Do we have coverage for cleaning + truncation simultaneously?

Added a unit test for aborting during log cleaning.


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review31134
-----------------------------------------------------------


On Jan. 3, 2014, 8:27 p.m., Jun Rao wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/15674/
> -----------------------------------------------------------
> 
> (Updated Jan. 3, 2014, 8:27 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1074
>     https://issues.apache.org/jira/browse/KAFKA-1074
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> add test for aborting during cleaning
> 
> 
> checkpoint recovery points in truncateFullyAndStartAt()
> 
> 
> move cleaning states and locks to a separate class
> 
> 
> minor fix 2
> 
> 
> minor fix
> 
> 
> remove unused var and exception
> 
> 
> support pause/resume log clean and remove OptimisticLockFailureException
> 
> 
> kafka-1074; minor fix3
> 
> 
> kafka-1074; minor fix2
> 
> 
> kafka-1074; minor fix
> 
> 
> kafka-1074; better synchronization with log cleaner
> 
> 
> kafka-1074; fix 4
> 
> 
> kafka-1074; fix 3
> 
> 
> kafka-1074; fix 2
> 
> 
> kafka-1074
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala 5c9307d71641ccc6c09a54b69d5aa2b4bc2a4cde 
>   core/src/main/scala/kafka/common/OptimisticLockFailureException.scala 0e69110e71667ad75a460534f4422a7b6ec1cdc6 
>   core/src/main/scala/kafka/common/ThreadShutdownException.scala PRE-CREATION 
>   core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
>   core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
>   core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 715845b167c44268bd7e4b76dfb69199bfb2fad4 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 242c18d47828b7c5e6b1fc219a0f1199fb1f9512 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 52d35a3ccc86a66acb57c1138d4f2fea8f4ca4b0 
>   core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 
> 
> Diff: https://reviews.apache.org/r/15674/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jun Rao
> 
>


Re: Review Request 15674: address more review comments

Posted by Jay Kreps <bo...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review31134
-----------------------------------------------------------



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment59482>

    Would these be better as just InProgress, Aborted, and Paused?



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment59483>

    It seems like the logic for choosing logs is not part of the state management. Is there any way to separate this out?



core/src/test/scala/unit/kafka/log/CleanerTest.scala
<https://reviews.apache.org/r/15674/#comment59481>

    Do we have coverage for cleaning + truncation simultaneously?


- Jay Kreps



Re: Review Request 15674: move LogCleanerManager to a separate file and improve unit test

Posted by Jun Rao <ju...@gmail.com>.

> On Jan. 6, 2014, 9:18 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, line 161
> > <https://reviews.apache.org/r/15674/diff/6/?file=415674#file415674line161>
> >
> >     The method description says it should block until the cleaner has processed up to the given offset, but it doesn't look like the offset parameter is used in any way.

This is an existing problem. Will file a separate jira to have it addressed.
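For reference, here is a minimal sketch of a version of that method that does honor its offset argument. `CleanedOffsetTracker`, `updateCleanedOffset`, and the `awaitCleaned(offset, timeoutMs)` signature are illustrative assumptions, not the existing LogCleaner API. The cleaner thread publishes its progress under a lock and signals a condition. Waiters block until the cleaned offset reaches the requested value or the timeout expires.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

final class CleanedOffsetTracker {
  private val lock = new ReentrantLock()
  private val progressed = lock.newCondition()
  private var cleanedOffset = 0L // guarded by lock

  // Called by the cleaner thread after it has cleaned up to `offset`.
  def updateCleanedOffset(offset: Long): Unit = {
    lock.lock()
    try {
      if (offset > cleanedOffset) cleanedOffset = offset
      progressed.signalAll()
    } finally lock.unlock()
  }

  // Blocks until the cleaner has processed up to `offset`; returns false on timeout.
  def awaitCleaned(offset: Long, timeoutMs: Long): Boolean = {
    var remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs)
    lock.lock()
    try {
      // Loop to tolerate spurious wakeups and progress that falls short of `offset`.
      while (cleanedOffset < offset && remainingNanos > 0L)
        remainingNanos = progressed.awaitNanos(remainingNanos)
      cleanedOffset >= offset
    } finally lock.unlock()
  }
}
```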


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review31247
-----------------------------------------------------------



Re: Review Request 15674: rename LogCleanerStates to LogCleanerManagers; optimize truncateTo to only pause cleaning if the truncated offset is not on the active segment

Posted by Neha Narkhede <ne...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review31247
-----------------------------------------------------------


Overall, looks good. A few questions and minor comments.


core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment59648>

    The method description says it should block until the cleaner has processed up to the given offset, but it doesn't look like the offset parameter is used in any way.



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/15674/#comment59650>

    It is worth placing LogCleanerManager and Cleaner in separate class files; that makes the code easier to read.



core/src/test/scala/unit/kafka/admin/AdminTest.scala
<https://reviews.apache.org/r/15674/#comment59646>

    This check is good. In addition to this, let's also validate that the only brokers that have the partition directory are the brokers in the reassigned replica list. 
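Here is a minimal sketch of the extra check. It assumes each broker's log directory is available as a `java.io.File` and that a partition's replica lives in a directory named `<topic>-<partition>`. The helper names `brokersHostingPartition` and `onlyReassignedReplicasHaveData` are hypothetical and are not part of AdminTest. The check compares the set of brokers that still have the directory with the reassigned replica list, so a stale replica left on an old broker makes it fail.

```scala
import java.io.File

object ReassignmentDirCheck {
  // Returns the ids of brokers whose log directory contains `<topic>-<partition>`.
  def brokersHostingPartition(logDirs: Map[Int, File], topic: String, partition: Int): Set[Int] =
    logDirs.collect {
      case (brokerId, dir) if new File(dir, s"$topic-$partition").isDirectory => brokerId
    }.toSet

  // True only if exactly the reassigned replicas still hold the partition on disk.
  def onlyReassignedReplicasHaveData(logDirs: Map[Int, File], topic: String, partition: Int,
                                     reassignedReplicas: Seq[Int]): Boolean =
    brokersHostingPartition(logDirs, topic, partition) == reassignedReplicas.toSet
}
```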


- Neha Narkhede



Re: Review Request 15674: move LogCleanerManager to a separate file and improve unit test

Posted by Jun Rao <ju...@gmail.com>.

> On Jan. 7, 2014, 11:42 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/common/OptimisticLockFailureException.scala, line 23
> > <https://reviews.apache.org/r/15674/diff/6-7/?file=415671#file415671line23>
> >
> >     Is this change intended?

This is a diff between v6 and v7; the change is not in v7 itself.


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review31322
-----------------------------------------------------------



Re: Review Request 15674: move LogCleanerManager to a separate file and improve unit test

Posted by Neha Narkhede <ne...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/#review31322
-----------------------------------------------------------

Ship it!


Other than the question below, I think this patch looks good.


core/src/main/scala/kafka/common/OptimisticLockFailureException.scala
<https://reviews.apache.org/r/15674/#comment59786>

    Is this change intended?


- Neha Narkhede



Re: Review Request 15674: move LogCleanerManager to a separate file and improve unit test

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/
-----------------------------------------------------------

(Updated Jan. 7, 2014, 6:51 p.m.)


Review request for kafka.


Summary (updated)
-----------------

move LogCleanerManager to a separate file and improve unit test


Bugs: KAFKA-1074
    https://issues.apache.org/jira/browse/KAFKA-1074


Repository: kafka


Description (updated)
-------

move LogCleanerManager to a separate file and improve unit test


rename LogCleanerStates to LogCleanerManagers; optimize truncateTo to only pause cleaning if the truncated offset is not on the active segment


add test for aborting during cleaning


checkpoint recovery points in truncateFullyAndStartAt()


move cleaning states and locks to a separate class


minor fix 2


minor fix


remove unused var and exception


support pause/resume log clean and remove OptimisticLockFailureException


kafka-1074; minor fix3


kafka-1074; minor fix2


kafka-1074; minor fix


kafka-1074; better synchronization with log cleaner


kafka-1074; fix 4


kafka-1074; fix 3


kafka-1074; fix 2


kafka-1074


Diffs (updated)
-----

  core/src/main/scala/kafka/cluster/Partition.scala 5c9307d71641ccc6c09a54b69d5aa2b4bc2a4cde 
  core/src/main/scala/kafka/common/LogCleaningAbortedException.scala PRE-CREATION 
  core/src/main/scala/kafka/common/OptimisticLockFailureException.scala 0e69110e71667ad75a460534f4422a7b6ec1cdc6 
  core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
  core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
  core/src/main/scala/kafka/log/LogCleanerManager.scala PRE-CREATION 
  core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 715845b167c44268bd7e4b76dfb69199bfb2fad4 
  core/src/main/scala/kafka/server/ReplicaManager.scala 242c18d47828b7c5e6b1fc219a0f1199fb1f9512 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 52d35a3ccc86a66acb57c1138d4f2fea8f4ca4b0 
  core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 

Diff: https://reviews.apache.org/r/15674/diff/


Testing
-------


Thanks,

Jun Rao


Re: Review Request 15674: rename LogCleanerStates to LogCleanerManagers; optimize truncateTo to only pause cleaning if the truncated offset is not on the active segment

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/
-----------------------------------------------------------

(Updated Jan. 4, 2014, 4:37 p.m.)


Review request for kafka.


Summary (updated)
-----------------

rename LogCleanerStates to LogCleanerManagers; optimize truncateTo to only pause cleaning if the truncated offset is not on the active segment
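The truncateTo optimization named in this summary can be read as the decision shown in this minimal sketch. `TruncationSketch`, `activeSegmentBaseOffset`, and the pause, resume, and truncate callbacks are hypothetical hooks. The rule itself comes from the summary: pause cleaning only when the target offset falls before the active segment. It relies on the cleaner never touching the active segment.

```scala
// Hypothetical hooks; the real Log/LogManager code is not reproduced here.
final class TruncationSketch(
    activeSegmentBaseOffset: () => Long,
    pauseCleaning: () => Unit,
    resumeCleaning: () => Unit,
    truncate: Long => Unit) {

  // Truncating within the active segment cannot conflict with the cleaner, which
  // skips the active segment, so cleaning is paused only for deeper truncations.
  def truncateTo(targetOffset: Long): Unit = {
    if (targetOffset < activeSegmentBaseOffset()) {
      pauseCleaning()
      try truncate(targetOffset)
      finally resumeCleaning()
    } else {
      truncate(targetOffset)
    }
  }
}
```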


Bugs: KAFKA-1074
    https://issues.apache.org/jira/browse/KAFKA-1074


Repository: kafka


Description (updated)
-------

rename LogCleanerStates to LogCleanerManagers; optimize truncateTo to only pause cleaning if the truncated offset is not on the active segment


add test for aborting during cleaning


checkpoint recovery points in truncateFullyAndStartAt()


move cleaning states and locks to a separate class


minor fix 2


minor fix


remove unused var and exception


support pause/resume log clean and remove OptimisticLockFailureException


kafka-1074; minor fix3


kafka-1074; minor fix2


kafka-1074; minor fix


kafka-1074; better synchronization with log cleaner


kafka-1074; fix 4


kafka-1074; fix 3


kafka-1074; fix 2


kafka-1074


Diffs (updated)
-----

  core/src/main/scala/kafka/cluster/Partition.scala 5c9307d71641ccc6c09a54b69d5aa2b4bc2a4cde 
  core/src/main/scala/kafka/common/OptimisticLockFailureException.scala 0e69110e71667ad75a460534f4422a7b6ec1cdc6 
  core/src/main/scala/kafka/common/ThreadShutdownException.scala PRE-CREATION 
  core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
  core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
  core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 715845b167c44268bd7e4b76dfb69199bfb2fad4 
  core/src/main/scala/kafka/server/ReplicaManager.scala 242c18d47828b7c5e6b1fc219a0f1199fb1f9512 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 52d35a3ccc86a66acb57c1138d4f2fea8f4ca4b0 
  core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 

Diff: https://reviews.apache.org/r/15674/diff/


Testing
-------


Thanks,

Jun Rao


Re: Review Request 15674: add unit test for aborting during cleaning

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/
-----------------------------------------------------------

(Updated Jan. 3, 2014, 8:27 p.m.)


Review request for kafka.


Summary (updated)
-----------------

add unit test for aborting during cleaning


Bugs: KAFKA-1074
    https://issues.apache.org/jira/browse/KAFKA-1074


Repository: kafka


Description (updated)
-------

add test for aborting during cleaning


checkpoint recovery points in truncateFullyAndStartAt()


move cleaning states and locks to a separate class


minor fix 2


minor fix


remove unused var and exception


support pause/resume log clean and remove OptimisticLockFailureException


kafka-1074; minor fix3


kafka-1074; minor fix2


kafka-1074; minor fix


kafka-1074; better synchronization with log cleaner


kafka-1074; fix 4


kafka-1074; fix 3


kafka-1074; fix 2


kafka-1074


Diffs (updated)
-----

  core/src/main/scala/kafka/cluster/Partition.scala 5c9307d71641ccc6c09a54b69d5aa2b4bc2a4cde 
  core/src/main/scala/kafka/common/OptimisticLockFailureException.scala 0e69110e71667ad75a460534f4422a7b6ec1cdc6 
  core/src/main/scala/kafka/common/ThreadShutdownException.scala PRE-CREATION 
  core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
  core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
  core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 715845b167c44268bd7e4b76dfb69199bfb2fad4 
  core/src/main/scala/kafka/server/ReplicaManager.scala 242c18d47828b7c5e6b1fc219a0f1199fb1f9512 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 52d35a3ccc86a66acb57c1138d4f2fea8f4ca4b0 
  core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 

Diff: https://reviews.apache.org/r/15674/diff/


Testing
-------


Thanks,

Jun Rao


Re: Review Request 15674: address more review comments

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/
-----------------------------------------------------------

(Updated Jan. 2, 2014, 4:33 p.m.)


Review request for kafka.


Summary (updated)
-----------------

address more review comments


Bugs: KAFKA-1074
    https://issues.apache.org/jira/browse/KAFKA-1074


Repository: kafka


Description (updated)
-------

checkpoint recovery points in truncateFullyAndStartAt()


move cleaning states and locks to a separate class


minor fix 2


minor fix


remove unused var and exception


support pause/resume log clean and remove OptimisticLockFailureException


kafka-1074; minor fix3


kafka-1074; minor fix2


kafka-1074; minor fix


kafka-1074; better synchronization with log cleaner


kafka-1074; fix 4


kafka-1074; fix 3


kafka-1074; fix 2


kafka-1074


Diffs (updated)
-----

  core/src/main/scala/kafka/cluster/Partition.scala 5c9307d71641ccc6c09a54b69d5aa2b4bc2a4cde 
  core/src/main/scala/kafka/common/OptimisticLockFailureException.scala 0e69110e71667ad75a460534f4422a7b6ec1cdc6 
  core/src/main/scala/kafka/common/ThreadShutdownException.scala PRE-CREATION 
  core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
  core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
  core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 715845b167c44268bd7e4b76dfb69199bfb2fad4 
  core/src/main/scala/kafka/server/ReplicaManager.scala 242c18d47828b7c5e6b1fc219a0f1199fb1f9512 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 52d35a3ccc86a66acb57c1138d4f2fea8f4ca4b0 
  core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 

Diff: https://reviews.apache.org/r/15674/diff/


Testing
-------


Thanks,

Jun Rao


Re: Review Request 15674: address review comments

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/
-----------------------------------------------------------

(Updated Dec. 16, 2013, 5:43 p.m.)


Review request for kafka.


Bugs: KAFKA-1074
    https://issues.apache.org/jira/browse/KAFKA-1074


Repository: kafka


Description (updated)
-------

Summary of changes since the previous patch.

LogCleaner:
* Added the logic to pause and resume the cleaning of a log. Canceling the cleaning is implemented as pausing, followed by resuming.

LogManager:
* Don't halt when a background flush hits an IOException, since the same IOException will be hit during log append, which will halt the broker. This removes the need to synchronize between flushing and deleting the log.
* Removed OptimisticLockFailureException in LogCleaner. When a log needs to be truncated, first pause log cleaning, then truncate the log, and finally resume log cleaning.
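The pause/abort/resume protocol described above can be sketched as follows. The sketch assumes a single cleaner thread and one lock plus one condition per manager. The class and method names (`PauseResumeSketch`, `startCleaning`, `checkAborted`, `doneCleaning`, `abortAndPause`, `resume`) are hypothetical. A caller that wants to truncate or delete a log pauses it first. If cleaning is in progress, the state becomes aborted, and the caller waits until the cleaner notices and acknowledges by moving the log to paused. Canceling is then just a pause followed by a resume.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

sealed trait LogCleaningState
case object LogCleaningInProgress extends LogCleaningState
case object LogCleaningAborted extends LogCleaningState
case object LogCleaningPaused extends LogCleaningState

final class PauseResumeSketch {
  private val lock = new ReentrantLock()
  private val pausedCond = lock.newCondition()
  private val states = mutable.Map.empty[String, LogCleaningState] // guarded by lock

  private def locked[T](body: => T): T = { lock.lock(); try body finally lock.unlock() }

  def state(log: String): Option[LogCleaningState] = locked(states.get(log))

  // Cleaner thread: claim a log unless it is already in some state (e.g. paused).
  def startCleaning(log: String): Boolean = locked {
    if (states.contains(log)) false
    else { states(log) = LogCleaningInProgress; true }
  }

  // Cleaner thread, called periodically: acknowledge a requested abort by pausing.
  def checkAborted(log: String): Boolean = locked {
    if (states.get(log).contains(LogCleaningAborted)) {
      states(log) = LogCleaningPaused
      pausedCond.signalAll()
      true
    } else false
  }

  // Cleaner thread: normal completion, or acknowledgement if an abort raced in.
  def doneCleaning(log: String): Unit = locked {
    if (states.get(log).contains(LogCleaningAborted)) {
      states(log) = LogCleaningPaused
      pausedCond.signalAll()
    } else states.remove(log)
    ()
  }

  // Truncation/deletion path: pause cleaning, waiting out any in-progress pass.
  def abortAndPause(log: String): Unit = locked {
    states.get(log) match {
      case None                        => states(log) = LogCleaningPaused
      case Some(LogCleaningInProgress) => states(log) = LogCleaningAborted
      case Some(_)                     => () // already aborted or paused
    }
    while (!states.get(log).contains(LogCleaningPaused)) pausedCond.await()
  }

  // Only a paused log can be resumed; afterwards the cleaner may pick it again.
  def resume(log: String): Unit = locked {
    require(states.get(log).contains(LogCleaningPaused), s"$log is not paused")
    states.remove(log)
    ()
  }
}
```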


Diffs
-----

  core/src/main/scala/kafka/cluster/Partition.scala 5c9307d71641ccc6c09a54b69d5aa2b4bc2a4cde 
  core/src/main/scala/kafka/common/OptimisticLockFailureException.scala 0e69110e71667ad75a460534f4422a7b6ec1cdc6 
  core/src/main/scala/kafka/common/PausingCleaningException.scala PRE-CREATION 
  core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
  core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
  core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 715845b167c44268bd7e4b76dfb69199bfb2fad4 
  core/src/main/scala/kafka/server/ReplicaManager.scala 242c18d47828b7c5e6b1fc219a0f1199fb1f9512 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 52d35a3ccc86a66acb57c1138d4f2fea8f4ca4b0 
  core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 

Diff: https://reviews.apache.org/r/15674/diff/


Testing
-------


Thanks,

Jun Rao


Re: Review Request 15674: address review comments

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/
-----------------------------------------------------------

(Updated Dec. 16, 2013, 5:41 p.m.)


Review request for kafka.


Summary (updated)
-----------------

address review comments


Bugs: KAFKA-1074
    https://issues.apache.org/jira/browse/KAFKA-1074


Repository: kafka


Description (updated)
-------

minor fix 2


minor fix


remove unused var and exception


support pause/resume log clean and remove OptimisticLockFailureException


kafka-1074; minor fix3


kafka-1074; minor fix2


kafka-1074; minor fix


kafka-1074; better synchronization with log cleaner


kafka-1074; fix 4


kafka-1074; fix 3


kafka-1074; fix 2


kafka-1074


Diffs (updated)
-----

  core/src/main/scala/kafka/cluster/Partition.scala 5c9307d71641ccc6c09a54b69d5aa2b4bc2a4cde 
  core/src/main/scala/kafka/common/OptimisticLockFailureException.scala 0e69110e71667ad75a460534f4422a7b6ec1cdc6 
  core/src/main/scala/kafka/common/PausingCleaningException.scala PRE-CREATION 
  core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
  core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
  core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 715845b167c44268bd7e4b76dfb69199bfb2fad4 
  core/src/main/scala/kafka/server/ReplicaManager.scala 242c18d47828b7c5e6b1fc219a0f1199fb1f9512 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 52d35a3ccc86a66acb57c1138d4f2fea8f4ca4b0 
  core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 

Diff: https://reviews.apache.org/r/15674/diff/


Testing
-------


Thanks,

Jun Rao


Re: Review Request 15674: new patch

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15674/
-----------------------------------------------------------

(Updated Dec. 4, 2013, 6:15 p.m.)


Review request for kafka.


Bugs: KAFKA-1074
    https://issues.apache.org/jira/browse/KAFKA-1074


Repository: kafka


Description (updated)
-------

Summary of the new patch:
LogCleaner:
* added the logic to cancel an in-progress cleaning task
* made the cleaner thread a shutdownable, but not interruptible, thread. Interrupting the cleaner may close the file channel, which would fail other operations, such as log flushing, during shutdown.
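The shutdown approach in the second bullet can be sketched as a worker that stops on a flag and is joined rather than interrupted. Interrupting a thread that is blocked in an I/O operation on an interruptible NIO channel, such as a `FileChannel`, closes that channel and raises `ClosedByInterruptException`. That is the hazard described above. `NonInterruptibleWorker` and `doWork` are hypothetical names for illustration, not Kafka's actual thread class.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Runs doWork() repeatedly until shutdown() is called. Shutdown never calls
// interrupt(), so a channel used inside doWork() is not closed by it.
abstract class NonInterruptibleWorker(threadName: String) extends Thread(threadName) {
  private val running = new AtomicBoolean(true)

  def doWork(): Unit

  override def run(): Unit =
    while (running.get()) doWork()

  // Ask the loop to stop after the current iteration, then wait for it to exit.
  def shutdown(): Unit = {
    running.set(false)
    join()
  }
}
```

The trade-off is that shutdown waits for the current `doWork()` call to finish, so long-running work needs its own cancellation checks, such as the abort flag discussed earlier in this thread.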

LogManager:
* added logic to wait until an in-progress flushing process completes
* to delete a log:
  (1) remove the log from the log list so that the log cleaner and log flusher won't see it in the future
  (2) cancel any in-progress cleaning task
  (3) wait for any in-progress flushing process to complete
  (4) at this point, the log will no longer be touched by the cleaner or the flusher, so we can delete the whole directory synchronously.
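The four delete steps above can be illustrated with a small sketch. It is hypothetical throughout: LogHandle, SketchLogManager and the cancelCleaning callback are illustrative names, and using a ReentrantReadWriteLock as the "wait for in-progress flushes" mechanism is an assumption, not necessarily what the patch does. Each flush round holds the read lock, so a deleter that briefly acquires the write lock is guaranteed that any flush round which might still hold a reference to the removed log has finished.

```scala
import java.io.{File, IOException}
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantReadWriteLock

// Hypothetical stand-in for a partition's log: a name and its directory on disk.
final case class LogHandle(name: String, dir: File)

// Hypothetical sketch of the delete sequence; `cancelCleaning` stands in for the
// cleaner's cancel-and-wait call.
class SketchLogManager(cancelCleaning: String => Unit) {
  private val logs = new ConcurrentHashMap[String, LogHandle]()
  // Each flush round holds the read lock; deleteLog briefly takes the write lock,
  // which cannot be granted until every in-progress flush round has released it.
  private val flushLock = new ReentrantReadWriteLock()

  def add(log: LogHandle): Unit = logs.put(log.name, log)

  def flushAll(flush: LogHandle => Unit): Unit = {
    flushLock.readLock().lock()
    try {
      val it = logs.values().iterator()
      while (it.hasNext) flush(it.next())
    } finally flushLock.readLock().unlock()
  }

  def deleteLog(name: String): Boolean = {
    // (1) remove the log from the map so later cleaner/flusher passes skip it
    val removed = logs.remove(name)
    if (removed == null) return false
    // (2) cancel any in-progress cleaning of this log
    cancelCleaning(name)
    // (3) wait for any in-progress flush round to complete
    flushLock.writeLock().lock()
    flushLock.writeLock().unlock()
    // (4) nothing else can touch the log now, so delete its directory synchronously
    deleteRecursively(removed.dir)
    true
  }

  private def deleteRecursively(f: File): Unit = {
    val children = f.listFiles() // null for plain files
    if (children != null) children.foreach(deleteRecursively)
    if (!f.delete()) throw new IOException(s"failed to delete $f")
  }
}
```

The order matters: because step (1) happens first, a flush round that starts after removal never sees the log, and step (3) only has to cover rounds that were already running.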

Todos:
* If the overall logic looks good, we can also get rid of OptimisticLockFailureException in LogCleaner by canceling any cleaning task before truncating the log. This can be done in a follow-up jira.


Diffs
-----

  core/src/main/scala/kafka/cluster/Partition.scala 02ccc17c79b6d44c75f9bb6ca7cda8c51ae6f6fb 
  core/src/main/scala/kafka/common/AllDoneException.scala PRE-CREATION 
  core/src/main/scala/kafka/common/CancelledCleaningException.scala PRE-CREATION 
  core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
  core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
  core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
  core/src/main/scala/kafka/server/ReplicaManager.scala f9c7c29bb96a63d6452269f46a12751daca15b62 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala c30069e837e54fb91bf1d5b75b133282a28dedf8 
  core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 

Diff: https://reviews.apache.org/r/15674/diff/


Testing
-------


Thanks,

Jun Rao


Re: Review Request 15674: new patch

Posted by Jun Rao <ju...@gmail.com>.

(Updated Dec. 4, 2013, 6:13 p.m.)


Review request for kafka.


Summary (updated)
-----------------

new patch


Bugs: KAFKA-1074
    https://issues.apache.org/jira/browse/KAFKA-1074


Repository: kafka


Description (updated)
-------

kafka-1074; minor fix3


kafka-1074; minor fix2


kafka-1074; minor fix


kafka-1074; better synchronization with log cleaner


kafka-1074; fix 4


kafka-1074; fix 3


kafka-1074; fix 2


kafka-1074


Diffs (updated)
-----

  core/src/main/scala/kafka/cluster/Partition.scala 02ccc17c79b6d44c75f9bb6ca7cda8c51ae6f6fb 
  core/src/main/scala/kafka/common/AllDoneException.scala PRE-CREATION 
  core/src/main/scala/kafka/common/CancelledCleaningException.scala PRE-CREATION 
  core/src/main/scala/kafka/log/Log.scala beda421b558544196379bd9ab7855cea7614e8e3 
  core/src/main/scala/kafka/log/LogCleaner.scala ccde2abd99d2204775e4d3e9836aca34eb6747a3 
  core/src/main/scala/kafka/log/LogManager.scala 81be88aa618ed5614703d45a0556b77c97290085 
  core/src/main/scala/kafka/server/ReplicaManager.scala f9c7c29bb96a63d6452269f46a12751daca15b62 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala c30069e837e54fb91bf1d5b75b133282a28dedf8 
  core/src/test/scala/unit/kafka/log/CleanerTest.scala 5a312bf0803c1df4636e2e64ba83036a4e8e92dd 

Diff: https://reviews.apache.org/r/15674/diff/


Testing
-------


Thanks,

Jun Rao