Posted to dev@kafka.apache.org by "Jun Rao (JIRA)" <ji...@apache.org> on 2012/05/15 20:21:12 UTC

[jira] [Created] (KAFKA-340) ensure ISR has enough brokers during clean shutdown of a broker

Jun Rao created KAFKA-340:
-----------------------------

             Summary: ensure ISR has enough brokers during clean shutdown of a broker
                 Key: KAFKA-340
                 URL: https://issues.apache.org/jira/browse/KAFKA-340
             Project: Kafka
          Issue Type: Sub-task
          Components: core
    Affects Versions: 0.8
            Reporter: Jun Rao


If we are shutting down a broker when the ISR of a partition includes only that broker, we could lose some messages that have been previously committed. For clean shutdown, we need to guarantee that there is at least 1 other broker in ISR after the broker is shut down.
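
Roughly, the invariant being asked for amounts to the following sketch (names
are illustrative only, not from any patch):

    // Before completing a clean shutdown of a broker, every partition it hosts
    // must have at least one *other* broker left in its ISR.
    def safeToShutDown(isrsOfHostedPartitions: Seq[Set[Int]], brokerId: Int): Boolean =
      isrsOfHostedPartitions.forall(isr => (isr - brokerId).nonEmpty)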

[jira] [Closed] (KAFKA-340) Implement clean shutdown in 0.8

Posted by "Neha Narkhede (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede closed KAFKA-340.
-------------------------------

    

[jira] [Commented] (KAFKA-340) Implement clean shutdown in 0.8

Posted by "Neha Narkhede (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13482523#comment-13482523 ] 

Neha Narkhede commented on KAFKA-340:
-------------------------------------

Thanks for patch v2, looks good overall. A couple of minor questions -

3. KafkaController

3.1 In maybeRemoveReplicaFromIsr, let's add a warn statement that says why the replica was not removed from the isr. There are 3 conditions under which that can happen; let's state which condition was satisfied. This will be very useful for debugging.
3.2 If removing the replica from isr is unnecessary, why do we still have to send the leader and isr request to the leader?
3.3 In the OfflineReplica state change, since the leader doesn't change, we probably don't need to update the leader and isr cache either. I realize that this is not introduced in this patch, but it would be nice to fix it anyway.
3.4 What is the point of sending the leader and isr request at the end of shutdownBroker, since the OfflineReplica state change would've taken care of that anyway? It seems like you just need to send the stop replica request with the delete partitions flag turned off, no?

4. PartitionLeaderSelector

Minor logging correction - So far we have been using [%s,%d] convention for logging topic-partition. Let's change the following logging statement to do the same -
debug("Partition %s-%d : current leader = %d, new leader = %d"

5. StopReplicaRequest

Probably makes sense to change warn to error, or even throw an exception, to alert on an invalid byte in the stop replica request. This is pretty serious and probably points to corruption somewhere in the code.
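
A minimal sketch of what that stricter handling could look like when reading the
flag (the helper and the exception choice are illustrative, not necessarily what
the patch does):

    import java.nio.ByteBuffer

    // Fail fast on a corrupt delete-partitions byte instead of only warning.
    def readDeletePartitionsFlag(buffer: ByteBuffer): Boolean = {
      val b = buffer.get()
      if (b == 1) true
      else if (b == 0) false
      else throw new IllegalArgumentException(
        "Invalid byte %d in the delete-partitions field (expected 0 or 1)".format(b))
    }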

                

[jira] [Updated] (KAFKA-340) ensure ISR has enough brokers during clean shutdown of a broker

Posted by "Joel Koshy (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-340:
-----------------------------

    Labels: bugs  (was: features)
    

[jira] [Updated] (KAFKA-340) ensure ISR has enough brokers during clean shutdown of a broker

Posted by "Joel Koshy (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-340:
-----------------------------

    Labels: optimization  (was: )
    

[jira] [Commented] (KAFKA-340) Implement clean shutdown in 0.8

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13482553#comment-13482553 ] 

Jun Rao commented on KAFKA-340:
-------------------------------

Thanks for patch v2. Some more comments and clarification:

20. ControllerContext.liveBrokers: We already filter out shutdown brokers in the getter. Do we still need to do the filtering at the setter?

21. KafkaController.shutdownBroker():
21.1 We probably should synchronize this method to allow only 1 outstanding shutdown operation at a time. Various maps in ControllerContext are updated in this method and they are not concurrent.
21.2 My understanding is that this method returns the number of partitions whose leader is still on this broker. If so, shouldn't partitionsRemaining be updated after leaders are moved? Also, could you add a comment to this method to describe what it does and what the return value is?
21.3 I think we need to hold the controller lock when calling maybeRemoveReplicaFromIsr.
21.4 We should send stopReplica requests for all partitions on this broker, not just the partitions for which it is a follower, right?


                

[jira] [Commented] (KAFKA-340) ensure ISR has enough brokers during clean shutdown of a broker

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13437978#comment-13437978 ] 

Jun Rao commented on KAFKA-340:
-------------------------------

For this to happen, each broker needs to know the # of replicas in a partition. We can change the leaderAndISRRequest to pass this information from the controller to the broker.
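
Purely as an illustration of the shape of that change (not the actual wire
format), the per-partition state pushed in the leaderAndISR request would gain a
replica count:

    // Illustrative: per-partition state, including the total number of assigned
    // replicas, so the receiving broker knows the partition's replication factor.
    case class PartitionStateInfoSketch(leader: Int, isr: List[Int], replicationFactor: Int)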
                

[jira] [Commented] (KAFKA-340) Implement clean shutdown in 0.8

Posted by "Joel Koshy (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13483755#comment-13483755 ] 

Joel Koshy commented on KAFKA-340:
----------------------------------

21.2: Sure, will make those changes.

21.4: We don't send stopReplicaRequests for partitions whose leader is *still* on the broker to be shut down - we could, but it probably doesn't matter, and the existing behavior is a side-effect of the if condition that needs to be there. Say there are two partitions ("a", "b") led by the broker, and "c" is some other partition that the broker is following. Suppose leadership of "a" is successfully moved to another broker, but leadership of "b" is not. In this case, the broker will get StopReplica requests for "a" and "c" but not "b", because of the guard "if (controllerContext.allLeaders(topicAndPartition) != id)". That guard prevents removing the replica from the ISR in cases where it does not make sense to do so, because:
(i) The partition/replica is still online (since leadership was not moved).
(ii) In my comment for v2, I mentioned the possibility of a shutting-down broker remaining in ISR. So suppose broker 0 was shut down, the leader is now 1, and the ISR is still 0,1. Now shut down broker 1: leader election will fail (because 0 is shutting down), in which case I should not remove 1 from the ISR. (Without that guard we would end up with leader: 1, isr: 0, which does not make sense.)
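
Restated as a tiny self-contained sketch (broker ids as plain Ints, leadership
as a map; illustrative only):

    // Only shrink the ISR / send StopReplica for partitions whose leadership has
    // already moved off the shutting-down broker; skip the ones it still leads.
    def partitionsSafeToStop(leaders: Map[String, Int],          // partition -> leader id
                             hostedPartitions: Set[String],
                             shuttingDownBrokerId: Int): Set[String] =
      hostedPartitions.filter(p => leaders(p) != shuttingDownBrokerId)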

                

[jira] [Updated] (KAFKA-340) ensure ISR has enough brokers during clean shutdown of a broker

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-340:
--------------------------

    Labels: features  (was: optimization)
    

[jira] [Assigned] (KAFKA-340) ensure ISR has enough brokers during clean shutdown of a broker

Posted by "Joel Koshy (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy reassigned KAFKA-340:
--------------------------------

    Assignee: Joel Koshy
    

[jira] [Updated] (KAFKA-340) Implement clean shutdown in 0.8

Posted by "Joel Koshy (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-340:
-----------------------------

    Attachment: KAFKA-340-v2.patch

Responses inlined. The other comments are addressed in the patch.

Neha's comments:

> 1. KafkaController
> 1.1 It seems like we might need to move the filter to the getter instead of the setter for liveBrokerIds. This is
>     because if the shutting down broker list changes after setting the value for live brokers and reading it, the list
>     of live brokers might either miss out alive brokers or include dead brokers.

Good catch - fixed this. I left the filter on the setter as well, although it isn't strictly needed. Also added a
liveOrShuttingDownBrokerIds method as leaderAndIsr requests would otherwise only be sent to live brokers.

> 1.2 Since both alive and shutting down broker lists are set, you can use the set difference notation instead of
>     individually reading and filtering out unwanted brokers. Not sure how sets are implemented under the covers in
>     Scala, but my guess is that the set difference method is more efficient.

I would have, but the sets are of different underlying types (Broker vs. broker id (Int)).
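
A rough sketch of both points together (the names below are patterned on the
discussion, not necessarily on the patch):

    import scala.collection.mutable

    case class Broker(id: Int, host: String, port: Int)   // simplified stand-in

    class ControllerContextSketch {
      val shuttingDownBrokerIds = mutable.Set.empty[Int]
      private var allBrokers = Set.empty[Broker]

      def liveBrokers_=(brokers: Set[Broker]): Unit = { allBrokers = brokers }
      // The getter masks shutting-down brokers, so a broker that starts shutting
      // down *after* the value was written is still filtered out.
      def liveBrokers: Set[Broker] =
        allBrokers.filter(b => !shuttingDownBrokerIds.contains(b.id))

      // Plain set difference doesn't apply directly because the element types
      // differ (Broker vs. broker id); it works once both sides are ids.
      def liveBrokerIds: Set[Int] = allBrokers.map(_.id) -- shuttingDownBrokerIds
    }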

> 1.3 Can we rename replicatedPartitionsBrokerLeads to replicatePartitionsThisBrokerLeads ?

The main reason I didn't name it with "this" is that it could mean either the shutting-down broker or the controller
where this code executes. Ideally it should be called say, replicatedPartitionsShuttingDownBrokerLeads which seemed
too verbose. Would partitionsToMove be better?

> 1.4 The leader movement sets the isr correctly for the partition by removing the current leader as well as any broker
>     that is shutting down from the isr. Then, the replica is marked as offline which tries to remove the broker from
>     the isr again. Let's ignore the fact that the OfflineReplica state change reads the isr from zookeeper since we are
>     going to address that in a separate JIRA already and I don't think we should optimize too early here. What we can
>     do is add a check to OfflineReplica state change that doesn't do the zookeeper write and rpc request if the
>     replica is already not in the isr.

That sounds good - made that change (in a helper function since I needed similar logic in the shutdown procedure).
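
I.e., roughly this shape (a sketch; the real helper works against the
controller's cached/zookeeper leaderAndIsr state):

    // Skip the zookeeper write and the leaderAndIsr request entirely when the
    // replica has already been removed from the ISR by an earlier step.
    def maybeShrinkIsr(isr: List[Int], replicaId: Int): Option[List[Int]] =
      if (isr.contains(replicaId)) Some(isr.filterNot(_ == replicaId))
      else None   // nothing to do: no write, no request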

> 1.6 If the shutdownBroker API throws a runtime exception, what value is returned to the admin command ?

Throws an exception - i.e. no return.

> 1.7 In shutdownBroker, the controllerLock is acquired and released at least four times. Probably you wanted to release
>     the lock between the move for each partition in order to not block on an unimportant operation like broker
>     shutdown. It is usually wise to release a lock for blocking operations to avoid deadlock/starvation. So, I didn't
>     get why the lock is released for sending the stop replica request. It seems like the lock can be acquired up until
>     the list of partitions to be moved is computed ? This acquire/release lock multiple times approach is always
>     tricky, but right now after reading the code once, I'm not sure if this particular API would run into any race
>     condition or not. So far, my intuition is that the only bad thing that can happen is we miss out on moving some
>     leaders from the broker, which is not a critical operation anyway.

I could refactor this a bit, but I'm not sure it saves much. It is definitely subtle - i.e., there is potential
interference with other operations (e.g., partition reassignment) when the lock is released. I did a pass over all
usages of liveBrokerIds and liveBrokers and I think it's fine. If a partition is reassigned before the leadership
moves, that is also fine, as there is a check (if (currLeader == id)) before attempting to move leadership.

> 2. StopReplicaRequest
> 2.1 I think it is a little awkward to overload the stop replica request with no partitions to mean that it is not
>     supposed to shutdown the replica fetcher thread. In the future, if we find a use case where we need to do this
>     only for some partitions, we might need to change the stop replica request format. What do people think about
>     adding a flag to the stop replica request that tells the broker if it should just shutdown the replica fetcher
>     thread or delete the replica as well ?

Not very sure if this is similar to Jun's comment. Instead of explicitly specifying all the partitions, not
specifying anything in the partition set meant "all partitions" (although I removed this in v2). I agree that it is
helpful to introduce a flag; also see the response to Jun's comment below. Separately, I fixed a bug in
addStopReplicaRequestForBrokers in the controller channel manager - basically the immutable set value wasn't getting
updated.
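
For context, that class of bug looks like the generic pattern below - the new
immutable set has to be written back into the map or the update is silently lost
(an illustration, not the patch code):

    import scala.collection.mutable

    // brokerId -> partitions for which a StopReplica request is queued
    val stopReplicaRequestMap = mutable.Map.empty[Int, Set[String]]

    def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], partition: String): Unit =
      brokerIds.foreach { brokerId =>
        val current = stopReplicaRequestMap.getOrElse(brokerId, Set.empty[String])
        // `current + partition` builds a *new* immutable set; it must be put back.
        stopReplicaRequestMap.put(brokerId, current + partition)
      }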

Jun's comments

> 10. KafkaController.shutdownBroker:
> 10.1 Just stopping the follower in the broker to be shut down doesn't make it faster to fall out of leader's isr.
>      This is because the leader will still need to wait for the timeout before dropping the broker out of the isr.
>      The controller will need to shrink isr and send a leaderAndIsr request to each of the leaders.
>      If we do this, there is probably no need for the wildcard stopReplica request.

I think it helps to some degree - i.e., otherwise the replica on the shutting down broker will remain in ISR for the
partitions it is following. Pre-emptively stopping the fetchers will encourage it to fall out (yes after the timeout)
but the shutdown operation itself could take some time.

I like your suggestion of actively shrinking the ISR - made that change. I think we still need to do a
stopReplicaFetcher in order to stop the fetchers (or it will re-enter ISR soon enough). Also, we need to send it a
leaderAndIsr request for the partitions being moved (to tell it that it is no longer the leader). That would cause it
to start up replica fetchers to the new leaders. So what I ended up doing is: after all the partition movements are
complete, send a stop replica request, and update leaderAndIsr for all partitions present on the shutting-down broker
that removes it from isr.

Somehow, I still think it is better to use StopReplica to force a "full shutdown" of the replica fetcher on the
shutting down broker. Right now it is possible for a fetch request to be sent from the broker to the (new) leader
at the same time the leaderAndIsr is shrunk to exclude the broker. The leader could then re-expand the ISR.

> 11.2 In the leader election logic, there is no need to make sure that the new leader is not the current leader. The
>      customized controllerContext.liveBrokerIds should have filtered out the current leader (which is shut down by the
>      jmx operation).

Actually, I needed to change this to include shutting-down brokers (since the leaderAndIsr request also needs to
be sent to the shutting-down broker), so I still need to do the check.

> 12. StopReplicaRequest: Agree with Neha here. We need to add a flag to distinguish between the case that we just want
>     to stop the replica and the case that we want to stop the replica and delete its data. The latter will be used
>     in reassigning partitions (and delete topics in the future).

Done. I went with a "global" delete partitions flag (as opposed to per-partition) in the request format. We can
discuss whether we should make this a per-partition flag.
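
As a rough sketch of the resulting request shape (field names are assumptions,
not the actual wire format):

    // One request-level flag rather than one per partition: the receiving broker
    // either just stops the replicas for the listed partitions, or also deletes
    // their local data.
    case class StopReplicaRequestSketch(deletePartitions: Boolean,
                                        partitions: Set[(String, Int)])   // (topic, partition)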

                

[jira] [Commented] (KAFKA-340) Implement clean shutdown in 0.8

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13479056#comment-13479056 ] 

Jun Rao commented on KAFKA-340:
-------------------------------

Thanks for the patch. Looks good overall. Some comments:

10. KafkaController.shutdownBroker:
10.1 Just stopping the follower in the broker to be shut down doesn't make it faster to fall out of leader's isr. This is because the leader will still need to wait for the timeout before dropping the broker out of the isr. The controller will need to shrink isr and send a leaderAndIsr request to each of the leaders. If we do this, there is probably no need for the wildcard stopReplica request.
10.2 It's better to use partitionsToMove in the following statement. 
    debug("Partitions with replication factor > 1 for which broker %d is leader: %s"
          .format(id, replicatedPartitionsBrokerLeads.mkString(",")))

11. IsrPartitionLeaderSelector:
11.1 The name seems very general. Could we rename it to something like controlledShutdownLeaderElector?
11.2 In the leader election logic, there is no need to make sure that the new leader is not the current leader. The customized controllerContext.liveBrokerIds should have filtered out the current leader (which is shut down by the jmx operation).

12. StopReplicaRequest: Agree with Neha here. We need to add a flag to distinguish between the case that we just want to stop the replica and the case that we want to stop the replica and delete its data. The latter will be used in reassigning partitions (and delete topics in the future).


                

[jira] [Updated] (KAFKA-340) ensure ISR has enough brokers during clean shutdown of a broker

Posted by "Joel Koshy (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-340:
-----------------------------

    Priority: Blocker  (was: Major)
    

[jira] [Updated] (KAFKA-340) Implement clean shutdown in 0.8

Posted by "Joel Koshy (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-340:
-----------------------------

    Summary: Implement clean shutdown in 0.8  (was: ensure ISR has enough brokers during clean shutdown of a broker)
    

[jira] [Commented] (KAFKA-340) Implement clean shutdown in 0.8

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13483894#comment-13483894 ] 

Jun Rao commented on KAFKA-340:
-------------------------------

21.4 I looked again. Yes, your code is correct.

One more minor comment.
22. ControllerBrokerRequestBatch.sendRequestsToBrokers(): In m.foreach, instead of using r, could we use case(brokerId, partitionsToBeStopped)? Then we can refer to those names directly, instead of r._1 and r._2.
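
I.e., roughly the difference between the two forms (a generic illustration):

    val m = Map(1 -> Set("t0-0"), 2 -> Set("t0-1"))   // brokerId -> partitions

    // before: positional accessors obscure what r._1 and r._2 are
    m.foreach { r => println("broker %d stops %s".format(r._1, r._2.mkString(","))) }

    // after: the pattern names the tuple elements directly
    m.foreach { case (brokerId, partitionsToBeStopped) =>
      println("broker %d stops %s".format(brokerId, partitionsToBeStopped.mkString(",")))
    }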

Other than that, the patch looks good.
                

[jira] [Updated] (KAFKA-340) Implement clean shutdown in 0.8

Posted by "Joel Koshy (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-340:
-----------------------------

    Attachment: KAFKA-340-v1.patch

Short summary of this implementation of clean shutdown:

- Shutdown is triggered through a JMX operation on the controller.
- Steps during shutdown:
  - Record the broker as shutting down in controller context.
    - This set will contain all shutting-down brokers until they are actually
      taken down. The liveBroker set will mask these (through the custom
      getter/setter).
  - Send a "wildcard" StopReplica request to the broker to stop its replica
    fetchers. This will cause it to fall out of ISR sooner (as explained in
    the previous comment).
  - Identify partitions led by the broker with replication factor > 1
  - Transition leadership to another broker in ISR
- Return the number of remaining partitions that are led by the broker.
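
A minimal sketch of the flow in the steps above (the helpers are passed in as
functions just to keep the sketch self-contained; the names are placeholders,
not the controller's actual API):

    def shutdownBrokerSketch(id: Int,
                             markAsShuttingDown: Int => Unit,
                             stopReplicaFetchers: Int => Unit,
                             partitionsLedBy: Int => Set[String],
                             replicationFactor: String => Int,
                             moveLeadershipToIsrBroker: String => Unit): Int = {
      markAsShuttingDown(id)            // masked from liveBrokers via the custom getter
      stopReplicaFetchers(id)           // the "wildcard" StopReplica request
      partitionsLedBy(id)
        .filter(p => replicationFactor(p) > 1)
        .foreach(moveLeadershipToIsrBroker)   // one partition at a time
      partitionsLedBy(id).size          // partitions the broker still leads
    }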

In practice, the way you would do clean shutdown is:
- Use the admin tool: ./bin/kafka-run-class.sh kafka.admin.ShutdownBroker
  --broker <bid> --zookeeper <zkconnect>
- If the shutdown status that it prints out is "complete" then it means
  broker <bid> has stopped its replica fetchers, and does not lead any
  partitions. In this case, send a SIGTERM to the Kafka process to actually
  take down the broker.
- If the shutdown status that it prints is "incomplete" then you may want to
  wait a bit before retrying - which would typically make sense in a rolling
  bounce.
- If you are bringing down the entire cluster, you will eventually hit the
  "incomplete" status - since there will be insufficient brokers to move the
  partition leadership to. In this case the operator presumably knows the
  situation and will proceed to do an "unclean" shutdown on the remaining
  brokers.
- If the jmx operation itself fails (say due to a controller failover),
  simply retry.

Other comments:

- I initially thought to use a boolean for handleStateChange, but needed to
  query for the actual moved partition counts, so I did away with that.
- Also, considered using a zkpath (instead of jmx), but did not do this
  because we would effectively lock the zkclient event thread until all
  partition leadership moves are attempted. In this implementation the
  controller context's lock is relinquished after moving each partition.
  Another benefit of jmx over the zkpath is that it is convenient to return
  the shutdown status so there's no need for a follow-up status check.
- For stopping the replica fetchers, I simply used a "wildcard" StopReplica
  request - i.e., without any partitions listed. The broker will not get any
  more leaderAndIsr requests (since it is no longer exposed under
  liveBrokers) so the fetchers will not restart.
- I added a slightly dumb unit test (in addition to local stand-alone
  testing), but we will need a more rigorous system test for this.
- Please let me know if you can think of corner cases to test for.

                

[jira] [Commented] (KAFKA-340) Implement clean shutdown in 0.8

Posted by "Neha Narkhede (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13478202#comment-13478202 ] 

Neha Narkhede commented on KAFKA-340:
-------------------------------------

Thanks for the patch, it looks very well thought out. A few questions/comments -

1. KafkaController
1.1 It seems like we might need to move the filter to the getter instead of the setter for liveBrokerIds. This is because if the shutting down broker list changes after setting the value for live brokers and reading it, the list of live brokers might either miss out alive brokers or include dead brokers.
1.2 Since both alive and shutting down broker lists are set, you can use the set difference notation instead of individually reading and filtering out unwanted brokers. Not sure how sets are implemented under the covers in Scala, but my guess is that the set difference method is more efficient.
1.3 Can we rename replicatedPartitionsBrokerLeads to replicatePartitionsThisBrokerLeads ?
1.4 The leader movement sets the isr correctly for the partition by removing the current leader as well as any broker that is shutting down from the isr. Then, the replica is marked as offline which tries to remove the broker from the isr again. Let's ignore the fact that the OfflineReplica state change reads the isr from zookeeper since we are going to address that in a separate JIRA already and I don't think we should optimize too early here. What we can do is add a check to OfflineReplica state change that doesn't do the zookeeper write and rpc request if the replica is already not in the isr.
1.5 In shutdownBroker,
      controllerContext.shuttingDownBrokerIds.add(id)
      controllerContext.liveBrokers = controllerContext.liveBrokers.filter(_.id != id)
It seems like the 2nd statement is not required since the shutting down brokers should be removed from the live brokers list anyway. This is another reason why moving it to the getter might be useful and safer.
1.6 If the shutdownBroker API throws a runtime exception, what value is returned to the admin command ?
1.7 In shutdownBroker, the controllerLock is acquired and released at least four times. Probably you wanted to release the lock between the move for each partition in order to not block on an unimportant operation like broker shutdown. It is usually wise to release a lock for blocking operations to avoid deadlock/starvation. So, I didn't get why the lock is released for sending the stop replica request. It seems like the lock can be acquired up until the list of partitions to be moved is computed? This acquire/release lock multiple times approach is always tricky, but right now after reading the code once, I'm not sure if this particular API would run into any race condition or not. So far, my intuition is that the only bad thing that can happen is we miss out on moving some leaders from the broker, which is not a critical operation anyway.

2. StopReplicaRequest

2.1 I think it is a little awkward to overload the stop replica request with no partitions to mean that it is not supposed to shutdown the replica fetcher thread. In the future, if we find a use case where we need to do this only for some partitions, we might need to change the stop replica request format. What do people think about adding a flag to the stop replica request that tells the broker if it should just shutdown the replica fetcher thread or delete the replica as well ?

                

[jira] [Resolved] (KAFKA-340) Implement clean shutdown in 0.8

Posted by "Neha Narkhede (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede resolved KAFKA-340.
---------------------------------

    Resolution: Fixed

Patch is committed.
                

[jira] [Updated] (KAFKA-340) ensure ISR has enough brokers during clean shutdown of a broker

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-340:
--------------------------

    Remaining Estimate: 168h
     Original Estimate: 168h
    

[jira] [Commented] (KAFKA-340) Implement clean shutdown in 0.8

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13483657#comment-13483657 ] 

Jun Rao commented on KAFKA-340:
-------------------------------

21.2 My confusion is that I thought replicatedPartitionsBrokerLeads was a val instead of a method. Could you change replicatedPartitionsBrokerLeads.toSet to replicatedPartitionsBrokerLeads().toSet to indicate that there is a side effect? Also, could you add a comment to shutdownBroker() to describe what it does and what the return value is?

21.3 Looked again. Yes, you are right. We don't need to lock there.

21.4 The code still doesn't send stopReplicaRequests for partitions whose leader is on the broker to be shut down. This is not truly needed since the leader won't issue any fetch request. However, it would be better to stop those replicas too.
                

[jira] [Updated] (KAFKA-340) Implement clean shutdown in 0.8

Posted by "Joel Koshy (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-340:
-----------------------------

    Attachment: KAFKA-340-v3.patch

Thanks for the reviews. I did a lot of stand-alone testing, which I will continue while you review this. There are
probably several corner cases and bounce sequences that are yet to be tested. For example, an (albeit non-critical)
caveat of not using zk to record the shutting-down brokers is that on controller failover, the new controller will be
unaware of that fact. E.g., say there are two brokers (0, 1) and one topic with replication factor 2; 0 is the leader
and controller. Issue the shutdown command on broker 1, then on broker 0 (which will give an incomplete status). At
this point only 0 is in leaderAndIsr. If you now send a SIGTERM to broker 0, broker 1 will become the leader again.

> 3.2 If removing the replica from isr is unnecessary, why do we still have to send the leader and isr request to the
>     leader ?

Fixed this. Also in v2, I had an additional requirement on removing a replica from ISR: if it is the only replica in
ISR, then it cannot be removed. i.e., you would then end up with a leaderAndIsr with a leader, but an empty ISR which
I felt does not really make sense. Anyway, I did not really need to do this in v3 - when shutting down a broker, I only
remove it from ISR if the partition leadership is not on the broker that is shutting down.

Working through this comment led me to wonder - when the last replica of a partition goes offline, the leaderAndIsr
path isn't updated - i.e., the zk path still says the leader is "x" although "x" is offline. It is non-critical though
since clients won't be able to communicate with it, but maybe leader election should enter an invalid broker id in such
cases.

> 3.4 What is the point of sending leader and isr request at the end of shutdownBroker, since the OfflineReplica state
> change would've taken care of that anyway. It seems like you just need to send the stop replica request with the delete
> partitions flag turned off, no ?

I still need (as an optimization) to send the leader and isr request to the leaders of all partitions that are present
on the shutting-down broker, so that each leader can remove the shutting-down broker from its inSyncReplicas cache
(in Partition.scala) and no longer wait for acks from it when a producer request's num-acks is set to -1. Otherwise,
we have to wait for the leader to "organically" shrink the ISR.

This also applies to partitions which are moved (i.e., partitions for which the shutting-down broker was the leader):
the ControlledShutdownLeaderSelector needs to send the updated leaderAndIsr request to the shutting-down broker as well
(to tell it that it is no longer the leader), at which point it will start up a replica fetcher and re-enter the ISR.
So in fact, there is actually not much point in removing the "current leader" from the ISR in
ControlledShutdownLeaderSelector.selectLeader.

Jun's comments

Thanks for patch v2. Some more comments and clarification:

> 20. ControllerContext.liveBrokers: We already filter out shutdown brokers in the getter. Do we still need to do the
> filtering at the setter?

It is not required, but it doesn't hurt, so I had left it there so that the filter on the getter has fewer elements to
deal with - although I don't think it matters at all, since it's all in memory and we're dealing with very small sets.
Anyway, removed it from the setter in v3.

> 21. KafkaController.shutdownBroker():
> 21.1 We probably should synchronize this method to allow only 1 outstanding shutdown operation at a time. Various maps
> in ControllerContext are updated in this method and they are not concurrent.

I think it should be fine, since those map modifications are guarded by the controller context lock. That said, adding
a shutdown lock should make it safer, so I added a brokerShutdownLock. I don't think anything in there can block
indefinitely (if anything could, then we would need a way to cancel the operation).
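
E.g., the serialization amounts to something like the following (a sketch; the
real method additionally takes and releases the controller lock around
individual steps):

    import java.util.concurrent.locks.ReentrantLock

    val brokerShutdownLock = new ReentrantLock()
    def partitionsStillLedBy(id: Int): Int = 0   // stub standing in for the real count

    // Only one controlled-shutdown operation may run at a time.
    def shutdownBroker(id: Int): Int = {
      brokerShutdownLock.lock()
      try {
        // ... move leaders, shrink ISRs, send StopReplica requests ...
        partitionsStillLedBy(id)
      } finally {
        brokerShutdownLock.unlock()
      }
    }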

> 21.2 My understanding is that this method returns the number of partitions whose leader is still on this broker. If so,
> shouldn't partitionsRemaining be updated after leaders are moved? Also, could you add a comment to this method to
> describe what it does and what the return value is?

Yes - it is updated. i.e., invoking the method will give the most current value. That is why I needed to call it again
at the end.

> 21.3 I think we need to hold the controller lock when calling maybeRemoveReplicaFromIsr.

The controller context is not being accessed here - or am I missing something?

> 21.4 We should send stopReplica requests for all partitions on this broker, not just the partitions for which it is a follower, right?

Correct - sorry, I don't fully recall the reason I had that filter in there. I think it might have been to avoid the
issue described in 3.2 of Neha's comments - it was possible to end up with leaderAndIsr that had leader=0, isr = 1,
which does not make sense. In any event, it is fixed now.

Other comments are addressed.

                

[jira] [Commented] (KAFKA-340) Implement clean shutdown in 0.8

Posted by "Joel Koshy (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13484727#comment-13484727 ] 

Joel Koshy commented on KAFKA-340:
----------------------------------

Thanks for the review - committed to 0.8 with one more tweak: the bean should be registered only onControllerFailure and unregistered on session expiration.

Also, I'll file a separate follow-up jira as I think it would be better to use a call-back to ensure stop-replicas complete before shrinking ISR.

                

[jira] [Commented] (KAFKA-340) ensure ISR has enough brokers during clean shutdown of a broker

Posted by "Joel Koshy (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13467380#comment-13467380 ] 

Joel Koshy commented on KAFKA-340:
----------------------------------

This may not be too difficult to implement, but it is tricky to use, so I would
like to put down a more detailed description, as that may help surface
more subtleties and corner cases to account for. The actual invocation of
shutdown (described at the end) needs to be thought through.

The existing shutdown hook provides a conventional shutdown approach and is
convenient from an operations perspective in that it can be easily scripted
(just kill -s SIGTERM pid). The problem with the existing shutdown logic, which
this jira is meant to address, is that it simply brings the broker down, which
takes partitions offline abruptly. This can lead to message loss during rolling
bounces - e.g., consider a rolling bounce of brokers B1, B2. Suppose a producer
is configured with num-request-acks -1, T1-P1 is on (B1, B2), and B2 is the
leader. If B1 is bounced, it may fall out of the ISR for T1-P1. B2 continues to
receive messages to T1-P1 and commits them immediately. After B1 comes back up,
if B2 is bounced, then B1 would become the new leader, but its log would be
truncated to the last HW, which is smaller than the committed (and ack'd)
offsets on B2.

The original proposal (to wait until there is at least one other broker in ISR
before shutting down) is also not without issues. It could lead to long waits
during clean shutdown. Furthermore, it involves moving all the leaders on the
broker that is being shut down to other brokers. Partitions that are moved later
will experience longer outages than the partitions earlier in the list. In
addition to avoiding message loss, another goal in clean shutdown should be to
keep partitions as available as possible. Jun suggested another possible
approach: before shutting down a broker, pre-emptively relinquish and move the
leadership of partitions for which it is currently leader to another broker in
ISR. One benefit of this approach is that leader transition can be done
serially, one partition at a time. So each partition will only be unavailable
for the period it takes to establish a new leader.

The actual process to shut down broker B may look something like this. (This
assumes the command is issued to the current controller but can be adapted for
the other options outlined at the end.)

- For each partition T-P that B is leader for:
  - Move leadership to another broker that is in ISR. (So if |ISR| == 1
    - meaning only B - and replication factor >= 2 then we have to wait.)
  - Issue the leaderAndISR request to the updated list of brokers in
    leaderAndISR.

- For all partitions for which B is a follower:
  - Issue a "stopFetcher" for T-P. (Explicitly stopping the fetchers helps
    reduce the possibility of timeouts on producer requests, since not doing so
    would cause B to remain in ISR longer.)
  - Issue a leaderAndISR request to the updated list of brokers in
    leaderAndISR.
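
A minimal sketch of the per-partition leader-transition step above (broker ids
as Ints; this is not the actual leader-selector code):

    // New leader for a partition led by B: any other assigned replica that is
    // currently in the ISR.
    def selectNewLeader(assignedReplicas: Seq[Int], isr: Seq[Int], b: Int): Option[Int] =
      assignedReplicas.find(r => r != b && isr.contains(r))
      // None means the ISR contains only B, so we have to wait (as noted above).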

Here are a few options for the actual clean-shutdown invocation. These are not
mutually exclusive, so we can pick one for now and implement the others later
if they make sense.

- Option 1: JMX-operation on the Kafka-controller to clean shut-down a
  specified broker. (If the controller fails during its execution, then the
  operation will simply fail.) This could also be a JMX-operation on individual
  brokers, but that would require individual brokers to forward the
  "relinquishLeadership" request to the controller.
- Option 2: The broker's shutdown handler sends a "relinquishLeadership"
  request to the controller and awaits a response from the controller. See note
  below on the effect of a failure in this step.
- Option 3: Expose a new command port to issue a "cleanShutdown brokerid"
  command to the controller. As above, this could be on individual brokers as
  well. The command port would also be useful for other commands such as "print
  broker config", "dump stats", etc.

The main disadvantage with Option 2 is that if the leader transition fails then
you cannot really retry - well, you could wait/retry until the transition is
successful, but then you would need to introduce a timeout. Introducing a
timeout has its own issues - e.g., what is a reasonable timeout; also you would
be forced to wait for the timeout during a full cluster shutdown. With Options
1/3 the operation would just fail. (And if you *know* that you are doing a full
cluster shutdown you can go ahead and use the SIGTERM-based shutdown.)

Typical scenarios:
- Rolling bounce. After a broker is bounced and comes back up it can take some
  time for it to return to ISR (if it fell out). In this case, the
  relinquishLeadership operation of the next broker in the bounce sequence may
  fail but would succeed after the preceding broker comes back up and returns
  to ISR.
- Full cluster shutdown. In this case, relinquishLeadership would likely fail
  for brokers that come later in the shutdown sequence. However, the user would
  know this is a full cluster shutdown and just use the usual SIGTERM-based
  shutdown.
- Replication factor of 1: similar to full cluster shutdown.

                