You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Neha Narkhede <ne...@gmail.com> on 2014/02/01 02:45:34 UTC

Re: Review Request 17460: Patch for KAFKA-330

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/
-----------------------------------------------------------

(Updated Feb. 1, 2014, 1:45 a.m.)


Review request for kafka.


Bugs: KAFKA-330
    https://issues.apache.org/jira/browse/KAFKA-330


Repository: kafka


Description (updated)
-------

Updated docs for the new states. Removed the changes to log4j.properties


Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working


Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending


Changed controller to reference APIs in TopicDeletionManager. All unit tests pass


Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager


Addressed Guozhang's review comments


Fixed docs in a few places


Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume


Organized imports


Moved offline replica handling to controller failover


Reading replica assignment from zookeeper instead of local cache


Deleting unused APIs


Reverting the change to the stop replica request protocol. Instead hacking around with callbacks


All functionality and all unit tests working


Rebased with trunk after controller cleanup patch


Diffs (updated)
-----

  core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
  core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
  core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
  core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
  core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
  core/src/main/scala/kafka/controller/DeleteTopicsThread.scala PRE-CREATION 
  core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
  core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
  core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
  core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
  core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
  core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
  core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
  core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
  core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
  core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
  core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
  core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
  core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
  core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
  core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 

Diff: https://reviews.apache.org/r/17460/diff/


Testing
-------

Several integration tests added to test -

1. Topic deletion when all replica brokers are alive
2. Halt and resume topic deletion after a follower replica is restarted
3. Halt and resume topic deletion after a controller failover
4. Request handling during topic deletion
5. Topic deletion and partition reassignment in parallel
6. Topic deletion and preferred replica election in parallel
7. Topic deletion and per topic config changes in parallel


Thanks,

Neha Narkhede


Re: Review Request 17460: Patch for KAFKA-330

Posted by Neha Narkhede <ne...@gmail.com>.

> On Feb. 5, 2014, 10:34 p.m., Jun Rao wrote:
> > Some high level comments.
> > 
> > 1. While most of the replica states are now managed in ReplicaStateMachine, there are a few still managed in TopicDeletionManager through haltedTopicsForDeletion and topicDeletionInProgress. It probably would be clearer if those are managed in ReplicaStateMachine too. topicDeletionInProgress seems redundant since it equals to at least one of the replicas in ReplicaDeletionStarted state. We can just add a helper function in ReplicaStateMachine. We may need to add a new  replica state in ReplicaStateManager to represent haltedTopicsForDeletion, but perhaps we can just reuse ReplicaDeletionFailed (and give it a more general name).
> > 
> > 2. The actual deletion logic is split between TopicDeletionManager and DeleteTopicsThread, which makes it a bit hard to read. I was thinking that TopicDeletionManager only has methods for synchronization with other threads (through the condition) and all real work will be included in DeleteTopicsThread. Compared with partition reassignment, the logic in topic deletion is a bit harder to read. Part of the reason is that in partition reassignment, all the logic is linearly written in one method. In topic deletion, the logic is not linear since it's driven by various callbacks. Perhaps just by putting all the logic in one way and put them close to each other will help. Also, a bunch of helper methods in TopicDeletionManager like the following should really be in ReplicaStateMachine.
> > isAtLeastOneReplicaInDeletionStartedState()
> > replicasInState()
> > AllReplicasForTopicDeleted()
> > 
> > 3. When a topic is in the process of being deleted, we prevent future operations like partition re-assignment and leader rebalancing on that topic. However, if one of those operations is already started, we allow topic deletion to start, which will then get blocked by those operations. Another way to do that is if a topic is to be deleted, we don't start the deletion until other ongoing operations like partition re-assignment finish (once finished, they can't be started again since they will see topic being deleted). This way, the logic in DeleteTopicsThread will be somewhat simpler since we don't have to check if it can interfere with other operations.
> > 
> > 4. In TopicDeletionManager, when doing wait/notify (and changing internal states), we expect the caller to hold the lock. All callers probably do hold the locks. However, I am wondering if it's better to get the lock anyway in TopicDeletionManager to make it more self contained. The locks are re-entrant. So locking it again won't hurt.
> > 
> > 5. TopicDeletionManager: It seems that replicas in ReplicaDeletionStarted state remain in that state until the topic is successfully deleted. So, it seems that when calling startReplicaDeletion(), we can pass in replicas already in ReplicaDeletionSuccessful state. However, transitioning from ReplicaDeletionSuccessful to ReplicaDeletionStarted is not allowed.
> > 
> >

1. Topic states have nothing to do with replica state machine. ReplicaStateMachine manages the states for individual replicas and as such no logic from topic deletion should leak there

2. Moved DeleteTopicsThread to be an inner class of TopicDeletionManager. This reduces the public APIs of TopicDeletionManager. And the logic in partition reassignment is easier since we don't handle the hard problem of deleting data on dead replicas there. Moved some APIs to ReplicaStateMachine

3. You have a point. That logic is redundant in the DeleteTopicThread. Removed it. 

4. I considered that and then decided against it simply because the TopicDeletionManager is not self sufficient. All APIs are used in some context that only KafkaController knows about. If we acquire the lock inside TopicDeletionManager, then we are essentially allowing those APIs to be invoked all by themselves, which will not do the right thing today.

5. I think what you are saying is that the transition from ReplicaDeletionSuccessful to OfflineReplica (on retry) doesn't make sense. Logically, replica deletion should be retried only if the deletion failed. Since the controller knows about the deletion status, it can retry only for replicas that are not deleted yet. However, that obviously complicates the retry logic to filter only on failed replicas. 


> On Feb. 5, 2014, 10:34 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/controller/DeleteTopicsThread.scala, lines 42-43
> > <https://reviews.apache.org/r/17460/diff/8/?file=470129#file470129line42>
> >
> >     If the deletion of a replica is started and another failed broker is started immediately afterward, will we be missing the only chance of starting the deletion of the replica on the newly started broker (assuming there is a replica there not yet deleted)?

No, since deletion is halted when the broker goes down and resumed and retried when the broker comes up. Both broker startup and shutdown zk callbacks are serialized on the same lock. 


> On Feb. 5, 2014, 10:34 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/controller/DeleteTopicsThread.scala, lines 66-67
> > <https://reviews.apache.org/r/17460/diff/8/?file=470129#file470129line66>
> >
> >     Can just use topicsToBeDeleted. Could we just merge this block and the previous block in the same foreach?

Nope. The previous block processes all topics for which deletion is in progress and can mark some for deletion retry. These should be retried in the 2nd block.


> On Feb. 5, 2014, 10:34 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/controller/KafkaController.scala, lines 84-85
> > <https://reviews.apache.org/r/17460/diff/8/?file=470130#file470130line84>
> >
> >     Why do we need to read from ZK, instead of from the cache?

I think this is no longer required after my last refactor


> On Feb. 5, 2014, 10:34 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/controller/KafkaController.scala, lines 384-385
> > <https://reviews.apache.org/r/17460/diff/8/?file=470130#file470130line384>
> >
> >     For replicas that are being deleted, should we move them to OnlineReplica state?

This will not work. On broker startup, it expects the full replica list from the controller and uses that to write the various checkpoint files. If we miss a partition there before it is deleted, it's entries from all those checkpoint files will disappear causing unexpected behavior until deletion is complete.


> On Feb. 5, 2014, 10:34 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/controller/KafkaController.scala, lines 466-472
> > <https://reviews.apache.org/r/17460/diff/8/?file=470130#file470130line466>
> >
> >     Should we disallow adding partitions when a topic is being deleted?

This is already done. Look at AddPartitionsListener


> On Feb. 5, 2014, 10:34 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/controller/ReplicaStateMachine.scala, lines 116-118
> > <https://reviews.apache.org/r/17460/diff/8/?file=470133#file470133line116>
> >
> >     Some of the state transitions are missing, e.g., ReplicaDeletionFailed -> ReplicaDeletionStarted.

This state change shouldn't exist. Removed it.


> On Feb. 5, 2014, 10:34 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/controller/TopicDeletionManager.scala, lines 169-170
> > <https://reviews.apache.org/r/17460/diff/8/?file=470134#file470134line169>
> >
> >     It's still not very clear to me why delete topic thread needs to be notified. failReplicaDeletion() is called during the processing of a delete replica response, do we expect the delete topic thread to receive another response?

It has to be triggered to retry deletion. Otherwise topic deletion may never complete.


> On Feb. 5, 2014, 10:34 p.m., Jun Rao wrote:
> > core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala, lines 65-66
> > <https://reviews.apache.org/r/17460/diff/8/?file=470144#file470144line65>
> >
> >     We are stopping the leader replica, which may not be the rightmost broker in the broker list.

We are not stopping the leader replica in testResumeDeleteTopicWithRecoveredFollower()


- Neha


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33699
-----------------------------------------------------------


On Feb. 5, 2014, 5:31 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 5, 2014, 5:31 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Joel's review suggestions - Changed the controllerLock instances to inLock instead of synchronized, fixed some logging
> 
> 
> Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/DeleteTopicsThread.scala PRE-CREATION 
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>


Re: Review Request 17460: Patch for KAFKA-330

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33699
-----------------------------------------------------------


Some high level comments.

1. While most of the replica states are now managed in ReplicaStateMachine, there are a few still managed in TopicDeletionManager through haltedTopicsForDeletion and topicDeletionInProgress. It probably would be clearer if those are managed in ReplicaStateMachine too. topicDeletionInProgress seems redundant since it equals to at least one of the replicas in ReplicaDeletionStarted state. We can just add a helper function in ReplicaStateMachine. We may need to add a new  replica state in ReplicaStateManager to represent haltedTopicsForDeletion, but perhaps we can just reuse ReplicaDeletionFailed (and give it a more general name).

2. The actual deletion logic is split between TopicDeletionManager and DeleteTopicsThread, which makes it a bit hard to read. I was thinking that TopicDeletionManager only has methods for synchronization with other threads (through the condition) and all real work will be included in DeleteTopicsThread. Compared with partition reassignment, the logic in topic deletion is a bit harder to read. Part of the reason is that in partition reassignment, all the logic is linearly written in one method. In topic deletion, the logic is not linear since it's driven by various callbacks. Perhaps just by putting all the logic in one way and put them close to each other will help. Also, a bunch of helper methods in TopicDeletionManager like the following should really be in ReplicaStateMachine.
isAtLeastOneReplicaInDeletionStartedState()
replicasInState()
AllReplicasForTopicDeleted()

3. When a topic is in the process of being deleted, we prevent future operations like partition re-assignment and leader rebalancing on that topic. However, if one of those operations is already started, we allow topic deletion to start, which will then get blocked by those operations. Another way to do that is if a topic is to be deleted, we don't start the deletion until other ongoing operations like partition re-assignment finish (once finished, they can't be started again since they will see topic being deleted). This way, the logic in DeleteTopicsThread will be somewhat simpler since we don't have to check if it can interfere with other operations.

4. In TopicDeletionManager, when doing wait/notify (and changing internal states), we expect the caller to hold the lock. All callers probably do hold the locks. However, I am wondering if it's better to get the lock anyway in TopicDeletionManager to make it more self contained. The locks are re-entrant. So locking it again won't hurt.

5. TopicDeletionManager: It seems that replicas in ReplicaDeletionStarted state remain in that state until the topic is successfully deleted. So, it seems that when calling startReplicaDeletion(), we can pass in replicas already in ReplicaDeletionSuccessful state. However, transitioning from ReplicaDeletionSuccessful to ReplicaDeletionStarted is not allowed.




core/src/main/scala/kafka/controller/DeleteTopicsThread.scala
<https://reviews.apache.org/r/17460/#comment63339>

    If the deletion of a replica is started and another failed broker is started immediately afterward, will we be missing the only chance of starting the deletion of the replica on the newly started broker (assuming there is a replica there not yet deleted)?



core/src/main/scala/kafka/controller/DeleteTopicsThread.scala
<https://reviews.apache.org/r/17460/#comment63340>

    Can just use topicsToBeDeleted. Could we just merge this block and the previous block in the same foreach?



core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment63290>

    Why do we need to read from ZK, instead of from the cache?



core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment63292>

    For replicas that are being deleted, should we move them to OnlineReplica state?



core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment63332>

    Should we disallow adding partitions when a topic is being deleted?



core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
<https://reviews.apache.org/r/17460/#comment63296>

    Could we add the new replica states in the comment?



core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
<https://reviews.apache.org/r/17460/#comment63297>

    Some of the state transitions are missing, e.g., ReplicaDeletionFailed -> ReplicaDeletionStarted.



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63337>

    This method should be private and it would be better if it's placed close to onPartitionDeletion().



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63338>

    It's still not very clear to me why delete topic thread needs to be notified. failReplicaDeletion() is called during the processing of a delete replica response, do we expect the delete topic thread to receive another response?



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63336>

    This comment seems to be outdated?



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63356>

    These seems to be common for most tests. Could we share them?



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63344>

    Those lines are common among all tests. Could we make a util function and reuse?



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63345>

    Should we do those before the topic is deleted?



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63346>

    We are stopping the leader replica, which may not be the rightmost broker in the broker list.



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63347>

    Do we need this comment?



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63348>

    Should we shutdown the follower before topic deletion to make that the delete process is indeed paused?



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63349>

    We are only testing produce request.



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63351>

    Could we just fold these two tests into the produceRequest test instead of testing from the beginning?



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63353>

    Shutting down the controller may not change the leader. It seems that we should just shut down the preferred replica?



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63354>

    Shutting down the controller may not change the leader. It seems that we should just shut down the preferred replica?


- Jun Rao


On Feb. 5, 2014, 5:31 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 5, 2014, 5:31 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Joel's review suggestions - Changed the controllerLock instances to inLock instead of synchronized, fixed some logging
> 
> 
> Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/DeleteTopicsThread.scala PRE-CREATION 
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>


Re: Review Request 17460: Patch for KAFKA-330

Posted by Neha Narkhede <ne...@gmail.com>.

> On Feb. 6, 2014, 7:15 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/controller/TopicDeletionManager.scala, line 330
> > <https://reviews.apache.org/r/17460/diff/11/?file=470964#file470964line330>
> >
> >     Shall we check if isTopicDeletionHalted(topic) here at the first place?

No, because if it is completed already, then halting doesn't have any impact. 


- Neha


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33813
-----------------------------------------------------------


On Feb. 6, 2014, 6:29 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 6, 2014, 6:29 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Simplified the logic in DeleteTopicThread and removed topicDeletionInProgress set. Also removed the ReplicaDeletionStarted->OfflineReplica state change, which means that successfully deleted replicas will not be retried unless there is a controller failover
> 
> 
> Refactored tests per Jun and Guozhang's feedback
> 
> 
> Code refactor to address Jun's and Guozhang's review comments. Tests refactor pending
> 
> 
> Joel's review suggestions - Changed the controllerLock instances to inLock instead of synchronized, fixed some logging
> 
> 
> Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>


Re: Review Request 17460: Patch for KAFKA-330

Posted by Guozhang Wang <gu...@linkedin.com>.

> On Feb. 6, 2014, 7:15 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/controller/TopicDeletionManager.scala, line 330
> > <https://reviews.apache.org/r/17460/diff/11/?file=470964#file470964line330>
> >
> >     Shall we check if isTopicDeletionHalted(topic) here at the first place?
> 
> Neha Narkhede wrote:
>     No, because if it is completed already, then halting doesn't have any impact.

When it is resumed, the topic will be removed from halting. So if it is completed, it should not be in the halted list?


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33813
-----------------------------------------------------------


On Feb. 6, 2014, 6:29 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 6, 2014, 6:29 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Simplified the logic in DeleteTopicThread and removed topicDeletionInProgress set. Also removed the ReplicaDeletionStarted->OfflineReplica state change, which means that successfully deleted replicas will not be retried unless there is a controller failover
> 
> 
> Refactored tests per Jun and Guozhang's feedback
> 
> 
> Code refactor to address Jun's and Guozhang's review comments. Tests refactor pending
> 
> 
> Joel's review suggestions - Changed the controllerLock instances to inLock instead of synchronized, fixed some logging
> 
> 
> Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>


Re: Review Request 17460: Patch for KAFKA-330

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33813
-----------------------------------------------------------



core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment63425>

    Have an info-level log entry as we do in halting topic deletion for partition reassignment.



core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment63429>

    val partitions?



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63433>

    Shall we check if isTopicDeletionHalted(topic) here at the first place?


- Guozhang Wang


On Feb. 6, 2014, 6:29 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 6, 2014, 6:29 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Simplified the logic in DeleteTopicThread and removed topicDeletionInProgress set. Also removed the ReplicaDeletionStarted->OfflineReplica state change, which means that successfully deleted replicas will not be retried unless there is a controller failover
> 
> 
> Refactored tests per Jun and Guozhang's feedback
> 
> 
> Code refactor to address Jun's and Guozhang's review comments. Tests refactor pending
> 
> 
> Joel's review suggestions - Changed the controllerLock instances to inLock instead of synchronized, fixed some logging
> 
> 
> Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>


Re: Review Request 17460: Patch for KAFKA-330

Posted by Neha Narkhede <ne...@gmail.com>.

> On Feb. 8, 2014, 2:43 a.m., Joel Koshy wrote:
> > Already checked-in so this is really a follow-up review.
> > 
> > My overall take on the implementation is that it is (perhaps - because I'm
> > not 100 percent sure myself) complex mainly to handle corner cases which are
> > rare but I think recoverable. i.e., if we assume (and it may not be a valid
> > assumption) that topics will not/should never be deleted when there is live
> > traffic to a topic then just the call-backs and user-issued re-attempts on
> > failed deletion would be sufficient. We can talk more about that, but what
> > you have implemented is definitely more convenient and complete for the user.
> > 
> > Also, I encountered this problem while trying it out:
> > - bring up a broker
> > - ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic abc --sync
> >   < send a few messages >
> > - ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic abc
> > - I looked at state-change.log and made sure deletion completed
> > - ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic abc --sync
> >   < can never produce >
> > 
> > I see these on the server:
> > 
> > [2014-02-07 18:34:21,229] WARN [KafkaApi-0] Produce request with correlation id 2 from client  on partition [abc,0] failed due to Partition [abc,0] doesn't exist on 0 (kafka.server.KafkaApis)
> > [2014-02-07 18:34:21,229] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = , correlationId = 2, topicAndPartition = [abc,0]] with Ack=0 (kafka.server.KafkaApis)
> > [2014-02-07 18:34:26,755] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
> > [2014-02-07 18:34:26,756] WARN [KafkaApi-0] Produce request with correlation id 9 from client  on partition [abc,1] failed due to Partition [abc,1] doesn't exist on 0 (kafka.server.KafkaApis)
> > [2014-02-07 18:34:26,756] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = , correlationId = 9, topicAndPartition = [abc,1]] with Ack=0 (kafka.server.KafkaApis)
> > 
> > I had to bounce the broker to be able to produce again.
> > 
> > What did I do wrong? I can debug this later, but I'm going home soon :)
> >

Regarding your test, I realized that it is a bug that needs to be fixed. Will upload a patch later.

The case you mention looks at delete topic as just an admin initiated command that the admin can retry later. That is if there aren't other state changes going on in the cluster during the retry as well. Frequent state changes aren't as rare as you think, thereby making manual retries undesirable. For example, we have an automatic preferred replica election feature that will detect imbalance and trigger a preferred replica election one partition at a time. Now imagine you happen to try topic deletion during this time, which fails due one partition undergoing preferred replica election. You say, well fine, let me retry delete topic (hoping it succeeds this time) and it fails again. This time due to some other partition for that topic undergoing preferred replica election. This is ignoring the fact that we have no clear way to tell the user why the delete topic failed in the first place. Now extend this problem when we add the automatic TTL based topic deletion feature. Topic deletion 
 will get triggered by some TTL when other state changes are being issued by the admin at the same time. Of course, this time again, the admin has no way to tell that there is a TTL based topic deletion about to take place. This is still fine. You can argue that the controller can retry the TTL based topic deletion until the topic is deleted. But if you don't maintain the states of the respective replicas, how do you know when to retry? If you retry arbitrarily, then you risk hitting state changes due to other automatically triggered state changes (preferred replica elections). 

Overall, you see how, as we introduce these various periodic state changes, it is going to cause more operational overhead for retrying admin commands that fail with no particular explanation of what happened. I rather prefer the controller to be defensive enough to not get itself into a bad state by letting these state changes interleave and I also think that the guarantee that a delete topic, once issued, will always complete, leads to better user experience. Once we have the cancel feature added to all admin commands, topic will be deleted unless the operation is cancelled before starting. 


> On Feb. 8, 2014, 2:43 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/KafkaController.scala, line 758
> > <https://reviews.apache.org/r/17460/diff/12/?file=471027#file471027line758>
> >
> >     I think foldLeft's of this form can be simplified and made clearer by using exists.
> >     
> >     e.g., here:
> >     .filter(r => r._2.exists((res, r) => !controllerContext.liveBrokerIds.contains(r)))

I think you meant replicas.exists(r => !controllerContext.liveBrokerIds.contains(r)). Fixed it.


> On Feb. 8, 2014, 2:43 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/KafkaController.scala, line 765
> > <https://reviews.apache.org/r/17460/diff/12/?file=471027#file471027line765>
> >
> >     You mean topics halted (due to replicas on dead brokers) or ineligible (due to reassignment/preferred leader election) correct? Can you update the message?

Halted <-> ineligible. Halted as a result of being ineligible. There are 3 conditions when topics become ineligible and as a result halted for deletion -
1. broker hosting at least one replica goes down
2. preferred replica election is in progress
3. partition reassignment is in progress


> On Feb. 8, 2014, 2:43 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/PartitionStateMachine.scala, line 416
> > <https://reviews.apache.org/r/17460/diff/12/?file=471029#file471029line416>
> >
> >     yay..
> >

I can't seem to find this TODO anywhere in PartitionStateMachine. Were you looking at the latest patch or what's in trunk right now?


> On Feb. 8, 2014, 2:43 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/PartitionStateMachine.scala, line 195
> > <https://reviews.apache.org/r/17460/diff/12/?file=471029#file471029line195>
> >
> >     logging can be updated - i.e., not necessary online -> offline.
> >     
> >     Should probably use (%s to %s).format (currState, targetState) here and elsewhere in handleStateChange.

Good point. Fixed.


> On Feb. 8, 2014, 2:43 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/ReplicaStateMachine.scala, line 361
> > <https://reviews.apache.org/r/17460/diff/12/?file=471030#file471030line361>
> >
> >     Would prefer ReplicaDeletionIneligible:
> >     - since it isn't really a failure... i.e., we should eventually resume.
> >     - and I prefer "Ineligible" to "Halted" because I think it is weird to have replicas on dead brokers to come back up _have_ to go through a state called ReplicaDeletion_Failed_ if there was in fact no attempt at deletion.

Well, in some cases it is a failure since if the broker returns a response with some error code, it is marked as failed. Also I think it is weird that from a Started state you go to Ineligible instead of Failed. I can see it both ways though. Agree on your point regarding dead brokers having to go through a state called ReplicaDeletionFailed though. Will change this


- Neha


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33961
-----------------------------------------------------------


On Feb. 6, 2014, 7:37 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 6, 2014, 7:37 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's follow up comments
> 
> 
> Simplified the logic in DeleteTopicThread and removed topicDeletionInProgress set. Also removed the ReplicaDeletionStarted->OfflineReplica state change, which means that successfully deleted replicas will not be retried unless there is a controller failover
> 
> 
> Refactored tests per Jun and Guozhang's feedback
> 
> 
> Code refactor to address Jun's and Guozhang's review comments. Tests refactor pending
> 
> 
> Joel's review suggestions - Changed the controllerLock instances to inLock instead of synchronized, fixed some logging
> 
> 
> Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>


Re: Review Request 17460: Patch for KAFKA-330

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33961
-----------------------------------------------------------


Already checked-in so this is really a follow-up review.

My overall take on the implementation is that it is (perhaps - because I'm
not 100 percent sure myself) complex mainly to handle corner cases which are
rare but I think recoverable. i.e., if we assume (and it may not be a valid
assumption) that topics will not/should never be deleted when there is live
traffic to a topic then just the call-backs and user-issued re-attempts on
failed deletion would be sufficient. We can talk more about that, but what
you have implemented is definitely more convenient and complete for the user.

Also, I encountered this problem while trying it out:
- bring up a broker
- ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic abc --sync
  < send a few messages >
- ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic abc
- I looked at state-change.log and made sure deletion completed
- ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic abc --sync
  < can never produce >

I see these on the server:

[2014-02-07 18:34:21,229] WARN [KafkaApi-0] Produce request with correlation id 2 from client  on partition [abc,0] failed due to Partition [abc,0] doesn't exist on 0 (kafka.server.KafkaApis)
[2014-02-07 18:34:21,229] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = , correlationId = 2, topicAndPartition = [abc,0]] with Ack=0 (kafka.server.KafkaApis)
[2014-02-07 18:34:26,755] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2014-02-07 18:34:26,756] WARN [KafkaApi-0] Produce request with correlation id 9 from client  on partition [abc,1] failed due to Partition [abc,1] doesn't exist on 0 (kafka.server.KafkaApis)
[2014-02-07 18:34:26,756] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = , correlationId = 9, topicAndPartition = [abc,1]] with Ack=0 (kafka.server.KafkaApis)

I had to bounce the broker to be able to produce again.

What did I do wrong? I can debug this later, but I'm going home soon :)



core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment63806>

    if -> until



core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment63896>

    I think foldLeft's of this form can be simplified and made clearer by using exists.
    
    e.g., here:
    .filter(r => r._2.exists((res, r) => !controllerContext.liveBrokerIds.contains(r)))



core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment63817>

    You mean topics halted (due to replicas on dead brokers) or ineligible (due to reassignment/preferred leader election) correct? Can you update the message?



core/src/main/scala/kafka/controller/PartitionStateMachine.scala
<https://reviews.apache.org/r/17460/#comment63866>

    logging can be updated - i.e., not necessary online -> offline.
    
    Should probably use (%s to %s).format (currState, targetState) here and elsewhere in handleStateChange.



core/src/main/scala/kafka/controller/PartitionStateMachine.scala
<https://reviews.apache.org/r/17460/#comment63868>

    yay..
    



core/src/main/scala/kafka/controller/PartitionStateMachine.scala
<https://reviews.apache.org/r/17460/#comment63871>

    non-existent topics if any should be removed from the delete topics zk path. However, can this ever happen?
    



core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
<https://reviews.apache.org/r/17460/#comment63812>

    Would prefer ReplicaDeletionIneligible:
    - since it isn't really a failure... i.e., we should eventually resume.
    - and I prefer "Ineligible" to "Halted" because I think it is weird to have replicas on dead brokers to come back up _have_ to go through a state called ReplicaDeletion_Failed_ if there was in fact no attempt at deletion.


- Joel Koshy


On Feb. 6, 2014, 7:37 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 6, 2014, 7:37 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's follow up comments
> 
> 
> Simplified the logic in DeleteTopicThread and removed topicDeletionInProgress set. Also removed the ReplicaDeletionStarted->OfflineReplica state change, which means that successfully deleted replicas will not be retried unless there is a controller failover
> 
> 
> Refactored tests per Jun and Guozhang's feedback
> 
> 
> Code refactor to address Jun's and Guozhang's review comments. Tests refactor pending
> 
> 
> Joel's review suggestions - Changed the controllerLock instances to inLock instead of synchronized, fixed some logging
> 
> 
> Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>


Re: Review Request 17460: Patch for KAFKA-330

Posted by Neha Narkhede <ne...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/
-----------------------------------------------------------

(Updated Feb. 8, 2014, 7:07 p.m.)


Review request for kafka.


Bugs: KAFKA-330
    https://issues.apache.org/jira/browse/KAFKA-330


Repository: kafka


Description (updated)
-------

KAFKA-330: Joel's follow up review comments


Diffs (updated)
-----

  core/src/main/scala/kafka/controller/ControllerChannelManager.scala a1ee5a707412166629cf9025cac054570d6d27c0 
  core/src/main/scala/kafka/controller/KafkaController.scala d812cb4121d7f8705c6d54eae2fb67d824a5f79f 
  core/src/main/scala/kafka/controller/PartitionStateMachine.scala 57c96b5539f20d53238280b6e059325c757bddcf 
  core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 613aec6f40db5831f62e1391450ef02b63ae3390 
  core/src/main/scala/kafka/controller/TopicDeletionManager.scala 91a446ddc8aeb2ac90a62e274f612da45a67f772 
  core/src/main/scala/kafka/server/KafkaApis.scala c56ad503d83e31850cc2032d192d29506de96fdd 
  core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 974b057a880569ddd2035980d317147d892cf24f 

Diff: https://reviews.apache.org/r/17460/diff/


Testing
-------

Several integration tests added to test -

1. Topic deletion when all replica brokers are alive
2. Halt and resume topic deletion after a follower replica is restarted
3. Halt and resume topic deletion after a controller failover
4. Request handling during topic deletion
5. Topic deletion and partition reassignment in parallel
6. Topic deletion and preferred replica election in parallel
7. Topic deletion and per topic config changes in parallel


Thanks,

Neha Narkhede


Re: Review Request 17460: Patch for KAFKA-330

Posted by Neha Narkhede <ne...@gmail.com>.

> On Feb. 6, 2014, 11:48 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/controller/PartitionStateMachine.scala, lines 94-96
> > <https://reviews.apache.org/r/17460/diff/12/?file=471029#file471029line94>
> >
> >     The inner "if" is not properly indented.

That is due to line wrapping


> On Feb. 6, 2014, 11:48 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/controller/PartitionStateMachine.scala, lines 457-458
> > <https://reviews.apache.org/r/17460/diff/12/?file=471029#file471029line457>
> >
> >     | should be ||.

No, it shouldn't. This is set union


> On Feb. 6, 2014, 11:48 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/controller/TopicDeletionManager.scala, lines 52-53
> > <https://reviews.apache.org/r/17460/diff/12/?file=471031#file471031line52>
> >
> >     TopicDeletionSuccessfull should be TopicDeletionFailed.

Good observation. Fixed.


> On Feb. 6, 2014, 11:48 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/controller/TopicDeletionManager.scala, lines 339-345
> > <https://reviews.apache.org/r/17460/diff/12/?file=471031#file471031line339>
> >
> >     Is this check and the corresponding code necessary? The same check is done later in line 358 when calling isTopicEligibleForDeletion().

It is done for better logging. I found it much easier to follow INFO level logging with this information than without it. Like I said before, to process retries correctly, it is necessary to process the "in progress" state of all topics first, so that those can be retried immediately.


> On Feb. 6, 2014, 11:48 p.m., Jun Rao wrote:
> > core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala, lines 39-40
> > <https://reviews.apache.org/r/17460/diff/12/?file=471041#file471041line39>
> >
> >     Since the code in line 45 depends on broker 2 to be stopped, it's probably better to directly select broker 2 here, independent of who is the leader.

It does select broker 2 here and I see your point here. It is better to not assume the position of this follower in the server list. I fixed the assert check to explicitly drop the selected follower


> On Feb. 6, 2014, 11:48 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/controller/TopicDeletionManager.scala, lines 275-277
> > <https://reviews.apache.org/r/17460/diff/12/?file=471031#file471031line275>
> >
> >     Some of those dead replicas can be in TopicDeletionSuccesful state. So, we don't need to move them to OfflineReplica state.

Good observation. Fixed it.


- Neha


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33858
-----------------------------------------------------------


On Feb. 6, 2014, 7:37 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 6, 2014, 7:37 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's follow up comments
> 
> 
> Simplified the logic in DeleteTopicThread and removed topicDeletionInProgress set. Also removed the ReplicaDeletionStarted->OfflineReplica state change, which means that successfully deleted replicas will not be retried unless there is a controller failover
> 
> 
> Refactored tests per Jun and Guozhang's feedback
> 
> 
> Code refactor to address Jun's and Guozhang's review comments. Tests refactor pending
> 
> 
> Joel's review suggestions - Changed the controllerLock instances to inLock instead of synchronized, fixed some logging
> 
> 
> Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>


Re: Review Request 17460: Patch for KAFKA-330

Posted by Neha Narkhede <ne...@gmail.com>.

> On Feb. 6, 2014, 11:48 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/controller/PartitionStateMachine.scala, lines 457-458
> > <https://reviews.apache.org/r/17460/diff/12/?file=471029#file471029line457>
> >
> >     | should be ||.
> 
> Neha Narkhede wrote:
>     No, it shouldn't. This is set union
> 
> Jun Rao wrote:
>     No, both variables are booleans.

Actually that's true. I misread this as the other place that you suggested using || for set unions


- Neha


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33858
-----------------------------------------------------------


On Feb. 6, 2014, 7:37 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 6, 2014, 7:37 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's follow up comments
> 
> 
> Simplified the logic in DeleteTopicThread and removed topicDeletionInProgress set. Also removed the ReplicaDeletionStarted->OfflineReplica state change, which means that successfully deleted replicas will not be retried unless there is a controller failover
> 
> 
> Refactored tests per Jun and Guozhang's feedback
> 
> 
> Code refactor to address Jun's and Guozhang's review comments. Tests refactor pending
> 
> 
> Joel's review suggestions - Changed the controllerLock instances to inLock instead of synchronized, fixed some logging
> 
> 
> Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>


Re: Review Request 17460: Patch for KAFKA-330

Posted by Jun Rao <ju...@gmail.com>.

> On Feb. 6, 2014, 11:48 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/controller/PartitionStateMachine.scala, lines 457-458
> > <https://reviews.apache.org/r/17460/diff/12/?file=471029#file471029line457>
> >
> >     | should be ||.
> 
> Neha Narkhede wrote:
>     No, it shouldn't. This is set union

No, both variables are booleans.


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33858
-----------------------------------------------------------


On Feb. 6, 2014, 7:37 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 6, 2014, 7:37 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's follow up comments
> 
> 
> Simplified the logic in DeleteTopicThread and removed topicDeletionInProgress set. Also removed the ReplicaDeletionStarted->OfflineReplica state change, which means that successfully deleted replicas will not be retried unless there is a controller failover
> 
> 
> Refactored tests per Jun and Guozhang's feedback
> 
> 
> Code refactor to address Jun's and Guozhang's review comments. Tests refactor pending
> 
> 
> Joel's review suggestions - Changed the controllerLock instances to inLock instead of synchronized, fixed some logging
> 
> 
> Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>


Re: Review Request 17460: Patch for KAFKA-330

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33858
-----------------------------------------------------------

Ship it!


Some minor comments below.


core/src/main/scala/kafka/controller/PartitionStateMachine.scala
<https://reviews.apache.org/r/17460/#comment63519>

    The inner "if" is not properly indented.



core/src/main/scala/kafka/controller/PartitionStateMachine.scala
<https://reviews.apache.org/r/17460/#comment63510>

    | should be ||.



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63495>

    TopicDeletionSuccessfull should be TopicDeletionFailed.



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63489>

    Some of those dead replicas can be in TopicDeletionSuccesful state. So, we don't need to move them to OfflineReplica state.



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63491>

    Some of those aliveReplicas are already in TopicDeletionSuccessful state and we probably shouldn't move them back to ReplicaDetionStarted state.



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63521>

    Is this check and the corresponding code necessary? The same check is done later in line 358 when calling isTopicEligibleForDeletion().



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63484>

    It's probably better to rename this method to sth like markFailedReplicasForRetry, since we are not doing the actual retry in this method.



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63525>

    Since the code in line 45 depends on broker 2 to be stopped, it's probably better to directly select broker 2 here, independent of who is the leader.



core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
<https://reviews.apache.org/r/17460/#comment63531>

    PartitionAndReplica is unused.



core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
<https://reviews.apache.org/r/17460/#comment63532>

    Remove unused imports.


- Jun Rao


On Feb. 6, 2014, 7:37 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 6, 2014, 7:37 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's follow up comments
> 
> 
> Simplified the logic in DeleteTopicThread and removed topicDeletionInProgress set. Also removed the ReplicaDeletionStarted->OfflineReplica state change, which means that successfully deleted replicas will not be retried unless there is a controller failover
> 
> 
> Refactored tests per Jun and Guozhang's feedback
> 
> 
> Code refactor to address Jun's and Guozhang's review comments. Tests refactor pending
> 
> 
> Joel's review suggestions - Changed the controllerLock instances to inLock instead of synchronized, fixed some logging
> 
> 
> Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>


Re: Review Request 17460: Patch for KAFKA-330

Posted by Neha Narkhede <ne...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/
-----------------------------------------------------------

(Updated Feb. 6, 2014, 7:37 p.m.)


Review request for kafka.


Bugs: KAFKA-330
    https://issues.apache.org/jira/browse/KAFKA-330


Repository: kafka


Description (updated)
-------

Addressed Guozhang's follow up comments


Simplified the logic in DeleteTopicThread and removed topicDeletionInProgress set. Also removed the ReplicaDeletionStarted->OfflineReplica state change, which means that successfully deleted replicas will not be retried unless there is a controller failover


Refactored tests per Jun and Guozhang's feedback


Code refactor to address Jun's and Guozhang's review comments. Tests refactor pending


Joel's review suggestions - Changed the controllerLock instances to inLock instead of synchronized, fixed some logging


Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion


Updated docs for the new states. Removed the changes to log4j.properties


Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working


Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending


Changed controller to reference APIs in TopicDeletionManager. All unit tests pass


Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager


Addressed Guozhang's review comments


Fixed docs in a few places


Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume


Organized imports


Moved offline replica handling to controller failover


Reading replica assignment from zookeeper instead of local cache


Deleting unused APIs


Reverting the change to the stop replica request protocol. Instead hacking around with callbacks


All functionality and all unit tests working


Rebased with trunk after controller cleanup patch


Diffs (updated)
-----

  core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
  core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
  core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
  core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
  core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
  core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
  core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
  core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
  core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
  core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
  core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
  core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
  core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
  core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
  core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
  core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
  core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
  core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
  core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
  core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
  core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
  core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
  core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 

Diff: https://reviews.apache.org/r/17460/diff/


Testing
-------

Several integration tests added to test -

1. Topic deletion when all replica brokers are alive
2. Halt and resume topic deletion after a follower replica is restarted
3. Halt and resume topic deletion after a controller failover
4. Request handling during topic deletion
5. Topic deletion and partition reassignment in parallel
6. Topic deletion and preferred replica election in parallel
7. Topic deletion and per topic config changes in parallel


Thanks,

Neha Narkhede


Re: Review Request 17460: Patch for KAFKA-330

Posted by Neha Narkhede <ne...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/
-----------------------------------------------------------

(Updated Feb. 6, 2014, 6:29 p.m.)


Review request for kafka.


Bugs: KAFKA-330
    https://issues.apache.org/jira/browse/KAFKA-330


Repository: kafka


Description (updated)
-------

Simplified the logic in DeleteTopicThread and removed topicDeletionInProgress set. Also removed the ReplicaDeletionStarted->OfflineReplica state change, which means that successfully deleted replicas will not be retried unless there is a controller failover


Refactored tests per Jun and Guozhang's feedback


Code refactor to address Jun's and Guozhang's review comments. Tests refactor pending


Joel's review suggestions - Changed the controllerLock instances to inLock instead of synchronized, fixed some logging


Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion


Updated docs for the new states. Removed the changes to log4j.properties


Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working


Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending


Changed controller to reference APIs in TopicDeletionManager. All unit tests pass


Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager


Addressed Guozhang's review comments


Fixed docs in a few places


Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume


Organized imports


Moved offline replica handling to controller failover


Reading replica assignment from zookeeper instead of local cache


Deleting unused APIs


Reverting the change to the stop replica request protocol. Instead hacking around with callbacks


All functionality and all unit tests working


Rebased with trunk after controller cleanup patch


Diffs (updated)
-----

  core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
  core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
  core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
  core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
  core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
  core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
  core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
  core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
  core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
  core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
  core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
  core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
  core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
  core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
  core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
  core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
  core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
  core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
  core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
  core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
  core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
  core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
  core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 

Diff: https://reviews.apache.org/r/17460/diff/


Testing
-------

Several integration tests added to test -

1. Topic deletion when all replica brokers are alive
2. Halt and resume topic deletion after a follower replica is restarted
3. Halt and resume topic deletion after a controller failover
4. Request handling during topic deletion
5. Topic deletion and partition reassignment in parallel
6. Topic deletion and preferred replica election in parallel
7. Topic deletion and per topic config changes in parallel


Thanks,

Neha Narkhede


Re: Review Request 17460: Patch for KAFKA-330

Posted by Neha Narkhede <ne...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/
-----------------------------------------------------------

(Updated Feb. 6, 2014, 5:42 p.m.)


Review request for kafka.


Bugs: KAFKA-330
    https://issues.apache.org/jira/browse/KAFKA-330


Repository: kafka


Description
-------

Refactored tests per Jun and Guozhang's feedback


Code refactor to address Jun's and Guozhang's review comments. Tests refactor pending


Joel's review suggestions - Changed the controllerLock instances to inLock instead of synchronized, fixed some logging


Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion


Updated docs for the new states. Removed the changes to log4j.properties


Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working


Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending


Changed controller to reference APIs in TopicDeletionManager. All unit tests pass


Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager


Addressed Guozhang's review comments


Fixed docs in a few places


Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume


Organized imports


Moved offline replica handling to controller failover


Reading replica assignment from zookeeper instead of local cache


Deleting unused APIs


Reverting the change to the stop replica request protocol. Instead hacking around with callbacks


All functionality and all unit tests working


Rebased with trunk after controller cleanup patch


Diffs (updated)
-----

  core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
  core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
  core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
  core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
  core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
  core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
  core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
  core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
  core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
  core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
  core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
  core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
  core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
  core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
  core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
  core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
  core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
  core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
  core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
  core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
  core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
  core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
  core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 

Diff: https://reviews.apache.org/r/17460/diff/


Testing
-------

Several integration tests added to test -

1. Topic deletion when all replica brokers are alive
2. Halt and resume topic deletion after a follower replica is restarted
3. Halt and resume topic deletion after a controller failover
4. Request handling during topic deletion
5. Topic deletion and partition reassignment in parallel
6. Topic deletion and preferred replica election in parallel
7. Topic deletion and per topic config changes in parallel


Thanks,

Neha Narkhede


Re: Review Request 17460: Patch for KAFKA-330

Posted by Neha Narkhede <ne...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/
-----------------------------------------------------------

(Updated Feb. 6, 2014, 3:49 p.m.)


Review request for kafka.


Bugs: KAFKA-330
    https://issues.apache.org/jira/browse/KAFKA-330


Repository: kafka


Description (updated)
-------

Refactored tests per Jun and Guozhang's feedback


Code refactor to address Jun's and Guozhang's review comments. Tests refactor pending


Joel's review suggestions - Changed the controllerLock instances to inLock instead of synchronized, fixed some logging


Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion


Updated docs for the new states. Removed the changes to log4j.properties


Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working


Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending


Changed controller to reference APIs in TopicDeletionManager. All unit tests pass


Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager


Addressed Guozhang's review comments


Fixed docs in a few places


Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume


Organized imports


Moved offline replica handling to controller failover


Reading replica assignment from zookeeper instead of local cache


Deleting unused APIs


Reverting the change to the stop replica request protocol. Instead hacking around with callbacks


All functionality and all unit tests working


Rebased with trunk after controller cleanup patch


Diffs (updated)
-----

  core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
  core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
  core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
  core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
  core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
  core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
  core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
  core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
  core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
  core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
  core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
  core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
  core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
  core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
  core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
  core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
  core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
  core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
  core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
  core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
  core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
  core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
  core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 

Diff: https://reviews.apache.org/r/17460/diff/


Testing
-------

Several integration tests added to test -

1. Topic deletion when all replica brokers are alive
2. Halt and resume topic deletion after a follower replica is restarted
3. Halt and resume topic deletion after a controller failover
4. Request handling during topic deletion
5. Topic deletion and partition reassignment in parallel
6. Topic deletion and preferred replica election in parallel
7. Topic deletion and per topic config changes in parallel


Thanks,

Neha Narkhede


Re: Review Request 17460: Patch for KAFKA-330

Posted by Guozhang Wang <gu...@linkedin.com>.

> On Feb. 5, 2014, 10:23 p.m., Guozhang Wang wrote:
> > In the follow-up patch that serialize all the admin tasks in the back ground thread, I would suggest switching away from using the callbacks to trigger state change while executing the process but depending on some ZK path change, as we did for partition re-assignment. Since the controller-broker communication is already async, I think it is OK to not retry the stopReplicaRequest, but let the brokers to detect some replicas it currently holds have already be deleted through MetadataRequest, which will become the source of truth anyways.
> 
> Neha Narkhede wrote:
>     Ya, I thought about the signal vs zookeeper path and I didn't see a point to go through zookeeper for something that doesn't require any distributed co-ordination at all and as such using zookeeper like a database and signal mechanism is not a good idea. Having said that, the reason we use the zookeeper path notification for partition reassignment is because the common ISR path for a partition can be updated by not just the controller but also the brokers, thereby requiring a zookeeper based watch mechanism. Delete topic, on the other hand, requires the controller itself to maintain internal state and signal based of that. 
>     
>     Also, having the broker delete the logs (as I mentioned while explaining the different design choices for delete topic on the JIRA) is infeasible since it doesn't know the version of the topic. It is easier to let the controller not allow topic deletion to complete unless all replicas have acknowledged their delete from the respective brokers

I think you are right. After reading the latest patch I am more sold on the callback approach. Previously I was thinking since the controller's requests to the brokers are async anyways, for example, if a broker missed processing a LeaderAndIsr request because it is down it will get the same request again on startup anyways, we can do the same for delete topics. Thinking it twice I believe it may be better to wait for replica deletion to finish,.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33711
-----------------------------------------------------------


On Feb. 6, 2014, 6:29 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 6, 2014, 6:29 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Simplified the logic in DeleteTopicThread and removed topicDeletionInProgress set. Also removed the ReplicaDeletionStarted->OfflineReplica state change, which means that successfully deleted replicas will not be retried unless there is a controller failover
> 
> 
> Refactored tests per Jun and Guozhang's feedback
> 
> 
> Code refactor to address Jun's and Guozhang's review comments. Tests refactor pending
> 
> 
> Joel's review suggestions - Changed the controllerLock instances to inLock instead of synchronized, fixed some logging
> 
> 
> Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>


Re: Review Request 17460: Patch for KAFKA-330

Posted by Neha Narkhede <ne...@gmail.com>.

> On Feb. 5, 2014, 10:23 p.m., Guozhang Wang wrote:
> > In the follow-up patch that serialize all the admin tasks in the back ground thread, I would suggest switching away from using the callbacks to trigger state change while executing the process but depending on some ZK path change, as we did for partition re-assignment. Since the controller-broker communication is already async, I think it is OK to not retry the stopReplicaRequest, but let the brokers to detect some replicas it currently holds have already be deleted through MetadataRequest, which will become the source of truth anyways.

Ya, I thought about the signal vs zookeeper path and I didn't see a point to go through zookeeper for something that doesn't require any distributed co-ordination at all and as such using zookeeper like a database and signal mechanism is not a good idea. Having said that, the reason we use the zookeeper path notification for partition reassignment is because the common ISR path for a partition can be updated by not just the controller but also the brokers, thereby requiring a zookeeper based watch mechanism. Delete topic, on the other hand, requires the controller itself to maintain internal state and signal based of that. 

Also, having the broker delete the logs (as I mentioned while explaining the different design choices for delete topic on the JIRA) is infeasible since it doesn't know the version of the topic. It is easier to let the controller not allow topic deletion to complete unless all replicas have acknowledged their delete from the respective brokers


> On Feb. 5, 2014, 10:23 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala, line 29
> > <https://reviews.apache.org/r/17460/diff/8/?file=470124#file470124line29>
> >
> >     This import may be removed: this is the only change in this file.

Nope. It cannot be removed, it is required to distinguish between Set from Predef and Set from collection. 


> On Feb. 5, 2014, 10:23 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/controller/ControllerChannelManager.scala, line 307
> > <https://reviews.apache.org/r/17460/diff/8/?file=470128#file470128line307>
> >
> >     When the broker is down, the RequestSendThread will just keep trying resend, and the callback function will not be executed until the broker is back and a receive is returned from channel.receive(), is that correct? If yes, then will the process be blocked during the time the broker is down?

Nope. The sendRequest is non blocking. It cannot block until the queue of requests sent to that broker grows a lot. The controller doesn't send too many requests to dead brokers, so I don't see how that can happen unless there is a bug. Topic deletion is halted when at least one replica goes down, which will prevent the controller from sending requests to the dead broker. 


> On Feb. 5, 2014, 10:23 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/controller/TopicDeletionManager.scala, line 119
> > <https://reviews.apache.org/r/17460/diff/8/?file=470134#file470134line119>
> >
> >     I remember the coding principle for this function is to omit ()?

That is, if the API makes no internal state changes and this one does. So I included the ()


> On Feb. 5, 2014, 10:23 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/controller/TopicDeletionManager.scala, line 131
> > <https://reviews.apache.org/r/17460/diff/8/?file=470134#file470134line131>
> >
> >     It is an exception case if topicsTobeDeleted.!contains(topics).

How? It is set intersection. Either your set intersects and you have a resulting set of non zero size or it doesn't, in which you end up with an empty set


> On Feb. 5, 2014, 10:23 p.m., Guozhang Wang wrote:
> > core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala, line 20
> > <https://reviews.apache.org/r/17460/diff/8/?file=470144#file470144line20>
> >
> >     On general comment is that since for each test we need to setup ZK and Broker before and tear them down after, which could be dominant time consuming in running these tests, maybe we can merge some of the testcases into one?

Ya, agree that these tests run longer. At the same time, these tests also validate some important test cases. I guess once we improve our system tests, we can move some of these tests there. Until then, it is useful to have these tests act as integration tests. We could also merge some of them together and if we do, then we have to put up with really large testcases, which will be hard to understand and maintain. Taking your point though, the tests for validating different request types could be clubbed together without losing readability.


- Neha


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33711
-----------------------------------------------------------


On Feb. 6, 2014, 3:49 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 6, 2014, 3:49 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Refactored tests per Jun and Guozhang's feedback
> 
> 
> Code refactor to address Jun's and Guozhang's review comments. Tests refactor pending
> 
> 
> Joel's review suggestions - Changed the controllerLock instances to inLock instead of synchronized, fixed some logging
> 
> 
> Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>


Re: Review Request 17460: Patch for KAFKA-330

Posted by Neha Narkhede <ne...@gmail.com>.

> On Feb. 5, 2014, 10:23 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/controller/ControllerChannelManager.scala, line 215
> > <https://reviews.apache.org/r/17460/diff/8/?file=470128#file470128line215>
> >
> >     Instead of checking the replicaId == -1 case here, I feel it is better to handle it in ReplicaStateMachine.handleStateChange function, for indicating the devs that this is possible that the leader becomes -1.

The issue with pushing it up is that every invocation of addLeaderAndIsrRequestForBrokers or addStopReplicaRequestForBrokers or addUpdateMetadataRequestForBrokers has to handle the obvious logic of not sending a request to a broker with ID -1. I think there is a cleaner way to handle this and if we want to do it properly, we can file a refactoring JIRA to fix how the "no leader and no replicas available" case is handled.


- Neha


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33711
-----------------------------------------------------------


On Feb. 6, 2014, 5:42 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 6, 2014, 5:42 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Refactored tests per Jun and Guozhang's feedback
> 
> 
> Code refactor to address Jun's and Guozhang's review comments. Tests refactor pending
> 
> 
> Joel's review suggestions - Changed the controllerLock instances to inLock instead of synchronized, fixed some logging
> 
> 
> Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>


Re: Review Request 17460: Patch for KAFKA-330

Posted by Guozhang Wang <gu...@linkedin.com>.

> On Feb. 5, 2014, 10:23 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/controller/TopicDeletionManager.scala, line 131
> > <https://reviews.apache.org/r/17460/diff/8/?file=470134#file470134line131>
> >
> >     It is an exception case if topicsTobeDeleted.!contains(topics).
> 
> Neha Narkhede wrote:
>     How? It is set intersection. Either your set intersects and you have a resulting set of non zero size or it doesn't, in which you end up with an empty set

What I meant is that should topics always contain all items in topicsToBeDeleted?


> On Feb. 5, 2014, 10:23 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/controller/ControllerChannelManager.scala, line 307
> > <https://reviews.apache.org/r/17460/diff/8/?file=470128#file470128line307>
> >
> >     When the broker is down, the RequestSendThread will just keep trying resend, and the callback function will not be executed until the broker is back and a receive is returned from channel.receive(), is that correct? If yes, then will the process be blocked during the time the broker is down?
> 
> Neha Narkhede wrote:
>     Nope. The sendRequest is non blocking. It cannot block until the queue of requests sent to that broker grows a lot. The controller doesn't send too many requests to dead brokers, so I don't see how that can happen unless there is a bug. Topic deletion is halted when at least one replica goes down, which will prevent the controller from sending requests to the dead broker.

My bad, I should not say "broker is down", what I meant is that the state of the deleting replica will transit to failure if either 1) an error response is received and the callback make the switch, or 2) in onBrokerFailure function. If, a broker is not failed but it does not receive the stopReplicaRequest due to, for example, closed socket server, but the broker's ZK path has not been deleted, then the sender thread will just keep resending. Is that right?


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33711
-----------------------------------------------------------


On Feb. 6, 2014, 5:42 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 6, 2014, 5:42 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Refactored tests per Jun and Guozhang's feedback
> 
> 
> Code refactor to address Jun's and Guozhang's review comments. Tests refactor pending
> 
> 
> Joel's review suggestions - Changed the controllerLock instances to inLock instead of synchronized, fixed some logging
> 
> 
> Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>


Re: Review Request 17460: Patch for KAFKA-330

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33711
-----------------------------------------------------------


In the follow-up patch that serialize all the admin tasks in the back ground thread, I would suggest switching away from using the callbacks to trigger state change while executing the process but depending on some ZK path change, as we did for partition re-assignment. Since the controller-broker communication is already async, I think it is OK to not retry the stopReplicaRequest, but let the brokers to detect some replicas it currently holds have already be deleted through MetadataRequest, which will become the source of truth anyways.


core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
<https://reviews.apache.org/r/17460/#comment63321>

    Ditto as below



core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
<https://reviews.apache.org/r/17460/#comment63320>

    This import may be removed: this is the only change in this file.



core/src/main/scala/kafka/api/StopReplicaResponse.scala
<https://reviews.apache.org/r/17460/#comment63330>

    Could you change to (topicAndPartition, errorCode) <- responseMap ?



core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
<https://reviews.apache.org/r/17460/#comment63331>

    Ditto as above.



core/src/main/scala/kafka/controller/ControllerChannelManager.scala
<https://reviews.apache.org/r/17460/#comment63334>

    Instead of checking the replicaId == -1 case here, I feel it is better to handle it in ReplicaStateMachine.handleStateChange function, for indicating the devs that this is possible that the leader becomes -1.



core/src/main/scala/kafka/controller/ControllerChannelManager.scala
<https://reviews.apache.org/r/17460/#comment63335>

    Ditto as above.



core/src/main/scala/kafka/controller/ControllerChannelManager.scala
<https://reviews.apache.org/r/17460/#comment63355>

    When the broker is down, the RequestSendThread will just keep trying resend, and the callback function will not be executed until the broker is back and a receive is returned from channel.receive(), is that correct? If yes, then will the process be blocked during the time the broker is down?



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63342>

    I remember the coding principle for this function is to omit ()?



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63341>

    It is an exception case if topicsTobeDeleted.!contains(topics).



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63343>

    Ditto above



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63352>

    Use inLock here?



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63350>

    On general comment is that since for each test we need to setup ZK and Broker before and tear them down after, which could be dominant time consuming in running these tests, maybe we can merge some of the testcases into one?


- Guozhang Wang


On Feb. 5, 2014, 5:31 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 5, 2014, 5:31 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Joel's review suggestions - Changed the controllerLock instances to inLock instead of synchronized, fixed some logging
> 
> 
> Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/DeleteTopicsThread.scala PRE-CREATION 
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>


Re: Review Request 17460: Patch for KAFKA-330

Posted by Neha Narkhede <ne...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/
-----------------------------------------------------------

(Updated Feb. 5, 2014, 5:31 p.m.)


Review request for kafka.


Bugs: KAFKA-330
    https://issues.apache.org/jira/browse/KAFKA-330


Repository: kafka


Description (updated)
-------

Joel's review suggestions - Changed the controllerLock instances to inLock instead of synchronized, fixed some logging


Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion


Updated docs for the new states. Removed the changes to log4j.properties


Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working


Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending


Changed controller to reference APIs in TopicDeletionManager. All unit tests pass


Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager


Addressed Guozhang's review comments


Fixed docs in a few places


Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume


Organized imports


Moved offline replica handling to controller failover


Reading replica assignment from zookeeper instead of local cache


Deleting unused APIs


Reverting the change to the stop replica request protocol. Instead hacking around with callbacks


All functionality and all unit tests working


Rebased with trunk after controller cleanup patch


Diffs (updated)
-----

  core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
  core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
  core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
  core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
  core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
  core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
  core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
  core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
  core/src/main/scala/kafka/controller/DeleteTopicsThread.scala PRE-CREATION 
  core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
  core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
  core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
  core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
  core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
  core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
  core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
  core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
  core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
  core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
  core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
  core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
  core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
  core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
  core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 

Diff: https://reviews.apache.org/r/17460/diff/


Testing
-------

Several integration tests added to test -

1. Topic deletion when all replica brokers are alive
2. Halt and resume topic deletion after a follower replica is restarted
3. Halt and resume topic deletion after a controller failover
4. Request handling during topic deletion
5. Topic deletion and partition reassignment in parallel
6. Topic deletion and preferred replica election in parallel
7. Topic deletion and per topic config changes in parallel


Thanks,

Neha Narkhede


Re: Review Request 17460: Patch for KAFKA-330

Posted by Neha Narkhede <ne...@gmail.com>.

> On Feb. 5, 2014, 2:50 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/ControllerChannelManager.scala, line 220
> > <https://reviews.apache.org/r/17460/diff/7/?file=462654#file462654line220>
> >
> >     Why not only limit to the topic-partitions relevant to this leaderAndIsrRequest?

With this patch, UpdateMetadata request is the mechanism used by the brokers to know which topics are live and which topics it will reject incoming requests for. Hence, this list always has to be the full list of active topics in the cluster. As such, it will not include topics in /admin/delete_topics


> On Feb. 5, 2014, 2:50 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/ControllerChannelManager.scala, line 304
> > <https://reviews.apache.org/r/17460/diff/7/?file=462654#file462654line304>
> >
> >     You mean just put all in a single StopReplicaRequest? If so, any reason not to do it now?

Was planning on making the change after I received a more detailed review. Will probably include it in the next patch.


> On Feb. 5, 2014, 2:50 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/KafkaController.scala, line 555
> > <https://reviews.apache.org/r/17460/diff/7/?file=462656#file462656line555>
> >
> >     I'm unclear whether this and the call from onPreferredReplica.. are really required. i.e., prior to those operations beginning we do check if those topics are being deleted but I haven't fully thought this through.

The comment above it -

// signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed

Basically, if reassignment was in progress when delete topic is issued, we pause/halt the deletion of that topic until the reassignment finishes. Same with preferred replica election. Hence the resume() call.


> On Feb. 5, 2014, 2:50 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/TopicDeletionManager.scala, line 101
> > <https://reviews.apache.org/r/17460/diff/7/?file=462660#file462660line101>
> >
> >     Can you fix up the comment a bit? i.e., the lock will in fact be released while awaiting, so maybe you should just say that the lock should be acquired before calling.

I'm certain you have OCD :)


> On Feb. 5, 2014, 2:50 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/KafkaController.scala, line 1195
> > <https://reviews.apache.org/r/17460/diff/7/?file=462656#file462656line1195>
> >
> >     From the lock javadoc:
> >     Note that Lock instances are just normal objects and can themselves be used as the target in a synchronized statement. Acquiring the monitor lock of a Lock instance has no specified relationship with invoking any of the lock() methods of that instance. It is recommended that to avoid confusion you never use Lock instances in this way, except within their own implementation. 
> >     
> >     So you may need to still synchronize on this lock object before calling controllerLock.lock - or switch to lock/unlock everywhere we currently use synchronized.

Good point! Will change all those instances to lock() instead of synchronized.


> On Feb. 5, 2014, 2:50 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/DeleteTopicsThread.scala, line 30
> > <https://reviews.apache.org/r/17460/diff/7/?file=462655#file462655line30>
> >
> >     Why is the Set.empty[String] needed?

That's done to initialize an immutable set with a set of values.


- Neha


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33668
-----------------------------------------------------------


On Feb. 1, 2014, 10:58 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 1, 2014, 10:58 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/DeleteTopicsThread.scala PRE-CREATION 
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>


Re: Review Request 17460: Patch for KAFKA-330

Posted by Neha Narkhede <ne...@gmail.com>.

> On Feb. 5, 2014, 2:50 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/controller/ControllerChannelManager.scala, line 304
> > <https://reviews.apache.org/r/17460/diff/7/?file=462654#file462654line304>
> >
> >     You mean just put all in a single StopReplicaRequest? If so, any reason not to do it now?
> 
> Neha Narkhede wrote:
>     Was planning on making the change after I received a more detailed review. Will probably include it in the next patch.

Actually, thinking about this more. The reason I didn't include the batching here is because the way stop replica is designed is essentially one replica at a time and the callback is also associated with every replica. Batching this is a pretty large refactor of existing code that needs to be changed across the board for batching stop replica requests. Prefer to do that separately. 


- Neha


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33668
-----------------------------------------------------------


On Feb. 1, 2014, 10:58 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 1, 2014, 10:58 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/DeleteTopicsThread.scala PRE-CREATION 
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>


Re: Review Request 17460: Patch for KAFKA-330

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33668
-----------------------------------------------------------


I haven't finished reviewing but will continue when I get time. I have a few comments off the bat.


core/src/main/scala/kafka/controller/ControllerChannelManager.scala
<https://reviews.apache.org/r/17460/#comment63223>

    



core/src/main/scala/kafka/controller/ControllerChannelManager.scala
<https://reviews.apache.org/r/17460/#comment63224>

    Why not only limit to the topic-partitions relevant to this leaderAndIsrRequest?



core/src/main/scala/kafka/controller/ControllerChannelManager.scala
<https://reviews.apache.org/r/17460/#comment63227>

    You mean just put all in a single StopReplicaRequest? If so, any reason not to do it now?



core/src/main/scala/kafka/controller/DeleteTopicsThread.scala
<https://reviews.apache.org/r/17460/#comment63255>

    Why is the Set.empty[String] needed?



core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment63257>

    I'm unclear whether this and the call from onPreferredReplica.. are really required. i.e., prior to those operations beginning we do check if those topics are being deleted but I haven't fully thought this through.



core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment63240>

    From the lock javadoc:
    Note that Lock instances are just normal objects and can themselves be used as the target in a synchronized statement. Acquiring the monitor lock of a Lock instance has no specified relationship with invoking any of the lock() methods of that instance. It is recommended that to avoid confusion you never use Lock instances in this way, except within their own implementation. 
    
    So you may need to still synchronize on this lock object before calling controllerLock.lock - or switch to lock/unlock everywhere we currently use synchronized.



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63251>

    Thanks for writing up this excellent summary.



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63231>

    Can you fix up the comment a bit? i.e., the lock will in fact be released while awaiting, so maybe you should just say that the lock should be acquired before calling.


- Joel Koshy


On Feb. 1, 2014, 10:58 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 1, 2014, 10:58 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/DeleteTopicsThread.scala PRE-CREATION 
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>


Re: Review Request 17460: Patch for KAFKA-330

Posted by Neha Narkhede <ne...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/
-----------------------------------------------------------

(Updated Feb. 1, 2014, 10:58 p.m.)


Review request for kafka.


Bugs: KAFKA-330
    https://issues.apache.org/jira/browse/KAFKA-330


Repository: kafka


Description (updated)
-------

Removed init() API from TopicDeletionManager and added docs to TopicDeletionManager to describe the lifecycle of topic deletion


Updated docs for the new states. Removed the changes to log4j.properties


Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, unit tests working


Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending


Changed controller to reference APIs in TopicDeletionManager. All unit tests pass


Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager


Addressed Guozhang's review comments


Fixed docs in a few places


Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletetion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume


Organized imports


Moved offline replica handling to controller failover


Reading replica assignment from zookeeper instead of local cache


Deleting unused APIs


Reverting the change to the stop replica request protocol. Instead hacking around with callbacks


All functionality and all unit tests working


Rebased with trunk after controller cleanup patch


Diffs (updated)
-----

  core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
  core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
  core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
  core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
  core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
  core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
  core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
  core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
  core/src/main/scala/kafka/controller/DeleteTopicsThread.scala PRE-CREATION 
  core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
  core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
  core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
  core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
  core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
  core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
  core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
  core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
  core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
  core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
  core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
  core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
  core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
  core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
  core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 

Diff: https://reviews.apache.org/r/17460/diff/


Testing
-------

Several integration tests added to test -

1. Topic deletion when all replica brokers are alive
2. Halt and resume topic deletion after a follower replica is restarted
3. Halt and resume topic deletion after a controller failover
4. Request handling during topic deletion
5. Topic deletion and partition reassignment in parallel
6. Topic deletion and preferred replica election in parallel
7. Topic deletion and per topic config changes in parallel


Thanks,

Neha Narkhede