Posted to dev@kafka.apache.org by Tom Bentley <t....@gmail.com> on 2017/09/05 16:39:41 UTC

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

I've revised this KIP again:

* Change the alterPartitionCounts() API to support passing an optional
assignment for the new partitions (which is already supported by
kafka-topics.sh). At the same time I didn't want the API to suggest it was
possible to change the existing assignments in the same call (which isn't
supported today).

* Removed alterReplicationFactor(), since it's really just a special case
of reassignPartitions(): In both cases it is necessary to give a complete
partition assignment and both can be long running due to data movement.
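
To illustrate why a replication factor change is just a special case of
reassignment, here is a minimal sketch. It is not part of the KIP: the class
name, the helper withReplicationFactor() and the "first fit" broker choice
are all made up for illustration. It assumes the caller already knows the
current assignment and the list of live brokers, and it simply produces the
complete assignment that would then be passed to reassignPartitions().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReplicationFactorAsReassignment {

    /**
     * Hypothetical helper (not an AdminClient API): builds a complete
     * partition -> replica list assignment with the target replication
     * factor, starting from the current assignment.
     */
    public static Map<Integer, List<Integer>> withReplicationFactor(
            Map<Integer, List<Integer>> current, List<Integer> brokers, int targetRf) {
        if (targetRf < 1 || targetRf > brokers.size()) {
            throw new IllegalArgumentException(
                    "replication factor must be between 1 and " + brokers.size());
        }
        Map<Integer, List<Integer>> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, List<Integer>> e : current.entrySet()) {
            List<Integer> replicas = new ArrayList<>(e.getValue());
            // Shrinking: drop replicas from the end so the first (preferred) replica is kept.
            while (replicas.size() > targetRf) {
                replicas.remove(replicas.size() - 1);
            }
            // Growing: add brokers that do not already host the partition, in listed order.
            for (int broker : brokers) {
                if (replicas.size() == targetRf) {
                    break;
                }
                if (!replicas.contains(broker)) {
                    replicas.add(broker);
                }
            }
            result.put(e.getKey(), replicas);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> current = new LinkedHashMap<>();
        current.put(0, Arrays.asList(1, 2));
        current.put(1, Arrays.asList(2, 3));
        // Raising the replication factor from 2 to 3 yields a full new assignment.
        System.out.println(withReplicationFactor(current, Arrays.asList(1, 2, 3), 3));
        // prints {0=[1, 2, 3], 1=[2, 3, 1]}
    }
}
```

A real tool would pick brokers with rack awareness and load in mind; the
point here is only that the output has the same shape as any other
reassignment.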


Outstanding questions:

1. Is the proposed alterInterReplicaThrottle() API really better than
changing the throttle via alterConfigs()? It wouldn't be necessary to
support alterConfigs() for all broker configs, just DynamicConfigs (which
can't be set via the config file anyway). Would it be a problem that
triggering the reassignment required ClusterAction on the Cluster, but
throttling the assignment required Alter on the Topic? What if a user had
the former permission, but not the latter?

2. Is reassignPartitions() really the best name? I find the distinction
between reassigning and just assigning to be of limited value, and
"reassign" is misleading when the API is now used for changing the
replication factor too. Since the API is asynchronous/long running, it
might be better to indicate that in the name somehow. What about
startPartitionAssignment()?

I am, of course, interested in any other questions or comments people have.




On 30 August 2017 at 16:17, Tom Bentley <t....@gmail.com> wrote:

> Hi all,
>
> I've updated the KIP as follows:
>
> * remove the APIs supporting progress reporting in favour of the APIs
> being implemented in KIP-113.
> * added some APIs to cover the existing functionality around throttling
> inter-broker transfers, which was previously a TODO.
>
> To respond to Colin's suggestion:
>
> > TopicAlterationResult alterTopics(TopicAlterationOptions options,
> >                                   Map<String, TopicAlteration> alters)
> >
> > PartitionCountAlteration(int numPartitions) implements TopicAlteration
> >
> > ReplicationFactorAlteration(int repl) implements TopicAlteration
> >
> > ReassignPartitionsAlteration(...) implements TopicAlteration
>
> That is a workable alternative to providing 3 separate methods on the
> AdminClient, but I don't see why it is objectively better. I don't see
> clients commonly needing to do a mixture of alterations, and it assumes
> that the options make sense for all alterations.
>
> Cheers,
>
> Tom
>
> On 22 August 2017 at 23:49, Colin McCabe <cm...@apache.org> wrote:
>
>> On Wed, Aug 9, 2017, at 07:17, Tom Bentley wrote:
>> > Hi Dong and Jun,
>> >
>> > Thanks again for your input in this discussion and on KIP-113. It's
>> > difficult that discussion is split between this thread and the one for
>> > KIP-113, but I'll try to respond on this thread to questions asked on
>> > this thread.
>> >
>> > It seems there is some consensus that the alterTopic() API is the wrong
>> > thing, and it would make more sense to separate the different kinds of
>> > alteration into separate APIs. It seems to me there is then a choice.
>> >
>> > 1. Have separate alterPartitionCount(), alterReplicationFactor() and
>> > reassignPartitions() methods. This would most closely match the
>> > facilities currently offered by kafka-alter-topics and
>> > kafka-reassign-partitions.
>> > 2. Just provide a reassignPartitions() method and infer from the shape of
>> > the data passed to it that a change in replication factor and/or
>> > partition count is required, as suggested by Dong.
>> >
>> > The choice we make here is also relevant to KIP-113 and KIP-178. By
>> > choosing (1) we can change the replication factor or partition count
>> > without providing an assignment and therefore are necessarily requiring
>> > the controller to make a decision for us about which broker (and, for
>> > 113, which log directory and thus which disk) the replica should reside
>> > on. There is then the matter of what criteria the controller should use
>> > to make that decision (a decision is also required on topic creation of
>> > course, but I'll try not to go there right now).
>>
>> Hmm.  One approach would be to have something like
>>
>> > TopicAlterationResult alterTopics(TopicAlterationOptions options,
>> >                                   Map<String, TopicAlteration> alters)
>> >
>> > PartitionCountAlteration(int numPartitions) implements TopicAlteration
>> >
>> > ReplicationFactorAlteration(int repl) implements TopicAlteration
>> >
>> > ReassignPartitionsAlteration(...) implements TopicAlteration
>>
>>
>> >
>> > Choosing (2), on the other hand, forces us to make an assignment, and
>> > currently the AdminClient doesn't provide the APIs necessary to make a
>> > very informed decision. To do the job properly we'd need APIs to
>> > enumerate the permissible log directories on each broker, know the
>> > current disk usage etc. These just don't exist today, and I think it
>> > would be a lot of work to bite off to specify them. We've already got
>> > three KIPs on the go.
>> >
>> > So, for the moment, I think we have to choose 1, for pragmatic reasons.
>> > There is nothing to stop us deprecating alterPartitionCount() and
>> > alterReplicationFactor() and moving to (2) at a later date.
>>
>> Yeah, the API should be pragmatic.  An API that doesn't let us do what
>> we want to do (like let Kafka assign us a new replica on the least
>> loaded node) is not good.
>>
>> >
>> > To answer a specific question of Jun's:
>> >
>> > 3. It's not very clear to me what status_time in ReplicaStatusResponse
>> > > means.
>> >
>> >
>> > It corresponds to the ReplicaStatus.statusTime, and the idea was it was the
>> > epoch offset at which the controller's notion of the lag was current (which
>> > I think is the epoch offset of the last FetchRequest from the follower). At
>> > one point I was considering some other ways of determining progress and
>> > completion, and TBH I think it was more pertinent to those rejected
>> > alternatives.
>> >
>> > I've already made some changes to KIP-179. As suggested in the thread for
>> > KIP-113, I will be changing some more since I think we can use
>> > DescribeDirsRequest instead of ReplicaStatusResponse to implement the
>> > --progress option.
>>
>> That makes sense.  Whatever response is used, it would be nice to have
>> an error message in addition to an error code.
>>
>> best,
>> Colin
>>
>> >
>> > Cheers,
>> >
>> > Tom
>> >
>> >
>> >
>> > On 9 August 2017 at 02:28, Jun Rao <ju...@confluent.io> wrote:
>> >
>> > > Hi, Tom,
>> > >
>> > > Thanks for the KIP. A few minor comments below.
>> > >
>> > > 1. Implementation wise, the broker handles adding partitions differently
>> > > from changing replica assignment. For the former, we directly update the
>> > > topic path in ZK with the new partitions. For the latter, we write the new
>> > > partition reassignment in the partition reassignment path. Changing the
>> > > replication factor is handled in the same way as changing replica
>> > > assignment. So, it would be useful to document how the broker handles these
>> > > different cases accordingly. I think it's simpler to just allow one change
>> > > (partition, replication factor, or replica assignment) in a request.
>> > >
>> > > 2. Currently, we only allow adding partitions. We probably want to document
>> > > the restriction in the api.
>> > >
>> > > 3. It's not very clear to me what status_time in ReplicaStatusResponse
>> > > means.
>> > >
>> > > Jun
>> > >
>> > >
>> > >
>> > > On Fri, Aug 4, 2017 at 10:04 AM, Dong Lin <li...@gmail.com>
>> wrote:
>> > >
>> > > > Hey Tom,
>> > > >
>> > > > Thanks for your reply. Here are my thoughts:
>> > > >
>> > > > 1) I think the DescribeDirsResponse can be used by AdminClient to query
>> > > > the lag of follower replica as well. Here is how it works:
>> > > >
>> > > > - AdminClient sends DescribeDirsRequest to both the leader and the
>> > > > follower of the partition.
>> > > > - DescribeDirsResponse from both leader and follower shows the LEO of the
>> > > > leader replica and the follower replica.
>> > > > - Lag of the follower replica is the difference in the LEO between leader
>> > > > and follower.
>> > > >
>> > > > In comparison to ReplicaStatusRequest, DescribeDirsRequest needs to be
>> > > > sent to each replica of the partition whereas ReplicaStatusRequest only
>> > > > needs to be sent to the leader of the partition. It doesn't seem to make
>> > > > much difference though. In practice we probably want to query the replica
>> > > > lag of many partitions and AdminClient needs to send exactly one request
>> > > > to each broker with either solution. Does this make sense?
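
The lag calculation described in the steps above can be sketched as follows.
This is only an illustration under stated assumptions: the class and method
names are hypothetical, and the input map stands in for the per-broker
logEndOffset values that would be collected from the DescribeDirsResponse of
each replica. Because the responses are sampled at slightly different times,
the sketch clamps the result at zero.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FollowerLag {

    /**
     * Hypothetical helper: given the log end offset (LEO) reported by each
     * replica's broker for one partition, returns each follower's lag
     * relative to the leader, keyed by broker id.
     */
    public static Map<Integer, Long> followerLag(int leaderId, Map<Integer, Long> leoByBroker) {
        Long leaderLeo = leoByBroker.get(leaderId);
        if (leaderLeo == null) {
            throw new IllegalArgumentException("no LEO reported for leader " + leaderId);
        }
        Map<Integer, Long> lag = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> e : leoByBroker.entrySet()) {
            if (e.getKey() == leaderId) {
                continue; // the leader has no lag against itself
            }
            // A follower sampled after the leader may appear ahead; treat that as zero lag.
            lag.put(e.getKey(), Math.max(0L, leaderLeo - e.getValue()));
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<Integer, Long> leoByBroker = new LinkedHashMap<>();
        leoByBroker.put(1, 1000L); // leader
        leoByBroker.put(2, 990L);  // follower still catching up
        leoByBroker.put(3, 1000L); // follower fully caught up
        System.out.println(followerLag(1, leoByBroker)); // prints {2=10, 3=0}
    }
}
```

A reassignment could be treated as complete once every new replica reports
zero lag, though the thread does not settle on that criterion.
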
>> > > >
>> > > >
>> > > > 2) KIP-179 proposes to add the following AdminClient API:
>> > > >
>> > > > alterTopics(Collection<AlteredTopic> alteredTopics, AlterTopicsOptions
>> > > > options)
>> > > >
>> > > > Where AlteredTopic includes the following fields
>> > > >
>> > > > AlteredTopic(String name, int numPartitions, int replicationFactor,
>> > > > Map<Integer,List<Integer>> replicasAssignment)
>> > > >
>> > > > I have two comments on this:
>> > > >
>> > > > - The information in AlteredTopic seems a bit redundant. Both
>> > > > numPartitions and replicationFactor can be derived from
>> > > > replicasAssignment. I think we can probably just use the following API in
>> > > > AdminClient instead:
>> > > >
>> > > > AlterTopicsResult alterTopics(Map<TopicPartition, List<Integer>>
>> > > > partitionAssignment, AlterTopicsOptions options)
>> > > >
>> > > > - Do you think "reassignPartitions" may be a better name than
>> > > > "alterTopics"? This is more consistent with the existing name used in
>> > > > kafka-reassign-partitions.sh and I also find it to be more accurate. On
>> > > > the other hand, alterTopics seems to suggest that we can also alter topic
>> > > > config.
>> > > >
>> > > >
>> > > >
>> > > > Thanks,
>> > > > Dong
>> > > >
>> > > > On Fri, Aug 4, 2017 at 1:03 AM, Tom Bentley <t....@gmail.com>
>> > > wrote:
>> > > >
>> > > > > Hi Dong,
>> > > > >
>> > > > > Thanks for your reply.
>> > > > >
>> > > > > You're right that your DescribeDirsResponse contains appropriate data.
>> > > > > The comment about the log_end_offset in the KIP says "Enable user to
>> > > > > track movement progress by comparing LEO of the *.log and *.move". That
>> > > > > makes me wonder whether this would only work for replica movement on the
>> > > > > same broker. In the case of reassigning partitions between brokers it's
>> > > > > only really the leader for the partition that knows the max LEO of the
>> > > > > ISR and the LEO of the target broker. Maybe the comment is misleading
>> > > > > and it would be the leader doing the same thing in both cases. Can you
>> > > > > clarify this?
>> > > > >
>> > > > > At a conceptual level there's a lot of similarity between KIP-113 and
>> > > > > KIP-179 because they're both about moving replicas around. Concretely
>> > > > > though, ChangeReplicaDirRequest assumes the movement is on the same
>> > > > > broker, while the equivalent APIs in KIP-179 (whichever alternative)
>> > > > > lack the notion of disks. I wonder if a unified API would be better or
>> > > > > not. Operationally, the reasons for changing replicas between brokers
>> > > > > are likely to be motivated by balancing load across the cluster, or
>> > > > > adding/removing brokers. But the JBOD use case has different
>> > > > > motivations. So I suspect it's OK that the APIs are separate, but I'd
>> > > > > love to hear what others think.
>> > > > >
>> > > > > I think you're right about TopicPartitionReplica. At the protocol level
>> > > > > we could group topics and partitions together to avoid having the same
>> > > > > topic name multiple times when querying for the status of all the
>> > > > > partitions of a topic.
>> > > > >
>> > > > > Thanks again for taking the time to respond.
>> > > > >
>> > > > > Tom
>> > > > >
>> > > > > On 3 August 2017 at 23:07, Dong Lin <li...@gmail.com> wrote:
>> > > > >
>> > > > > > Hey Tom,
>> > > > > >
>> > > > > > Thanks for the KIP. It seems that the prior discussion in this thread
>> > > > > > has focused on reassigning partitions (or AlterTopics). I haven't
>> > > > > > looked into this yet. I have two comments on the replicaStatus() API
>> > > > > > and the ReplicaStatusRequest:
>> > > > > >
>> > > > > > -  It seems that the use-case for ReplicaStatusRequest is covered by
>> > > > > > the DescribeDirsRequest introduced in KIP-113
>> > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>.
>> > > > > > DescribeDirsResponse contains the logEndOffset for the specified
>> > > > > > partitions. The lag of the follower replica can be derived as the
>> > > > > > difference between leader's LEO and follower's LEO. Do you think we can
>> > > > > > simply use DescribeDirsRequest without introducing ReplicaStatusRequest?
>> > > > > >
>> > > > > > - According to the current KIP, replicaStatus() takes a list of
>> > > > > > partitions as input. And its output maps the partition to a list of
>> > > > > > replica status. Do you think it would be better to have a new class
>> > > > > > called TopicPartitionReplica, which identifies the topic, partition and
>> > > > > > the brokerId? replicaStatus can then take a list of TopicPartitionReplica
>> > > > > > as input. And its output maps the replica to replica status. The latter
>> > > > > > API seems simpler and also matches the method name better. What do you
>> > > > > > think?
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Dong
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > On Wed, Aug 2, 2017 at 5:12 AM, Tom Bentley <
>> t.j.bentley@gmail.com>
>> > > > > wrote:
>> > > > > >
>> > > > > > > Hi again Ismael,
>> > > > > > >
>> > > > > > >
>> > > > > > > >> 1. It's worth emphasising that reassigning partitions is a different
>> > > > > > > >> process than what happens when a topic is created, so not sure trying
>> > > > > > > >> to make it symmetric is beneficial. In addition to what was already
>> > > > > > > >> discussed, one should also enable replication throttling before
>> > > > > > > >> moving the data.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Thanks for your responses. The only advantage I can see from having
>> > > > > > > > separate APIs for partition count change vs. the other changes is
>> > > > > > > > that we expect the former to return synchronously, and the latter to
>> > > > > > > > (usually) return asynchronously. Lumping them into the same API forces
>> > > > > > > > clients of that API to code for the two possible cases. Is this the
>> > > > > > > > particular concern you had in mind, or do you have others?
>> > > > > > > >
>> > > > > > >
>> > > > > > > Just to help see what this looks like I've created another version of
>> > > > > > > KIP-179 (
>> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Copy+of+KIP-179+-+Change+ReassignPartitionsCommand+to+use+AdminClient
>> > > > > > > ).
>> > > > > > >
>> > > > > > > The AdminClient APIs are more type safe (harder to misuse). The
>> > > > > > > asynchronous nature of the alterReplicationFactors() and
>> > > > > > > reassignPartitions() methods isn't really apparent (you'd still have to
>> > > > > > > read the javadocs to know you had to call replicaStatus() to determine
>> > > > > > > completion), but it's still better than the lumped-together API because
>> > > > > > > the methods are _consistently_ async. The Protocol APIs are slightly
>> > > > > > > nicer too, in that INVALID_REQUEST is less commonly returned. Another
>> > > > > > > benefit is it would allow for these different alteration APIs to be
>> > > > > > > modified independently in the future, should that be necessary.
>> > > > > > >
>> > > > > > > What do others think?
>> > > > > > >
>> > > > > > > > You're right that the proposal doesn't cover setting and clearing a
>> > > > > > > > throttle, and it should. I will study the code about how this works
>> > > > > > > > before posting back about that.
>> > > > > > > >
>> > > > > > >
>> > > > > > > AFAICS from the code the throttle is simply a dynamic config of the
>> > > > > > > broker. We already have the AdminClient.alterConfigs API for setting
>> > > > > > > this, so the refactoring of kafka-reassign-partitions.sh could just
>> > > > > > > use that existing API. We could still achieve your objective of
>> > > > > > > setting the throttle before reassignment by making the --throttle
>> > > > > > > option mandatory when --execute was present. Or did you have something
>> > > > > > > else in mind?
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > >
>> > > > > > > Tom
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>>
>
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Ted Yu <yu...@gmail.com>.
> What about startPartitionAssignment()?

Makes sense.
startPartitionReassignment() seems to be better since the API deals with
reassignment.

Cheers

On Tue, Sep 5, 2017 at 9:39 AM, Tom Bentley <t....@gmail.com> wrote:

> I've revised this KIP again:
>
> * Change the alterPartitionCounts() API to support passing an optional
> assignment for the new partitions (which is already supported by
> kafka-topics.sh). At the same time I didn't want the API to suggest it was
> possible to change the existing assignments in the same call (which isn't
> supported today).
>
> * Removed alterReplicationFactor(), since it's really just a special case
> of reassignPartitions(): In both cases it is necessary to give a complete
> partition assignment and both can be long running due to data movement.
>
>
> Outstanding questions:
>
> 1. Is the proposed alterInterReplicaThrottle() API really better than
> changing the throttle via alterConfigs()? It wouldn't be necessary to
> support alterConfigs() for all broker configs, just DynamicConfigs (which
> can't be set via the config file any way). Would it be a problem that
> triggering the reassignment required ClusterAction on the Cluster, but
> throttling the assignment required Alter on the Topic? What if a user had
> the former permission, but not the latter?
>
> 2. Is reassignPartitions() really the best name? I find the distinction
> between reassigning and just assigning to be of limited value, and
> "reassign" is misleading when the APIs now used for changing the
> replication factor too. Since the API is asynchronous/long running, it
> might be better to indicate that in the name some how. What about
> startPartitionAssignment()?
>
> I am, of course, interested in any other questions or comments people have.
>
>
>
>
> On 30 August 2017 at 16:17, Tom Bentley <t....@gmail.com> wrote:
>
> > Hi all,
> >
> > I've updated the KIP as follows:
> >
> > * remove the APIs supporting progress reporting in favour of the APIs
> > being implemented in KIP-113.
> > * added some APIs to cover the existing functionality around throttling
> > inter-broker transfers, which was previously a TODO.
> >
> > To respond to Colin's suggestion:
> >
> > > TopicAlterationResult alterTopics(TopicAlterationOptions options,
> > >                                   Map<String, TopicAlteration> alters)
> > >
> > > PartitionCountAlteration(int numPartitions) implements TopicAlteration
> > >
> > > ReplicationFactorAlteration(int repl) implements TopicAlteration
> > >
> > > ReassignPartitionsAlteration(...) implements TopicAlteration
> >
> > That is a workable alternative to providing 3 separate methods on the
> > AdminClient, but I don't see why it is objectively better. I don't see
> > clients commonly needing to do a mixture of alterations, and it assumes
> > that the options make sense for all alterations.
> >
> > Cheers,
> >
> > Tom
> >
> > On 22 August 2017 at 23:49, Colin McCabe <cm...@apache.org> wrote:
> >
> >> On Wed, Aug 9, 2017, at 07:17, Tom Bentley wrote:
> >> > Hi Dong and Jun,
> >> >
> >> > Thanks again for your input in this discussion and on KIP-113. It's
> >> > difficult that discussion is split between this thread and the one for
> >> > KIP-113, but I'll try to respond on this thread to questions asked on
> >> > this
> >> > thread.
> >> >
> >> > It seems there is some consensus that the alterTopic() API is the
> wrong
> >> > thing, and it would make more sense to separate the different kinds of
> >> > alteration into separate APIs. It seems to me there is then a choice.
> >> >
> >> > 1. Have separate alterPartitionCount(), alterReplicationFactor() and
> >> > reassignPartitions() methods. This would most closely match the
> >> > facilities
> >> > currently offered by kafka-alter-topics and kafka-reassign-partitions.
> >> > 2. Just provide a reassignPartitions() method and infer from the shape
> >> of
> >> > the data passed to that that a change in replication factor and/or
> >> > partition count is required, as suggested by Dong.
> >> >
> >> > The choice we make here is also relevant to KIP-113 and KIP-178. By
> >> > choosing (1) we can change the replication factor or partition count
> >> > without providing an assignment and therefore are necessarily
> requiring
> >> > the
> >> > controller to make a decision for us about which broker (and, for 113,
> >> > which log directory and thus which disk) the replica should be reside.
> >> > There is then the matter of what criteria the controller should use to
> >> > make
> >> > that decision (a decision is also required on topic creation of
> course,
> >> > but
> >> > I'll try not to go there right now).
> >>
> >> Hmm.  One approach would be to have something like
> >>
> >> > TopicAlterationResult alterTopics(TopicAlterationOptions options,
> >> >                                   Map<String, TopicAlteration> alters)
> >> >
> >> > PartitionCountAlteration(int numPartitions) implements TopicAlteration
> >> >
> >> > ReplicationFactorAlteration(int repl) implements TopicAlteration
> >> >
> >> > ReassignPartitionsAlteration(...) implements TopicAlteration
> >>
> >>
> >> >
> >> > Choosing (2), on the other hand, forces us to make an assignment, and
> >> > currently the AdminClient doesn't provide the APIs necessary to make a
> >> > very
> >> > informed decision. To do the job properly we'd need APIs to enumerate
> >> the
> >> > permissible log directories on each broker, know the current disk
> usage
> >> > etc. These just don't exist today, and I think it would be a lot of
> work
> >> > to
> >> > bite off to specify them. We've already got three KIPs on the go.
> >> >
> >> > So, for the moment, I think we have to choose 1, for pragmatic
> reasons.
> >> > There is nothing to stop us deprecating alterPartitionCount() and
> >> > alterReplicationFactor() and moving to (2) at a later date.
> >>
> >> Yeah, the API should be pragmatic.  An API that doesn't let us do what
> >> we want to do (like let Kafka assign us a new replica on the least
> >> loaded node) is not good.
> >>
> >> >
> >> > To answer a specific questions of Jun's:
> >> >
> >> > 3. It's not very clear to me what status_time in ReplicaStatusResponse
> >> > > means.
> >> >
> >> >
> >> > It corresponds to the ReplicaStatus.statusTime, and the idea was it
> was
> >> > the
> >> > epoch offset at which the controllers notion of the lag was current
> >> > (which
> >> > I think is the epoch offset of the last FetchRequest from the
> follower).
> >> > At
> >> > one point I was considering some other ways of determining progress
> and
> >> > completion, and TBH I think it was more pertinent to those rejected
> >> > alternatives.
> >> >
> >> > I've already made some changes to KIP-179. As suggested in the thread
> >> for
> >> > KIP-113, I will be changing some more since I think we can
> >> > DescribeDirsRequest
> >> > instead of ReplicaStatusResponse to implement the --progress option.
> >>
> >> That makes sense.  Whatever response is used, it would be nice to have
> >> an error message in addition to an error code.
> >>
> >> best,
> >> Colin
> >>
> >> >
> >> > Cheers,
> >> >
> >> > Tom
> >> >
> >> >
> >> >
> >> > On 9 August 2017 at 02:28, Jun Rao <ju...@confluent.io> wrote:
> >> >
> >> > > Hi, Tom,
> >> > >
> >> > > Thanks for the KIP. A few minor comments below.
> >> > >
> >> > > 1. Implementation wise, the broker handles adding partitions
> >> differently
> >> > > from changing replica assignment. For the former, we directly update
> >> the
> >> > > topic path in ZK with the new partitions. For the latter, we write
> >> the new
> >> > > partition reassignment in the partition reassignment path. Changing
> >> the
> >> > > replication factor is handled in the same way as changing replica
> >> > > assignment. So, it would be useful to document how the broker
> handles
> >> these
> >> > > different cases accordingly. I think it's simpler to just allow one
> >> change
> >> > > (partition, replication factor, or replica assignment) in a request.
> >> > >
> >> > > 2. Currently, we only allow adding partitions. We probably want to
> >> document
> >> > > the restriction in the api.
> >> > >
> >> > > 3. It's not very clear to me what status_time in
> ReplicaStatusResponse
> >> > > means.
> >> > >
> >> > > Jun
> >> > >
> >> > >
> >> > >
> >> > > On Fri, Aug 4, 2017 at 10:04 AM, Dong Lin <li...@gmail.com>
> >> wrote:
> >> > >
> >> > > > Hey Tom,
> >> > > >
> >> > > > Thanks for your reply. Here are my thoughts:
> >> > > >
> >> > > > 1) I think the DescribeDirsResponse can be used by AdminClient to
> >> query
> >> > > the
> >> > > > lag of follower replica as well. Here is how it works:
> >> > > >
> >> > > > - AdminClient sends DescribeDirsRequest to both the leader and the
> >> > > follower
> >> > > > of the partition.
> >> > > > - DescribeDirsResponse from both leader and follower shows the LEO
> >> of the
> >> > > > leader replica and the follower replica.
> >> > > > - Lag of the follower replica is the difference in the LEO between
> >> leader
> >> > > > and follower.
> >> > > >
> >> > > > In comparison to ReplicaStatusRequest, DescribeDirsRequest needs
> to
> >> be
> >> > > send
> >> > > > to each replica of the partition whereas ReplicaStatusRequest only
> >> needs
> >> > > to
> >> > > > be sent to the leader of the partition. It doesn't seem to make
> much
> >> > > > difference though. In practice we probably want to query the
> >> replica lag
> >> > > of
> >> > > > many partitions and AminClient needs to send exactly one request
> to
> >> each
> >> > > > broker with either solution. Does this make sense?
> >> > > >
> >> > > >
> >> > > > 2) KIP-179 proposes to add the following AdminClient API:
> >> > > >
> >> > > > alterTopics(Collection<AlteredTopic> alteredTopics,
> >> AlterTopicsOptions
> >> > > > options)
> >> > > >
> >> > > > Where AlteredTopic includes the following fields
> >> > > >
> >> > > > AlteredTopic(String name, int numPartitions, int
> replicationFactor,
> >> > > > Map<Integer,List<Integer>> replicasAssignment)
> >> > > >
> >> > > > I have two comments on this:
> >> > > >
> >> > > > - The information in AlteredTopic seems a bit redundant. Both
> >> > > numPartitions
> >> > > > and replicationFactor can be derived from replicasAssignment. I
> >> think we
> >> > > > can probably just use the following API in AdminClient instead:
> >> > > >
> >> > > > AlterTopicsResult alterTopics(Map<TopicPartition, List<Integer>>
> >> > > > partitionAssignment, AlterTopicsOptions options)
> >> > > >
> >> > > > - Do you think "reassignPartitions" may be a better name than
> >> > > > "alterTopics"? This is more consistent with the existing name used
> >> in
> >> > > > kafka-reassign-partitions.sh and I also find it to be more
> >> accurate. On
> >> > > the
> >> > > > other hand, alterTopics seems to suggest that we can also alter
> >> topic
> >> > > > config.
> >> > > >
> >> > > >
> >> > > >
> >> > > > Thanks,
> >> > > > Dong
> >> > > >
> >> > > > On Fri, Aug 4, 2017 at 1:03 AM, Tom Bentley <
> t.j.bentley@gmail.com>
> >> > > wrote:
> >> > > >
> >> > > > > Hi Dong,
> >> > > > >
> >> > > > > Thanks for your reply.
> >> > > > >
> >> > > > > You're right that your DescribeDirsResponse contains appropriate
> >> data.
> >> > > > The
> >> > > > > comment about the log_end_offset in the KIP says "Enable user to
> >> track
> >> > > > > movement progress by comparing LEO of the *.log and *.move".
> That
> >> makes
> >> > > > me
> >> > > > > wonder whether this would only work for replica movement on the
> >> same
> >> > > > > broker. In the case of reassigning partitions between brokers
> >> it's only
> >> > > > > really the leader for the partition that knows the max LEO of
> the
> >> ISR
> >> > > and
> >> > > > > the LEO of the target broker. Maybe the comment is misleading
> and
> >> it
> >> > > > would
> >> > > > > be the leader doing the same thing in both cases. Can you
> clarify
> >> this?
> >> > > >
> >> > > >
> >> > > > > At a conceptual level there's a lot of similarity between
> KIP-113
> >> and
> >> > > > > KIP-179 because they're both about moving replicas around.
> >> Concretely
> >> > > > > though, ChangeReplicaDirRequest assumes the movement is on the
> >> same
> >> > > > broker,
> >> > > > > while the equivalent APIs in KIP-179 (whichever alternative)
> lack
> >> the
> >> > > > > notion of disks. I wonder if a unified API would be better or
> not.
> >> > > > > Operationally, the reasons for changing replicas between brokers
> >> are
> >> > > > likely
> >> > > > > to be motivated by balancing load across the cluster, or
> >> > > adding/removing
> >> > > > > brokers. But the JBOD use case has different motivations. So I
> >> suspect
> >> > > > it's
> >> > > > > OK that the APIs are separate, but I'd love to hear what others
> >> think.
> >> > > >
> >> > > >
> >> > > > > I think you're right about TopicPartitionReplica. At the
> protocol
> >> level
> >> > > > we
> >> > > > > could group topics and partitions together so avoid having the
> >> same
> >> > > topic
> >> > > > > name multiple times when querying for the status of all the
> >> partitions
> >> > > > of a
> >> > > > > topic.
> >> > > > >
> >> > > > > Thanks again for taking the time to respond.
> >> > > > >
> >> > > > > Tom
> >> > > > >
> >> > > > > On 3 August 2017 at 23:07, Dong Lin <li...@gmail.com>
> wrote:
> >> > > > >
> >> > > > > > Hey Tom,
> >> > > > > >
> >> > > > > > Thanks for the KIP. It seems that the prior discussion in this
> >> thread
> >> > > > has
> >> > > > > > focused on reassigning partitions (or AlterTopics). I haven't
> >> looked
> >> > > > into
> >> > > > > > this yet. I have two comments on the replicaStatus() API and
> the
> >> > > > > > ReplicaStatusRequest:
> >> > > > > >
> >> > > > > > -  It seems that the use-case for ReplicaStatusRequest is
> >> covered by
> >> > > > > > the DescribeDirsRequest introduced in KIP-113
> >> > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> > > > > > 113%3A+Support+replicas+movement+between+log+directories>.
> >> > > > > > DescribeDirsResponse contains the logEndOffset for the
> specified
> >> > > > > > partitions. The lag of the follower replica can be derived
> >> as the
> >> > > > > > difference between leader's LEO and follower's LEO. Do you
> >> think we
> >> > > can
> >> > > > > > simply use DescribeDirsRequest without introducing
> >> > > > ReplicaStatusRequest?
> >> > > > > >
> >> > > > > > - According to the current KIP, replicaStatus() takes a list
> of
> >> > > > > partitions
> >> > > > > > as input. And its output maps the partition to a list of
> replica
> >> > > > status.
> >> > > > > Do
> >> > > > > > you think it would be better to have a new class called
> >> > > > > > TopicPartitionReplica, which identifies the topic, partition
> >> and the
> >> > > > > > brokerId. replicaStatus can then take a list of
> >> TopicPartitionReplica
> >> > > > as
> >> > > > > > input. And its output maps the replica to replica status. The
> >> latter
> >> > > > API
> >> > > > > > seems simpler and also matches the method name better. What do
> >> you
> >> > > > think?
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Dong
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > On Wed, Aug 2, 2017 at 5:12 AM, Tom Bentley <
> >> t.j.bentley@gmail.com>
> >> > > > > wrote:
> >> > > > > >
> >> > > > > > > Hi again Ismael,
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > 1. It's worth emphasising that reassigning partitions is a
> >> > > different
> >> > > > > > > >> process than what happens when a topic is created, so not
> >> sure
> >> > > > > trying
> >> > > > > > to
> >> > > > > > > >> make it symmetric is beneficial. In addition to what was
> >> already
> >> > > > > > > >> discussed,
> >> > > > > > > >> one should also enable replication throttling before
> >> moving the
> >> > > > > data.
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > Thanks for your responses. The only advantage I can see
> from
> >> > > having
> >> > > > > > > > separate APIs for partition count change vs. the other
> >> changes is
> >> > > > > that
> >> > > > > > we
> >> > > > > > > > expect the former to return synchronously, and the latter
> to
> >> > > > > (usually)
> >> > > > > > > > return asynchronously. Lumping them into the same API
> forces
> >> > > > clients
> >> > > > > of
> >> > > > > > > > that API to code for the two possible cases. Is this the
> >> > > particular
> >> > > > > > > concern
> >> > > > > > > > you had in mind, or do you have others?
> >> > > > > > > >
> >> > > > > > >
> >> > > > > > > Just to help see what this looks like I've created another
> >> version
> >> > > of
> >> > > > > > > KIP-179 (
> >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Copy+
> >> > > > > > > of+KIP-179+-+Change+ReassignPartitionsCommand+to+use+
> >> AdminClient
> >> > > > > > > ).
> >> > > > > > >
> >> > > > > > > The AdminClient APIs are more type safe (harder to misuse).
> >> The
> >> > > > > > > asynchronous nature of the alterReplicationFactors() and
> >> > > > > > > reassignPartitions() methods aren't really apparent (you'd
> >> still
> >> > > have
> >> > > > > to
> >> > > > > > > read the javadocs to know you had to call replicaStatus() to
> >> > > > determine
> >> > > > > > > completion), but it's still better than the lumped-together
> >> API
> >> > > > because
> >> > > > > > the
> >> > > > > > > methods are _consistently_ async. The Protocol APIs are
> >> slightly
> >> > > > nicer
> >> > > > > > too,
> >> > > > > > > in that INVALID_REQUEST is less commonly returned. Another
> >> benefit
> >> > > is
> >> > > > > it
> >> > > > > > > would allow for these different alteration APIs to be
> modified
> >> > > > > > > independently in the future, should that be necessary.
> >> > > > > > >
> >> > > > > > > What do others think?
> >> > > > > > >
> >> > > > > > > You're right that the proposal doesn't cover setting and
> >> clearing a
> >> > > > > > > > throttle, and it should. I will study the code about how
> >> this
> >> > > works
> >> > > > > > > before
> >> > > > > > > > posting back about that.
> >> > > > > > > >
> >> > > > > > >
> >> > > > > > > AFAICS from the code the throttle is simply a dynamic config
> >> of the
> >> > > > > > broker.
> >> > > > > > > We already have the AdminClient.alterConfigs API for setting
> >> this,
> >> > > so
> >> > > > > the
> >> > > > > > > refactoring of kafka-reassign-partitions.sh could just use
> >> that
> >> > > > > existing
> >> > > > > > > API. We could still achieve your objective of setting the
> >> throttle
> >> > > > > before
> >> > > > > > > reassignment by making the --throttle option mandatory when
> >> > > --execute
> >> > > > > was
> >> > > > > > > present. Or did you have something else in mind?
> >> > > > > > >
> >> > > > > > > Thanks,
> >> > > > > > >
> >> > > > > > > Tom
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >>
> >
> >
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
Hi all,

I've been thinking about the proposed changes in KIP-179 and, on reflection,
I don't think the API presented is really ideal. Some of the limitations it
has include:

1. It sticks to the current, batch oriented (i.e. a single set of
reassignments at a time), model.
2. It still doesn't really provide a nice way of knowing that a
reassignment is complete.
3. As presented, the automatic removal of throttles only happens at the end
of the reassignment batch. But individual brokers could be unthrottled before
then.

As an illustration of this, https://issues.apache.org/jira/browse/KAFKA-6304
provides a use case for wanting to cancel a reassignment because one of the
brokers in the new assignment has failed. With the proposed API:

1. We can't identify the subset of the reassignment batch which we want to
cancel.
2. All we could do would be to revise the proposed API to allow calling
   reassignPartitions() while a reassignment was in progress. This second
   call could revert the subset of reassignments involving the failed broker.
3. But the API has no way to express that the original reassignment was
cancelled.

Another illustration of the problem: An advanced cluster balancer
(such as LinkedIn's Cruise Control) has to batch large reassignments
(partly so as to make cancellation easier). This batching itself leads to
inefficiency because some of the partitions in the batch will finish before
others, so time is wasted with the cluster only moving a small number of
partitions (when most in the batch have finished).
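
To put rough numbers on that inefficiency, here is a toy simulation. It is
only a sketch: the move durations, the batch size and the function names are
all made up for illustration, and it assumes a fixed number of moves can be
in flight at once.

```python
import heapq

def batched_makespan(durations, batch_size):
    """Start the next batch only once every move in the current batch is done."""
    total = 0
    for i in range(0, len(durations), batch_size):
        total += max(durations[i:i + batch_size])
    return total

def streaming_makespan(durations, concurrency):
    """Start the next move as soon as any in-flight move finishes."""
    finish_times = []  # min-heap of finish times of in-flight moves
    for d in durations:
        if len(finish_times) < concurrency:
            start = 0
        else:
            start = heapq.heappop(finish_times)  # earliest slot to free up
        heapq.heappush(finish_times, start + d)
    return max(finish_times) if finish_times else 0

# Hypothetical per-partition move times (arbitrary units).
moves = [10, 1, 1, 10, 1, 1]
print(batched_makespan(moves, 3))    # 20: each batch waits for its slowest move
print(streaming_makespan(moves, 3))  # 11: small moves fill the freed slots
```

With the same limit of three concurrent moves, the batched model spends most
of its time with only one partition actually moving.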

In hindsight, I think I was too influenced by reproducing what the
kafka-reassign-partitions.sh tool does today. I think what's actually
needed (for things like Cruise Control) is an API that's more fine-grained,
and less batch oriented. I am therefore withdrawing KIP-179 and
intend to start a new KIP to propose a different API for partition
reassignment.

I'm still interested in hearing about other deficiencies of the KIP-179
proposal, so I can avoid them in the new proposal. Similarly, if there are
features you'd like to see in the API, please let me know.

I won't go into details of the new API here, but the basic idea I'd like
to use is to give the reassignment of each partition an identity (though
this wouldn't be exposed directly in the API). This is necessary to allow
new reassignments to be added while some are already running. API methods
would then be provided to discover all the currently running reassignments,
determine if a reassignment is still running, etc.
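
As a very rough sketch of the shape this could take (not a proposed API: the
class and method names are hypothetical, and keying on topic and partition
is just one way of giving each reassignment an identity):

```python
from dataclasses import dataclass, field

@dataclass
class ReassignmentRegistry:
    """Toy model: each partition's reassignment is tracked individually."""
    running: dict = field(default_factory=dict)  # (topic, partition) -> target replicas

    def start(self, topic, partition, target_replicas):
        # New reassignments can be added while others are still running.
        self.running[(topic, partition)] = list(target_replicas)

    def complete(self, topic, partition):
        self.running.pop((topic, partition), None)

    def cancel(self, topic, partition):
        """Cancel one reassignment only; returns True if it was running."""
        return self.running.pop((topic, partition), None) is not None

    def list_running(self):
        return sorted(self.running)

    def is_running(self, topic, partition):
        return (topic, partition) in self.running

    def involving_broker(self, broker_id):
        """Running reassignments whose target replicas include the broker."""
        return sorted(tp for tp, replicas in self.running.items()
                      if broker_id in replicas)

registry = ReassignmentRegistry()
registry.start("a", 0, [1, 2, 3])
registry.start("a", 1, [2, 3, 4])
registry.start("b", 0, [1, 2, 5])

# Broker 4 fails (the KAFKA-6304 style case): cancel only what involves it.
for topic, partition in registry.involving_broker(4):
    registry.cancel(topic, partition)
print(registry.list_running())  # [('a', 0), ('b', 0)]
```

The point is only that cancellation and completion become per-partition
questions rather than per-batch ones.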

Cheers,

Tom

On 1 November 2017 at 10:20, Tom Bentley <t....@gmail.com> wrote:

> This thread has been very quiet for a while now. It's unclear whether this
> is because no one has anything more to say, or whether no one has taken a
> look at it in its current form. I suspect the latter, so I'm not calling
> the vote today, but instead asking for more review.
>
> What's currently proposed – in addition to the reassignPartitions() API
> itself – is to have a pair of RPCs for managing throttles. This is quite
> different from the earlier proposal to reuse alterConfigs(). The benefits
> of this specific API include:
>
> * being more typesafe,
> * allowing for the automatic removal of throttles when reassignment has
> completed,
> * being careful about correct management of the throttles wrt controller
> failover
>
> Surely someone has something to say about this, before we reach the vote
> stage?
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-179+-+
> Change+ReassignPartitionsCommand+to+use+AdminClient
>
> Thanks,
>
> Tom
>
>
> On 25 October 2017 at 10:33, Tom Bentley <t....@gmail.com> wrote:
>
>> If there are no further comments, I will start a vote on this next week.
>>
>> Thanks,
>>
>> Tom
>>
>> On 20 October 2017 at 08:33, Tom Bentley <t....@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I've made a fairly major update to KIP-179 to propose APIs for setting
>>> throttled rates and throttled replicas with the ability to remove these
>>> automatically at the end of reassignment.
>>>
>>> I'd be grateful for your feedback:
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-179+-+
>>> Change+ReassignPartitionsCommand+to+use+AdminClient
>>>
>>> Thanks,
>>>
>>> Tom
>>>
>>> On 2 October 2017 at 13:15, Tom Bentley <t....@gmail.com> wrote:
>>>
>>>> One question I have is about whether/how to scope throttling to a
>>>> reassignment. Currently throttles are only loosely associated with
>>>> reassignment: You can start a reassignment without any throttling, add
>>>> throttling to an in-flight reassignment, and remember/forget to remove
>>>> throttling after the reassignment is complete. There is great flexibility
>>>> in that, but also the risk that you forget to remove the throttle(s).
>>>>
>>>> Just adding an API for setting the throttled rate makes this situation
>>>> worse: While it's nice to be able to auto-remove the throttled rate, what
>>>> about the config for the throttled replicas? Also you might add a throttle
>>>> thinking a reassignment is in-flight, but it has in fact just finished:
>>>> Those throttles will now hang around until reset or the end of the next
>>>> reassignment. For these reasons it would be good if the throttle were more
>>>> directly scoped to the reassignment.
>>>>
>>>> On the other hand, taking LinkedIn's Cruise Control as an example,
>>>> there they seem to modify the reassignment znode directly and incrementally
>>>> and so there is no notion of "the reassignment". Reassignments will be
>>>> running continuously, with partitions added before all of the current
>>>> partitions have completed. If there is no meaningful cluster-wide
>>>> "reassignment" then it would be better to remove the throttle by
>>>> changing the list of replicas as each replica catches up.
>>>>
>>>> I'm interested in any use cases people can share on this, as I'd like
>>>> the throttle API to be useful for a broad range of use cases, rather than
>>>> being too narrowly focussed on what's needed by the existing CLI tools.
>>>>
>>>> Thanks,
>>>>
>>>> Tom
>>>>
>>>>
>>>>
>>>>
>>>> On 28 September 2017 at 17:22, Tom Bentley <t....@gmail.com>
>>>> wrote:
>>>>
>>>>> I'm starting to think about KIP-179 again. In order to have more
>>>>> manageably-scoped KIPs and PRs I think it might be worth factoring-out the
>>>>> throttling part into a separate KIP. Wdyt?
>>>>>
>>>>> Keeping the throttling discussion in this thread for the moment...
>>>>>
>>>>> The throttling behaviour is currently spread across the
>>>>> `(leader|follower).replication.throttled.replicas` topic config and
>>>>> the `(leader|follower).replication.throttled.rate` dynamic broker
>>>>> config. It's not really clear to me exactly what "removing the throttle" is
>>>>> supposed to mean. I mean we could reset the rate to Long.MAX_VALUE or we
>>>>> could change the list of replicas to an empty list. The
>>>>> ReassignPartitionsCommand does both, but there is some small utility in
>>>>> leaving the rate, but clearing the list, if you've discovered the "right"
>>>>> rate for your cluster/workload and want it to be sticky for next time.
>>>>> Does anyone do this in practice?
>>>>>
>>>>> With regards to throttling, it would be
>>>>>>> worth thinking about a way where the throttling configs can be
>>>>>>> automatically removed without the user having to re-run the tool.
>>>>>>>
>>>>>>
>>>>>> Isn't that just a matter of updating the topic configs for
>>>>>> (leader|follower).replication.throttled.replicas at the same time we
>>>>>> remove the reassignment znode? That leaves open the question about whether
>>>>>> to reset the rates at the same time.
>>>>>>
>>>>>
>>>>> Thinking some more about my "update the configs at the same time we
>>>>> remove the reassignment znode" suggestion. The reassignment znode is
>>>>> persistent, so the reassignment will survive a zookeeper restart. If there
>>>>> was a flag for the auto-removal of the throttle it would likewise need to
>>>>> be persistent. Otherwise a ZK restart would remember the reassignment, but
>>>>> forget about the preference for auto removal of throttles. So, we would use
>>>>> a persistent znode (a child of the reassignment path, perhaps) to store a
>>>>> flag for throttle removal.
>>>>>
>>>>> Thoughts?
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Tom
>>>>>
>>>>
>>>>
>>>
>>
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
This thread has been very quiet for a while now. It's unclear whether this
is because no one has anything more to say, or whether no one has taken a
look at it in its current form. I suspect the latter, so I'm not calling
the vote today, but instead asking for more review.

What's currently proposed – in addition to the reassignPartitions() API
itself – is to have a pair of RPCs for managing throttles. This is quite
different from the earlier proposal to reuse alterConfigs(). The benefits
of this specific API include:

* being more typesafe,
* allowing for the automatic removal of throttles when reassignment has
completed,
* being careful about correct management of the throttles wrt controller
failover

Surely someone has something to say about this, before we reach the vote
stage?

https://cwiki.apache.org/confluence/display/KAFKA/KIP-179+-+Change+ReassignPartitionsCommand+to+use+AdminClient

Thanks,

Tom


On 25 October 2017 at 10:33, Tom Bentley <t....@gmail.com> wrote:

> If there are no further comments, I will start a vote on this next week.
>
> Thanks,
>
> Tom
>
> On 20 October 2017 at 08:33, Tom Bentley <t....@gmail.com> wrote:
>
>> Hi,
>>
>> I've made a fairly major update to KIP-179 to propose APIs for setting
>> throttled rates and throttled replicas with the ability to remove these
>> automatically at the end of reassignment.
>>
>> I'd be grateful for your feedback:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-179+-+
>> Change+ReassignPartitionsCommand+to+use+AdminClient
>>
>> Thanks,
>>
>> Tom
>>
>> On 2 October 2017 at 13:15, Tom Bentley <t....@gmail.com> wrote:
>>
>>> One question I have is about whether/how to scope throttling to a
>>> reassignment. Currently throttles are only loosely associated with
>>> reassignment: You can start a reassignment without any throttling, add
>>> throttling to an in-flight reassignment, and remember/forget to remove
>>> throttling after the reassignment is complete. There is great flexibility
>>> in that, but also the risk that you forget to remove the throttle(s).
>>>
>>> Just adding an API for setting the throttled rate makes this situation
>>> worse: While it's nice to be able to auto-remove the throttled rate, what
>>> about the config for the throttled replicas? Also you might add a throttle
>>> thinking a reassignment is in-flight, but it has in fact just finished:
>>> Those throttles will now hang around until reset or the end of the next
>>> reassignment. For these reasons it would be good if the throttle were more
>>> directly scoped to the reassignment.
>>>
>>> On the other hand, taking LinkedIn's Cruise Control as an example, there
>>> they seem to modify the reassignment znode directly and incrementally and
>>> so there is no notion of "the reassignment". Reassignments will be running
>>> continuously, with partitions added before all of the current partitions
>>> have completed. If there is no meaningful cluster-wide "reassignment" then
>>> it would be better to remove the throttle by changing the list of
>>> replicas as each replica catches up.
>>>
>>> I'm interested in any use cases people can share on this, as I'd like
>>> the throttle API to be useful for a broad range of use cases, rather than
>>> being too narrowly focussed on what's needed by the existing CLI tools.
>>>
>>> Thanks,
>>>
>>> Tom
>>>
>>>
>>>
>>>
>>> On 28 September 2017 at 17:22, Tom Bentley <t....@gmail.com>
>>> wrote:
>>>
>>>> I'm starting to think about KIP-179 again. In order to have more
>>>> manageably-scoped KIPs and PRs I think it might be worth factoring-out the
>>>> throttling part into a separate KIP. Wdyt?
>>>>
>>>> Keeping the throttling discussion in this thread for the moment...
>>>>
>>>> The throttling behaviour is currently spread across the
>>>> `(leader|follower).replication.throttled.replicas` topic config and
>>>> the `(leader|follower).replication.throttled.rate` dynamic broker
>>>> config. It's not really clear to me exactly what "removing the throttle" is
>>>> supposed to mean. I mean we could reset the rate to Long.MAX_VALUE or we
>>>> could change the list of replicas to an empty list. The
>>>> ReassignPartitionsCommand does both, but there is some small utility in
>>>> leaving the rate, but clearing the list, if you've discovered the "right"
>>>> rate for your cluster/workload and want it to be sticky for next time.
>>>> Does anyone do this in practice?
>>>>
>>>> With regards to throttling, it would be
>>>>>> worth thinking about a way where the throttling configs can be
>>>>>> automatically removed without the user having to re-run the tool.
>>>>>>
>>>>>
>>>>> Isn't that just a matter of updating the topic configs for
>>>>> (leader|follower).replication.throttled.replicas at the same time we
>>>>> remove the reassignment znode? That leaves open the question about whether
>>>>> to reset the rates at the same time.
>>>>>
>>>>
>>>> Thinking some more about my "update the configs at the same time we
>>>> remove the reassignment znode" suggestion. The reassignment znode is
>>>> persistent, so the reassignment will survive a zookeeper restart. If there
>>>> was a flag for the auto-removal of the throttle it would likewise need to
>>>> be persistent. Otherwise a ZK restart would remember the reassignment, but
>>>> forget about the preference for auto removal of throttles. So, we would use
>>>> a persistent znode (a child of the reassignment path, perhaps) to store a
>>>> flag for throttle removal.
>>>>
>>>> Thoughts?
>>>>
>>>> Cheers,
>>>>
>>>> Tom
>>>>
>>>
>>>
>>
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
If there are no further comments, I will start a vote on this next week.

Thanks,

Tom

On 20 October 2017 at 08:33, Tom Bentley <t....@gmail.com> wrote:

> Hi,
>
> I've made a fairly major update to KIP-179 to propose APIs for setting
> throttled rates and throttled replicas with the ability to remove these
> automatically at the end of reassignment.
>
> I'd be grateful for your feedback:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-179+-+Change+
> ReassignPartitionsCommand+to+use+AdminClient
>
> Thanks,
>
> Tom
>
> On 2 October 2017 at 13:15, Tom Bentley <t....@gmail.com> wrote:
>
>> One question I have is about whether/how to scope throttling to a
>> reassignment. Currently throttles are only loosely associated with
>> reassignment: You can start a reassignment without any throttling, add
>> throttling to an in-flight reassignment, and remember/forget to remove
>> throttling after the reassignment is complete. There is great flexibility
>> in that, but also the risk that you forget to remove the throttle(s).
>>
>> Just adding an API for setting the throttled rate makes this situation
>> worse: While it's nice to be able to auto-remove the throttled rate, what
>> about the config for the throttled replicas? Also you might add a throttle
>> thinking a reassignment is in-flight, but it has in fact just finished:
>> Those throttles will now hang around until reset or the end of the next
>> reassignment. For these reasons it would be good if the throttle were more
>> directly scoped to the reassignment.
>>
>> On the other hand, taking LinkedIn's Cruise Control as an example, there
>> they seem to modify the reassignment znode directly and incrementally and
>> so there is no notion of "the reassignment". Reassignments will be running
>> continuously, with partitions added before all of the current partitions
>> have completed. If there is no meaningful cluster-wide "reassignment" then
>> it would be better to remove the throttle by changing the list of
>> replicas as each replica catches up.
>>
>> I'm interested in any use cases people can share on this, as I'd like the
>> throttle API to be useful for a broad range of use cases, rather than being
>> too narrowly focussed on what's needed by the existing CLI tools.
>>
>> Thanks,
>>
>> Tom
>>
>>
>>
>>
>> On 28 September 2017 at 17:22, Tom Bentley <t....@gmail.com> wrote:
>>
>>> I'm starting to think about KIP-179 again. In order to have more
>>> manageably-scoped KIPs and PRs I think it might be worth factoring-out the
>>> throttling part into a separate KIP. Wdyt?
>>>
>>> Keeping the throttling discussion in this thread for the moment...
>>>
>>> The throttling behaviour is currently spread across the
>>> `(leader|follower).replication.throttled.replicas` topic config and the
>>> `(leader|follower).replication.throttled.rate` dynamic broker config.
>>> It's not really clear to me exactly what "removing the throttle" is
>>> supposed to mean. I mean we could reset the rate to Long.MAX_VALUE or we
>>> could change the list of replicas to an empty list. The
>>> ReassignPartitionsCommand does both, but there is some small utility in
>>> leaving the rate, but clearing the list, if you've discovered the "right"
>>> rate for your cluster/workload and want it to be sticky for next time.
>>> Does anyone do this in practice?
>>>
>>> With regards to throttling, it would be
>>>>> worth thinking about a way where the throttling configs can be
>>>>> automatically removed without the user having to re-run the tool.
>>>>>
>>>>
>>>> Isn't that just a matter of updating the topic configs for
>>>> (leader|follower).replication.throttled.replicas at the same time we
>>>> remove the reassignment znode? That leaves open the question about whether
>>>> to reset the rates at the same time.
>>>>
>>>
>>> Thinking some more about my "update the configs at the same time we
>>> remove the reassignment znode" suggestion. The reassignment znode is
>>> persistent, so the reassignment will survive a zookeeper restart. If there
>>> was a flag for the auto-removal of the throttle it would likewise need to
>>> be persistent. Otherwise a ZK restart would remember the reassignment, but
>>> forget about the preference for auto removal of throttles. So, we would use
>>> a persistent znode (a child of the reassignment path, perhaps) to store a
>>> flag for throttle removal.
>>>
>>> Thoughts?
>>>
>>> Cheers,
>>>
>>> Tom
>>>
>>
>>
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
Hi,

I've made a fairly major update to KIP-179 to propose APIs for setting
throttled rates and throttled replicas with the ability to remove these
automatically at the end of reassignment.

I'd be grateful for your feedback:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-179+-+Change+ReassignPartitionsCommand+to+use+AdminClient

Thanks,

Tom

On 2 October 2017 at 13:15, Tom Bentley <t....@gmail.com> wrote:

> One question I have is about whether/how to scope throttling to a
> reassignment. Currently throttles are only loosely associated with
> reassignment: You can start a reassignment without any throttling, add
> throttling to an in-flight reassignment, and remember/forget to remove
> throttling after the reassignment is complete. There is great flexibility
> in that, but also the risk that you forget to remove the throttle(s).
>
> Just adding an API for setting the throttled rate makes this situation
> worse: While it's nice to be able to auto-remove the throttled rate, what
> about the config for the throttled replicas? Also you might add a throttle
> thinking a reassignment is in-flight, but it has in fact just finished:
> Those throttles will now hang around until reset or the end of the next
> reassignment. For these reasons it would be good if the throttle were more
> directly scoped to the reassignment.
>
> On the other hand, taking LinkedIn's Cruise Control as an example, there
> they seem to modify the reassignment znode directly and incrementally and
> so there is no notion of "the reassignment". Reassignments will be running
> continuously, with partitions added before all of the current partitions
> have completed. If there is no meaningful cluster-wide "reassignment" then
> it would be better to remove the throttle by changing the list of
> replicas as each replica catches up.
>
> I'm interested in any use cases people can share on this, as I'd like the
> throttle API to be useful for a broad range of use cases, rather than being
> too narrowly focussed on what's needed by the existing CLI tools.
>
> Thanks,
>
> Tom
>
>
>
>
> On 28 September 2017 at 17:22, Tom Bentley <t....@gmail.com> wrote:
>
>> I'm starting to think about KIP-179 again. In order to have more
>> manageably-scoped KIPs and PRs I think it might be worth factoring-out the
>> throttling part into a separate KIP. Wdyt?
>>
>> Keeping the throttling discussion in this thread for the moment...
>>
>> The throttling behaviour is currently spread across the
>> `(leader|follower).replication.throttled.replicas` topic config and the
>> `(leader|follower).replication.throttled.rate` dynamic broker config.
>> It's not really clear to me exactly what "removing the throttle" is
>> supposed to mean. I mean we could reset the rate to Long.MAX_VALUE or we
>> could change the list of replicas to an empty list. The
>> ReassignPartitionsCommand does both, but there is some small utility in
>> leaving the rate, but clearing the list, if you've discovered the "right"
>> rate for your cluster/workload and want it to be sticky for next time.
>> Does anyone do this in practice?
>>
>> With regards to throttling, it would be
>>>> worth thinking about a way where the throttling configs can be
>>>> automatically removed without the user having to re-run the tool.
>>>>
>>>
>>> Isn't that just a matter of updating the topic configs for
>>> (leader|follower).replication.throttled.replicas at the same time we
>>> remove the reassignment znode? That leaves open the question about whether
>>> to reset the rates at the same time.
>>>
>>
>> Thinking some more about my "update the configs at the same time we
>> remove the reassignment znode" suggestion. The reassignment znode is
>> persistent, so the reassignment will survive a zookeeper restart. If there
>> was a flag for the auto-removal of the throttle it would likewise need to
>> be persistent. Otherwise a ZK restart would remember the reassignment, but
>> forget about the preference for auto removal of throttles. So, we would use
>> a persistent znode (a child of the reassignment path, perhaps) to store a
>> flag for throttle removal.
>>
>> Thoughts?
>>
>> Cheers,
>>
>> Tom
>>
>
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
One question I have is about whether/how to scope throttling to a
reassignment. Currently throttles are only loosely associated with
reassignment: You can start a reassignment without any throttling, add
throttling to an in-flight reassignment, and remember/forget to remove
throttling after the reassignment is complete. There is great flexibility
in that, but also the risk that you forget to remove the throttle(s).

Just adding an API for setting the throttled rate makes this situation
worse: While it's nice to be able to auto-remove the throttled rate, what
about the config for the throttled replicas? Also you might add a throttle
thinking a reassignment is in-flight, but it has in fact just finished:
Those throttles will now hang around until reset or the end of the next
reassignment. For these reasons it would be good if the throttle were more
directly scoped to the reassignment.

On the other hand, taking LinkedIn's Cruise Control as an example, there
they seem to modify the reassignment znode directly and incrementally and
so there is no notion of "the reassignment". Reassignments will be running
continuously, with partitions added before all of the current partitions
have completed. If there is no meaningful cluster-wide "reassignment" then
it would be better to remove the throttle by changing the list of
replicas as each replica catches up.
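
To make that last option concrete, here is a small sketch of trimming a
throttled replicas list one entry at a time. It assumes the list is the usual
comma-separated "partition:broker" string; the function name and the broker
ids are made up for illustration.

```python
def unthrottle_replica(throttled_replicas, partition, broker_id):
    """Drop one 'partition:broker' entry from a comma-separated throttled list."""
    caught_up = f"{partition}:{broker_id}"
    remaining = [e for e in throttled_replicas.split(",") if e and e != caught_up]
    return ",".join(remaining)

# Hypothetical value of a *.replication.throttled.replicas topic config.
throttled = "0:101,0:102,1:101"

throttled = unthrottle_replica(throttled, 0, 102)  # partition 0 on broker 102 caught up
after_first = throttled                            # "0:101,1:101"

throttled = unthrottle_replica(throttled, 0, 101)
throttled = unthrottle_replica(throttled, 1, 101)  # now empty: nothing left throttled
```

Done this way there is never a single moment at which "the reassignment"
ends; the throttle simply shrinks until the list is empty.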

I'm interested in any use cases people can share on this, as I'd like the
throttle API to be useful for a broad range of use cases, rather than being
too narrowly focussed on what's needed by the existing CLI tools.

Thanks,

Tom




On 28 September 2017 at 17:22, Tom Bentley <t....@gmail.com> wrote:

> I'm starting to think about KIP-179 again. In order to have more
> manageably-scoped KIPs and PRs I think it might be worth factoring-out the
> throttling part into a separate KIP. Wdyt?
>
> Keeping the throttling discussion in this thread for the moment...
>
> The throttling behaviour is currently spread across the
> `(leader|follower).replication.throttled.replicas` topic config and the
> `(leader|follower).replication.throttled.rate` dynamic broker config.
> It's not really clear to me exactly what "removing the throttle" is
> supposed to mean. I mean we could reset the rate to Long.MAX_VALUE or we
> could change the list of replicas to an empty list. The
> ReassignPartitionsCommand does both, but there is some small utility in
> leaving the rate, but clearing the list, if you've discovered the "right"
> rate for your cluster/workload and want it to be sticky for next time.
> Does anyone do this in practice?
>
> With regards to throttling, it would be
>>> worth thinking about a way where the throttling configs can be
>>> automatically removed without the user having to re-run the tool.
>>>
>>
>> Isn't that just a matter of updating the topic configs for
>> (leader|follower).replication.throttled.replicas at the same time we
>> remove the reassignment znode? That leaves open the question about whether
>> to reset the rates at the same time.
>>
>
> Thinking some more about my "update the configs at the same time we remove
> the reassignment znode" suggestion. The reassignment znode is persistent,
> so the reassignment will survive a zookeeper restart. If there was a flag
> for the auto-removal of the throttle it would likewise need to be
> persistent. Otherwise a ZK restart would remember the reassignment, but
> forget about the preference for auto removal of throttles. So, we would use
> a persistent znode (a child of the reassignment path, perhaps) to store a
> flag for throttle removal.
>
> Thoughts?
>
> Cheers,
>
> Tom
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
I'm starting to think about KIP-179 again. In order to have more
manageably-scoped KIPs and PRs I think it might be worth factoring-out the
throttling part into a separate KIP. Wdyt?

Keeping the throttling discussion in this thread for the moment...

The throttling behaviour is currently spread across the
`(leader|follower).replication.throttled.replicas` topic config and the
`(leader|follower).replication.throttled.rate` dynamic broker config. It's
not really clear to me exactly what "removing the throttle" is supposed to
mean: we could reset the rate to Long.MAX_VALUE, or we could change the
list of replicas to an empty list. The ReassignPartitionsCommand does both,
but there is some small utility in leaving the rate and clearing only the
list, if you've discovered the "right" rate for your cluster/workload and
want it to be sticky for next time. Does anyone do this in practice?
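
To make the two possible meanings concrete, here is a minimal sketch. It is
only a toy model: plain dicts stand in for the topic and broker configs, the
helper names and the example values are made up for illustration, and none
of it is Kafka client API.

```python
# Illustrative model only: plain dicts stand in for the topic and broker
# configs. The helper names and example values are assumptions.
LONG_MAX_VALUE = 2**63 - 1  # Java's Long.MAX_VALUE

ROLES = ("leader", "follower")


def clear_throttled_replicas(topic_config):
    """Option 1: empty the replica lists, so no replica is throttled."""
    updated = dict(topic_config)
    for role in ROLES:
        updated[f"{role}.replication.throttled.replicas"] = ""
    return updated


def reset_throttled_rate(broker_config):
    """Option 2: raise the rate to the maximum, so the limit is never hit."""
    updated = dict(broker_config)
    for role in ROLES:
        updated[f"{role}.replication.throttled.rate"] = LONG_MAX_VALUE
    return updated


# Example state while a throttled reassignment is in progress.
topic = {
    "leader.replication.throttled.replicas": "0:101,1:102",
    "follower.replication.throttled.replicas": "0:103,1:104",
}
broker = {
    "leader.replication.throttled.rate": 10_000_000,
    "follower.replication.throttled.rate": 10_000_000,
}

# "Sticky rate" variant: clear the lists but keep the tuned rate.
topic_after = clear_throttled_replicas(topic)
broker_after = broker
```

Applying both helpers corresponds to the "does both" behaviour described
above; applying only the first leaves the rate in place for next time.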

>> With regards to throttling, it would be
>> worth thinking about a way where the throttling configs can be
>> automatically removed without the user having to re-run the tool.
>>
>
> Isn't that just a matter of updating the topic configs for
> (leader|follower).replication.throttled.replicas at the same time we
> remove the reassignment znode? That leaves open the question about whether
> to reset the rates at the same time.
>

I've been thinking some more about my "update the configs at the same time
we remove the reassignment znode" suggestion. The reassignment znode is
persistent, so the reassignment will survive a ZooKeeper restart. If there
were a flag for the auto-removal of the throttle it would likewise need to
be persistent. Otherwise a ZK restart would remember the reassignment, but
forget about the preference for auto removal of throttles. So, we would use
a persistent znode (a child of the reassignment path, perhaps) to store a
flag for throttle removal.
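
Roughly, the idea could look like the sketch below. This is a toy model
under stated assumptions: a dict stands in for ZooKeeper's persistent
znodes, the child path used for the flag is hypothetical, and the function
names are invented for illustration.

```python
# Toy model: a dict stands in for ZooKeeper's persistent znodes.
REASSIGN_PATH = "/admin/reassign_partitions"
FLAG_PATH = REASSIGN_PATH + "/remove_throttle"  # hypothetical child znode


def start_reassignment(znodes, assignment, remove_throttle):
    """Record the reassignment and, optionally, the auto-removal preference."""
    znodes[REASSIGN_PATH] = assignment
    if remove_throttle:
        znodes[FLAG_PATH] = "true"


def restart(znodes):
    """Persistent znodes survive a restart, so the state is carried over."""
    return dict(znodes)


def complete_reassignment(znodes, topic_config):
    """Remove the znodes and, if flagged, clear the throttled replica lists."""
    updated = dict(topic_config)
    # Delete the child before the parent, as a znode tree would require.
    if znodes.pop(FLAG_PATH, None) == "true":
        for role in ("leader", "follower"):
            updated[f"{role}.replication.throttled.replicas"] = ""
    znodes.pop(REASSIGN_PATH, None)
    return updated


znodes = {}
start_reassignment(znodes, {"my-topic-0": [1, 2, 3]}, remove_throttle=True)
znodes = restart(znodes)  # both the reassignment and the flag are still there
topic_config = complete_reassignment(
    znodes,
    {
        "leader.replication.throttled.replicas": "0:1",
        "follower.replication.throttled.replicas": "0:2",
    },
)
```

The sketch only clears the replica lists; whether the rates should be reset
at the same time is the open question mentioned above.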

Thoughts?

Cheers,

Tom

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
Hi Ismael,

OK, KIP-195 has been factored out.

> Regarding the dynamic configs, I personally still think we should have a
> specific protocol API for that


Can you explain a little more why?

> With regards to throttling, it would be
> worth thinking about a way where the throttling configs can be
> automatically removed without the user having to re-run the tool.
>

Isn't that just a matter of updating the topic configs for
(leader|follower).replication.throttled.replicas
at the same time we remove the reassignment znode? That leaves open the
question about whether to reset the rates at the same time.

But now I wonder why the broker configs
"(leader|follower).replication.throttled.rate" are DynamicConfigs but the
topic configs "(leader|follower).replication.throttled.replicas" are normal
configs. Aren't the topic ones for the replicas just as dynamic as the
broker ones for the rate?

Thanks,

Tom


On 7 September 2017 at 17:24, Ismael Juma <is...@juma.me.uk> wrote:

> Hi Tom,
>
> It won't be used within Kafka, but it's a public API that can be used by
> other projects. And the protocol can be used by non-Java clients. So, there
> is still value in including it.
>
> Regarding the dynamic configs, I personally still think we should have a
> specific protocol API for that. With regards to throttling, it would be
> worth thinking about a way where the throttling configs can be
> automatically removed without the user having to re-run the tool. So, yes,
> maybe it should be a separate KIP as well.
>
> Not sure if we need it in the template, but you're welcome to mention any
> dependencies when there are some.
>
> Thanks,
> Ismael
>
> On Thu, Sep 7, 2017 at 5:15 PM, Tom Bentley <t....@gmail.com> wrote:
>
> > Hi Ismael,
> >
> > It would be good to get at least some of this into 1.0.0.
> >
> > We could put the increasePartitions() work into another KIP, but it would
> > be an unused AdminClient API in that release. The consumer of this API
> > will be the TopicsCommand when that gets refactored to use the
> > AdminClient. That's something Paolo Patierno is proposing to do but afaik
> > not in time for 1.0.0. I don't think that's an issue, though, so I'll
> > split out a separate KIP.
> >
> > FWIW, we could also split out the proposal to support describeConfigs()
> > and alterConfigs() for dynamic configs into a separate KIP too. But
> > that's also a decision we can defer until we're looking at the remainder
> > of KIP-179.
> >
> > Aside: I wonder if it would be a good idea for the KIP template to have a
> > "Depends on" field so people can more easily keep track of how multiple
> > in-flight KIPs depend on one another?
> >
> > Cheers,
> >
> > Tom
> >
> > On 7 September 2017 at 16:42, Ismael Juma <is...@juma.me.uk> wrote:
> >
> > > Hi Tom,
> > >
> > > What do you think of moving `increasePartitionsCount` (or
> > > `increaseNumPartitions`) to a separate KIP? That is simple enough that
> > > we could potentially include it in 1.0.0. I'd be happy to review it.
> > > ReassignPartitions is more complex and we can probably aim to include
> > > that in the January release. What do you think?
> > >
> > > Ismael
> > >
> > > On Wed, Sep 6, 2017 at 11:42 PM, Colin McCabe <cm...@apache.org>
> > wrote:
> > >
> > > > On Wed, Sep 6, 2017, at 00:20, Tom Bentley wrote:
> > > > > Hi Ted and Colin,
> > > > >
> > > > > Thanks for the comments.
> > > > >
> > > > > It seems you're both happier with reassign rather than assign, so
> I'm
> > > > > happy
> > > > > to stick with that.
> > > > >
> > > > >
> > > > > On 5 September 2017 at 18:46, Colin McCabe <cm...@apache.org>
> > wrote:
> > > > >
> > > > > > ...
> > > > >
> > > > >
> > > > > > Do we expect that reducing the number of partitions will ever be
> > > > > > supported by this API?  It seems like decreasing would require a
> > > > > > different API-- one which supported data movement, had a "check
> > > status
> > > > > > of this operation" feature, etc. etc.  If this API is only ever
> > going
> > > > to
> > > > > > be used to increase the number of partitions, I think we should
> > just
> > > > > > call it "increasePartitionCount" to avoid confusion.
> > > > > >
> > > > >
> > > > > I thought a little about the decrease possibility (hence the static
> > > > > factory
> > > > > methods on PartitionCount), but not really in enough detail. I
> > suppose
> > > a
> > > > > decrease process could look like this:
> > > > >
> > > > > 1. Producers start partitioning based on the decreased partition
> > count.
> > > > > 2. Consumers continue to consume from all partitions.
> > > > > 3. At some point all the records in the old partitions have expired
> > and
> > > > > they can be deleted.
> > > > >
> > > > > This wouldn't work for compacted topics. Of course a more
> aggressive
> > > > > strategy is also possible where we forcibly move data from the
> > > partitions
> > > > > to be deleted.
> > > > >
> > > > > Anyway, in either case the process would be long running, whereas
> the
> > > > > increase case is fast, so the semantics are quite different. So I
> > > agree,
> > > > > lets rename the method to make clear that it's only for increasing
> > the
> > > > > partition count.
> > > > >
> > > > > >
> > > > > > > Outstanding questions:
> > > > > > >
> > > > > > > 1. Is the proposed alterInterReplicaThrottle() API really
> better
> > > than
> > > > > > > changing the throttle via alterConfigs()?
> > > > > >
> > > > > > That's a good point.  I would argue that we should just use
> > > > alterConfigs
> > > > > > to set the broker configuration, rather than having a special RPC
> > > just
> > > > > > for this.
> > > > > >
> > > > >
> > > > > Yes, I'm minded to agree.
> > > > >
> > > > > The reason I originally thought a special API might be better was
> if
> > > > > people
> > > > > felt that the DynamicConfig mechanism (which seems to exist only to
> > > > > support
> > > > > these throttles) was an implementation detail of the throttles.
> But I
> > > now
> > > > > realise that they're visible via kafka-topics.sh, so they're
> > > effectively
> > > > > already a public API.
> > > > >
> > > > >
> > > > > >
> > > > > > ...
> > > > > > > Would it be a problem that
> > > > > > > triggering the reassignment required ClusterAction on the
> > Cluster,
> > > > but
> > > > > > > throttling the assignment required Alter on the Topic? What if
> a
> > > > user had
> > > > > > > the former permission, but not the latter?
> > > > > >
> > > > > > We've been trying to reserve ClusterAction on Cluster for
> > > > > > broker-initiated operations.  Alter on Cluster is the ACL for
> "root
> > > > > > stuff" and I would argue that it should be what we use here.
> > > > > >
> > > > > > For reconfiguring the broker, I think we should follow KIP-133
> and
> > > use
> > > > > > AlterConfigs on the Broker resource.  (Of course, if you just use
> > the
> > > > > > existing alterConfigs call, you get this without any special
> > effort.)
> > > > > >
> > > > >
> > > > > Yes, sorry, what I put in the email about authorisation wasn't
> what I
> > > put
> > > > > in the KIP (I revised the KIP after drafting the email and then
> > forgot
> > > to
> > > > > update the email).
> > > > >
> > > > > Although KIP-133 proposes a Broker resource, I don't see one in the
> > > code
> > > > > and KIP-133 was supposedly delivered in 0.11. Can anyone fill in
> the
> > > > > story
> > > > > here? Is it simply because the functionality to update broker
> configs
> > > > > hasn't been implemented yet?
> > > >
> > > > Look in
> > > > ./clients/src/main/java/org/apache/kafka/common/config/
> > > > ConfigResource.java,
> > > > for the BROKER resource.  I bet you're looking at the Resource class
> > > > used for ACLs, which is a different class.
> > > >
> > > > >
> > > > > As currently proposed, both reassignPartitions() and
> > > > > alterInterBrokerThrottle()
> > > > > require Alter on the Cluster. If we used alterConfigs() to set the
> > > > > throttles then we create a situation where the triggering of the
> > > > > reassignment required Alter(Cluster), but the throttling required
> > > > > Alter(Broker), and the user might have the former but not the
> > latter. I
> > > > > don't think this is likely to be a big deal in practice, but maybe
> > > others
> > > > > disagree?
> > > >
> > > > Alter:Cluster is essentially root, though.  If you have Alter:Cluster
> > > > and you don't have AlterConfigs:Broker, you can just create a new ACL
> > > > giving it to yourself (creating and deleting ACLs is one of the
> powers
> > > > of Alter:Cluster)
> > > >
> > > > cheers,
> > > > Colin
> > > >
> > > > >
> > > > >
> > > > > > >
> > > > > > > 2. Is reassignPartitions() really the best name? I find the
> > > > distinction
> > > > > > > between reassigning and just assigning to be of limited value,
> > and
> > > > > > > "reassign" is misleading when the APIs now used for changing
> the
> > > > > > > replication factor too. Since the API is asynchronous/long
> > running,
> > > > it
> > > > > > > might be better to indicate that in the name some how. What
> about
> > > > > > > startPartitionAssignment()?
> > > > > >
> > > > > > Good idea -- I like the idea of using "start" or "initiate" to
> > > indicate
> > > > > > that this is kicking off a long-running operation.
> > > > > >
> > > > > > "reassign" seemed like a natural choice to me since this is
> > changing
> > > an
> > > > > > existing assignment.  It was assigned (when the topic it was
> > > created)--
> > > > > > now it's being re-assigned.  If you change it to just "assign"
> then
> > > it
> > > > > > feels confusing to me.  A new user might ask if "assigning
> > > partitions"
> > > > > > is a step that they should take after creating a topic, similar
> to
> > > how
> > > > > > subscribing to topics is a step you take after creating a
> consumer.
> > > > > >
> > > > >
> > > > > Yeah, I find this new user argument persuasive, so I'm happy to
> stick
> > > > > with
> > > > > reassign.
> > > > >
> > > > > Thanks again for the feedback,
> > > > >
> > > > > Tom
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Ismael Juma <is...@juma.me.uk>.
Hi Tom,

It won't be used within Kafka, but it's a public API that can be used by
other projects. And the protocol can be used by non-Java clients. So, there
is still value in including it.

Regarding the dynamic configs, I personally still think we should have a
specific protocol API for that. With regards to throttling, it would be
worth thinking about a way where the throttling configs can be
automatically removed without the user having to re-run the tool. So, yes,
maybe it should be a separate KIP as well.

Not sure if we need it in the template, but you're welcome to mention any
dependencies when there are some.

Thanks,
Ismael

On Thu, Sep 7, 2017 at 5:15 PM, Tom Bentley <t....@gmail.com> wrote:

> Hi Ismael,
>
> It would be good to get at least some of this into 1.0.0.
>
> We could put the increasePartitions() work into another KIP, but it would
> be an unused AdminClient API in that release. The consumer of this API will
> be the TopicsCommand when that gets refactored to use the AdminClient.
> That's something Paolo Patierno is proposing to do but afaik not in time
> for 1.0.0. I don't think that's an issue, though, so I'll split out a
> separate KIP.
>
> FWIW, we could also split out the proposal to support describeConfigs() and
> alterConfigs() for dynamic configs into a separate KIP too. But that's also
> a decision we can defer until we're looking at the remainder of KIP-179.
>
> Aside: I wonder if it would be a good idea for the KIP template to have a
> "Depends on" field so people can more easily keep track of how multiple
> in-flight KIPs depend on one another?
>
> Cheers,
>
> Tom
>
> On 7 September 2017 at 16:42, Ismael Juma <is...@juma.me.uk> wrote:
>
> > Hi Tom,
> >
> > What do you think of moving `increasePartitionsCount` (or
> > `increaseNumPartitions`) to a separate KIP? That is simple enough that we
> > could potentially include it in 1.0.0. I'd be happy to review it.
> > ReassignPartitions is more complex and we can probably aim to include
> > that in the January release. What do you think?
> >
> > Ismael
> >
> > On Wed, Sep 6, 2017 at 11:42 PM, Colin McCabe <cm...@apache.org>
> wrote:
> >
> > > On Wed, Sep 6, 2017, at 00:20, Tom Bentley wrote:
> > > > Hi Ted and Colin,
> > > >
> > > > Thanks for the comments.
> > > >
> > > > It seems you're both happier with reassign rather than assign, so I'm
> > > > happy
> > > > to stick with that.
> > > >
> > > >
> > > > On 5 September 2017 at 18:46, Colin McCabe <cm...@apache.org>
> wrote:
> > > >
> > > > > ...
> > > >
> > > >
> > > > > Do we expect that reducing the number of partitions will ever be
> > > > > supported by this API?  It seems like decreasing would require a
> > > > > different API-- one which supported data movement, had a "check
> > status
> > > > > of this operation" feature, etc. etc.  If this API is only ever
> going
> > > to
> > > > > be used to increase the number of partitions, I think we should
> just
> > > > > call it "increasePartitionCount" to avoid confusion.
> > > > >
> > > >
> > > > I thought a little about the decrease possibility (hence the static
> > > > factory
> > > > methods on PartitionCount), but not really in enough detail. I
> suppose
> > a
> > > > decrease process could look like this:
> > > >
> > > > 1. Producers start partitioning based on the decreased partition
> count.
> > > > 2. Consumers continue to consume from all partitions.
> > > > 3. At some point all the records in the old partitions have expired
> and
> > > > they can be deleted.
> > > >
> > > > This wouldn't work for compacted topics. Of course a more aggressive
> > > > strategy is also possible where we forcibly move data from the
> > partitions
> > > > to be deleted.
> > > >
> > > > Anyway, in either case the process would be long running, whereas the
> > > > increase case is fast, so the semantics are quite different. So I
> > agree,
> > > > lets rename the method to make clear that it's only for increasing
> the
> > > > partition count.
> > > >
> > > > >
> > > > > > Outstanding questions:
> > > > > >
> > > > > > 1. Is the proposed alterInterReplicaThrottle() API really better
> > than
> > > > > > changing the throttle via alterConfigs()?
> > > > >
> > > > > That's a good point.  I would argue that we should just use
> > > alterConfigs
> > > > > to set the broker configuration, rather than having a special RPC
> > just
> > > > > for this.
> > > > >
> > > >
> > > > Yes, I'm minded to agree.
> > > >
> > > > The reason I originally thought a special API might be better was if
> > > > people
> > > > felt that the DynamicConfig mechanism (which seems to exist only to
> > > > support
> > > > these throttles) was an implementation detail of the throttles. But I
> > now
> > > > realise that they're visible via kafka-topics.sh, so they're
> > effectively
> > > > already a public API.
> > > >
> > > >
> > > > >
> > > > > ...
> > > > > > Would it be a problem that
> > > > > > triggering the reassignment required ClusterAction on the
> Cluster,
> > > but
> > > > > > throttling the assignment required Alter on the Topic? What if a
> > > user had
> > > > > > the former permission, but not the latter?
> > > > >
> > > > > We've been trying to reserve ClusterAction on Cluster for
> > > > > broker-initiated operations.  Alter on Cluster is the ACL for "root
> > > > > stuff" and I would argue that it should be what we use here.
> > > > >
> > > > > For reconfiguring the broker, I think we should follow KIP-133 and
> > use
> > > > > AlterConfigs on the Broker resource.  (Of course, if you just use
> the
> > > > > existing alterConfigs call, you get this without any special
> effort.)
> > > > >
> > > >
> > > > Yes, sorry, what I put in the email about authorisation wasn't what I
> > put
> > > > in the KIP (I revised the KIP after drafting the email and then
> forgot
> > to
> > > > update the email).
> > > >
> > > > Although KIP-133 proposes a Broker resource, I don't see one in the
> > code
> > > > and KIP-133 was supposedly delivered in 0.11. Can anyone fill in the
> > > > story
> > > > here? Is it simply because the functionality to update broker configs
> > > > hasn't been implemented yet?
> > >
> > > Look in
> > > ./clients/src/main/java/org/apache/kafka/common/config/
> > > ConfigResource.java,
> > > for the BROKER resource.  I bet you're looking at the Resource class
> > > used for ACLs, which is a different class.
> > >
> > > >
> > > > As currently proposed, both reassignPartitions() and
> > > > alterInterBrokerThrottle()
> > > > require Alter on the Cluster. If we used alterConfigs() to set the
> > > > throttles then we create a situation where the triggering of the
> > > > reassignment required Alter(Cluster), but the throttling required
> > > > Alter(Broker), and the user might have the former but not the
> latter. I
> > > > don't think this is likely to be a big deal in practice, but maybe
> > others
> > > > disagree?
> > >
> > > Alter:Cluster is essentially root, though.  If you have Alter:Cluster
> > > and you don't have AlterConfigs:Broker, you can just create a new ACL
> > > giving it to yourself (creating and deleting ACLs is one of the powers
> > > of Alter:Cluster)
> > >
> > > cheers,
> > > Colin
> > >
> > > >
> > > >
> > > > > >
> > > > > > 2. Is reassignPartitions() really the best name? I find the
> > > distinction
> > > > > > between reassigning and just assigning to be of limited value,
> and
> > > > > > "reassign" is misleading when the APIs now used for changing the
> > > > > > replication factor too. Since the API is asynchronous/long
> running,
> > > it
> > > > > > might be better to indicate that in the name some how. What about
> > > > > > startPartitionAssignment()?
> > > > >
> > > > > Good idea -- I like the idea of using "start" or "initiate" to
> > indicate
> > > > > that this is kicking off a long-running operation.
> > > > >
> > > > > "reassign" seemed like a natural choice to me since this is
> changing
> > an
> > > > > existing assignment.  It was assigned (when the topic it was
> > created)--
> > > > > now it's being re-assigned.  If you change it to just "assign" then
> > it
> > > > > feels confusing to me.  A new user might ask if "assigning
> > partitions"
> > > > > is a step that they should take after creating a topic, similar to
> > how
> > > > > subscribing to topics is a step you take after creating a consumer.
> > > > >
> > > >
> > > > Yeah, I find this new user argument persuasive, so I'm happy to stick
> > > > with
> > > > reassign.
> > > >
> > > > Thanks again for the feedback,
> > > >
> > > > Tom
> > >
> >
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
Hi Ismael,

It would be good to get at least some of this into 1.0.0.

We could put the increasePartitions() work into another KIP, but it would
be an unused AdminClient API in that release. The consumer of this API will
be the TopicsCommand when that gets refactored to use the AdminClient.
That's something Paolo Patierno is proposing to do but afaik not in time
for 1.0.0. I don't think that's an issue, though, so I'll split out a
separate KIP.

FWIW, we could also split out the proposal to support describeConfigs() and
alterConfigs() for dynamic configs into a separate KIP too. But that's also
a decision we can defer until we're looking at the remainder of KIP-179.

Aside: I wonder if it would be a good idea for the KIP template to have a
"Depends on" field so people can more easily keep track of how multiple
in-flight KIPs depend on one another?

Cheers,

Tom

On 7 September 2017 at 16:42, Ismael Juma <is...@juma.me.uk> wrote:

> Hi Tom,
>
> What do you think of moving `increasePartitionsCount` (or
> `increaseNumPartitions`) to a separate KIP? That is simple enough that we
> could potentially include it in 1.0.0. I'd be happy to review it.
> ReassignPartitions is more complex and we can probably aim to include that
> in the January release. What do you think?
>
> Ismael
>
> On Wed, Sep 6, 2017 at 11:42 PM, Colin McCabe <cm...@apache.org> wrote:
>
> > On Wed, Sep 6, 2017, at 00:20, Tom Bentley wrote:
> > > Hi Ted and Colin,
> > >
> > > Thanks for the comments.
> > >
> > > It seems you're both happier with reassign rather than assign, so I'm
> > > happy
> > > to stick with that.
> > >
> > >
> > > On 5 September 2017 at 18:46, Colin McCabe <cm...@apache.org> wrote:
> > >
> > > > ...
> > >
> > >
> > > > Do we expect that reducing the number of partitions will ever be
> > > > supported by this API?  It seems like decreasing would require a
> > > > different API-- one which supported data movement, had a "check
> status
> > > > of this operation" feature, etc. etc.  If this API is only ever going
> > to
> > > > be used to increase the number of partitions, I think we should just
> > > > call it "increasePartitionCount" to avoid confusion.
> > > >
> > >
> > > I thought a little about the decrease possibility (hence the static
> > > factory
> > > methods on PartitionCount), but not really in enough detail. I suppose
> a
> > > decrease process could look like this:
> > >
> > > 1. Producers start partitioning based on the decreased partition count.
> > > 2. Consumers continue to consume from all partitions.
> > > 3. At some point all the records in the old partitions have expired and
> > > they can be deleted.
> > >
> > > This wouldn't work for compacted topics. Of course a more aggressive
> > > strategy is also possible where we forcibly move data from the
> partitions
> > > to be deleted.
> > >
> > > Anyway, in either case the process would be long running, whereas the
> > > increase case is fast, so the semantics are quite different. So I
> agree,
> > > lets rename the method to make clear that it's only for increasing the
> > > partition count.
> > >
> > > >
> > > > > Outstanding questions:
> > > > >
> > > > > 1. Is the proposed alterInterReplicaThrottle() API really better
> than
> > > > > changing the throttle via alterConfigs()?
> > > >
> > > > That's a good point.  I would argue that we should just use
> > alterConfigs
> > > > to set the broker configuration, rather than having a special RPC
> just
> > > > for this.
> > > >
> > >
> > > Yes, I'm minded to agree.
> > >
> > > The reason I originally thought a special API might be better was if
> > > people
> > > felt that the DynamicConfig mechanism (which seems to exist only to
> > > support
> > > these throttles) was an implementation detail of the throttles. But I
> now
> > > realise that they're visible via kafka-topics.sh, so they're
> effectively
> > > already a public API.
> > >
> > >
> > > >
> > > > ...
> > > > > Would it be a problem that
> > > > > triggering the reassignment required ClusterAction on the Cluster,
> > but
> > > > > throttling the assignment required Alter on the Topic? What if a
> > user had
> > > > > the former permission, but not the latter?
> > > >
> > > > We've been trying to reserve ClusterAction on Cluster for
> > > > broker-initiated operations.  Alter on Cluster is the ACL for "root
> > > > stuff" and I would argue that it should be what we use here.
> > > >
> > > > For reconfiguring the broker, I think we should follow KIP-133 and
> use
> > > > AlterConfigs on the Broker resource.  (Of course, if you just use the
> > > > existing alterConfigs call, you get this without any special effort.)
> > > >
> > >
> > > Yes, sorry, what I put in the email about authorisation wasn't what I
> put
> > > in the KIP (I revised the KIP after drafting the email and then forgot
> to
> > > update the email).
> > >
> > > Although KIP-133 proposes a Broker resource, I don't see one in the
> code
> > > and KIP-133 was supposedly delivered in 0.11. Can anyone fill in the
> > > story
> > > here? Is it simply because the functionality to update broker configs
> > > hasn't been implemented yet?
> >
> > Look in
> > ./clients/src/main/java/org/apache/kafka/common/config/
> > ConfigResource.java,
> > for the BROKER resource.  I bet you're looking at the Resource class
> > used for ACLs, which is a different class.
> >
> > >
> > > As currently proposed, both reassignPartitions() and
> > > alterInterBrokerThrottle()
> > > require Alter on the Cluster. If we used alterConfigs() to set the
> > > throttles then we create a situation where the triggering of the
> > > reassignment required Alter(Cluster), but the throttling required
> > > Alter(Broker), and the user might have the former but not the latter. I
> > > don't think this is likely to be a big deal in practice, but maybe
> others
> > > disagree?
> >
> > Alter:Cluster is essentially root, though.  If you have Alter:Cluster
> > and you don't have AlterConfigs:Broker, you can just create a new ACL
> > giving it to yourself (creating and deleting ACLs is one of the powers
> > of Alter:Cluster)
> >
> > cheers,
> > Colin
> >
> > >
> > >
> > > > >
> > > > > 2. Is reassignPartitions() really the best name? I find the
> > distinction
> > > > > between reassigning and just assigning to be of limited value, and
> > > > > "reassign" is misleading when the APIs now used for changing the
> > > > > replication factor too. Since the API is asynchronous/long running,
> > it
> > > > > might be better to indicate that in the name some how. What about
> > > > > startPartitionAssignment()?
> > > >
> > > > Good idea -- I like the idea of using "start" or "initiate" to
> indicate
> > > > that this is kicking off a long-running operation.
> > > >
> > > > "reassign" seemed like a natural choice to me since this is changing
> an
> > > > existing assignment.  It was assigned (when the topic it was
> created)--
> > > > now it's being re-assigned.  If you change it to just "assign" then
> it
> > > > feels confusing to me.  A new user might ask if "assigning
> partitions"
> > > > is a step that they should take after creating a topic, similar to
> how
> > > > subscribing to topics is a step you take after creating a consumer.
> > > >
> > >
> > > Yeah, I find this new user argument persuasive, so I'm happy to stick
> > > with
> > > reassign.
> > >
> > > Thanks again for the feedback,
> > >
> > > Tom
> >
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Ismael Juma <is...@juma.me.uk>.
Hi Tom,

What do you think of moving `increasePartitionsCount` (or
`increaseNumPartitions`) to a separate KIP? That is simple enough that we
could potentially include it in 1.0.0. I'd be happy to review it.
ReassignPartitions is more complex and we can probably aim to include that
in the January release. What do you think?

Ismael

On Wed, Sep 6, 2017 at 11:42 PM, Colin McCabe <cm...@apache.org> wrote:

> On Wed, Sep 6, 2017, at 00:20, Tom Bentley wrote:
> > Hi Ted and Colin,
> >
> > Thanks for the comments.
> >
> > It seems you're both happier with reassign rather than assign, so I'm
> > happy
> > to stick with that.
> >
> >
> > On 5 September 2017 at 18:46, Colin McCabe <cm...@apache.org> wrote:
> >
> > > ...
> >
> >
> > > Do we expect that reducing the number of partitions will ever be
> > > supported by this API?  It seems like decreasing would require a
> > > different API-- one which supported data movement, had a "check status
> > > of this operation" feature, etc. etc.  If this API is only ever going
> to
> > > be used to increase the number of partitions, I think we should just
> > > call it "increasePartitionCount" to avoid confusion.
> > >
> >
> > I thought a little about the decrease possibility (hence the static
> > factory
> > methods on PartitionCount), but not really in enough detail. I suppose a
> > decrease process could look like this:
> >
> > 1. Producers start partitioning based on the decreased partition count.
> > 2. Consumers continue to consume from all partitions.
> > 3. At some point all the records in the old partitions have expired and
> > they can be deleted.
> >
> > This wouldn't work for compacted topics. Of course a more aggressive
> > strategy is also possible where we forcibly move data from the partitions
> > to be deleted.
> >
> > Anyway, in either case the process would be long running, whereas the
> > increase case is fast, so the semantics are quite different. So I agree,
> > let's rename the method to make clear that it's only for increasing the
> > partition count.
> >
> > >
> > > > Outstanding questions:
> > > >
> > > > 1. Is the proposed alterInterReplicaThrottle() API really better than
> > > > changing the throttle via alterConfigs()?
> > >
> > > That's a good point.  I would argue that we should just use
> alterConfigs
> > > to set the broker configuration, rather than having a special RPC just
> > > for this.
> > >
> >
> > Yes, I'm minded to agree.
> >
> > The reason I originally thought a special API might be better was if
> > people
> > felt that the DynamicConfig mechanism (which seems to exist only to
> > support
> > these throttles) was an implementation detail of the throttles. But I now
> > realise that they're visible via kafka-topics.sh, so they're effectively
> > already a public API.
> >
> >
> > >
> > > ...
> > > > Would it be a problem that
> > > > triggering the reassignment required ClusterAction on the Cluster,
> but
> > > > throttling the assignment required Alter on the Topic? What if a
> user had
> > > > the former permission, but not the latter?
> > >
> > > We've been trying to reserve ClusterAction on Cluster for
> > > broker-initiated operations.  Alter on Cluster is the ACL for "root
> > > stuff" and I would argue that it should be what we use here.
> > >
> > > For reconfiguring the broker, I think we should follow KIP-133 and use
> > > AlterConfigs on the Broker resource.  (Of course, if you just use the
> > > existing alterConfigs call, you get this without any special effort.)
> > >
> >
> > Yes, sorry, what I put in the email about authorisation wasn't what I put
> > in the KIP (I revised the KIP after drafting the email and then forgot to
> > update the email).
> >
> > Although KIP-133 proposes a Broker resource, I don't see one in the code
> > and KIP-133 was supposedly delivered in 0.11. Can anyone fill in the
> > story
> > here? Is it simply because the functionality to update broker configs
> > hasn't been implemented yet?
>
> Look in
> ./clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java,
> for the BROKER resource.  I bet you're looking at the Resource class
> used for ACLs, which is a different class.
>
> >
> > As currently proposed, both reassignPartitions() and
> > alterInterBrokerThrottle()
> > require Alter on the Cluster. If we used alterConfigs() to set the
> > throttles then we create a situation where the triggering of the
> > reassignment required Alter(Cluster), but the throttling required
> > Alter(Broker), and the user might have the former but not the latter. I
> > don't think this is likely to be a big deal in practice, but maybe others
> > disagree?
>
> Alter:Cluster is essentially root, though.  If you have Alter:Cluster
> and you don't have AlterConfigs:Broker, you can just create a new ACL
> giving it to yourself (creating and deleting ACLs is one of the powers
> of Alter:Cluster)
>
> cheers,
> Colin
>
> >
> >
> > > >
> > > > 2. Is reassignPartitions() really the best name? I find the
> > > > distinction between reassigning and just assigning to be of limited
> > > > value, and "reassign" is misleading when the API is now used for
> > > > changing the replication factor too. Since the API is
> > > > asynchronous/long running, it might be better to indicate that in the
> > > > name somehow. What about startPartitionAssignment()?
> > >
> > > Good idea -- I like the idea of using "start" or "initiate" to indicate
> > > that this is kicking off a long-running operation.
> > >
> > > "reassign" seemed like a natural choice to me since this is changing an
> > > > existing assignment.  It was assigned (when the topic was created)--
> > > now it's being re-assigned.  If you change it to just "assign" then it
> > > feels confusing to me.  A new user might ask if "assigning partitions"
> > > is a step that they should take after creating a topic, similar to how
> > > subscribing to topics is a step you take after creating a consumer.
> > >
> >
> > Yeah, I find this new user argument persuasive, so I'm happy to stick
> > with
> > reassign.
> >
> > Thanks again for the feedback,
> >
> > Tom
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Colin McCabe <cm...@apache.org>.
On Wed, Sep 6, 2017, at 00:20, Tom Bentley wrote:
> Hi Ted and Colin,
> 
> Thanks for the comments.
> 
> It seems you're both happier with reassign rather than assign, so I'm
> happy
> to stick with that.
> 
> 
> On 5 September 2017 at 18:46, Colin McCabe <cm...@apache.org> wrote:
> 
> > ...
> 
> 
> > Do we expect that reducing the number of partitions will ever be
> > supported by this API?  It seems like decreasing would require a
> > different API-- one which supported data movement, had a "check status
> > of this operation" feature, etc. etc.  If this API is only ever going to
> > be used to increase the number of partitions, I think we should just
> > call it "increasePartitionCount" to avoid confusion.
> >
> 
> I thought a little about the decrease possibility (hence the static
> factory
> methods on PartitionCount), but not really in enough detail. I suppose a
> decrease process could look like this:
> 
> 1. Producers start partitioning based on the decreased partition count.
> 2. Consumers continue to consume from all partitions.
> 3. At some point all the records in the old partitions have expired and
> they can be deleted.
> 
> This wouldn't work for compacted topics. Of course a more aggressive
> strategy is also possible where we forcibly move data from the partitions
> to be deleted.
> 
> Anyway, in either case the process would be long running, whereas the
> increase case is fast, so the semantics are quite different. So I agree,
> let's rename the method to make clear that it's only for increasing the
> partition count.
> 
> >
> > > Outstanding questions:
> > >
> > > 1. Is the proposed alterInterReplicaThrottle() API really better than
> > > changing the throttle via alterConfigs()?
> >
> > That's a good point.  I would argue that we should just use alterConfigs
> > to set the broker configuration, rather than having a special RPC just
> > for this.
> >
> 
> Yes, I'm minded to agree.
> 
> The reason I originally thought a special API might be better was if
> people
> felt that the DynamicConfig mechanism (which seems to exist only to
> support
> these throttles) was an implementation detail of the throttles. But I now
> realise that they're visible via kafka-topics.sh, so they're effectively
> already a public API.
> 
> 
> >
> > ...
> > > Would it be a problem that
> > > triggering the reassignment required ClusterAction on the Cluster, but
> > > throttling the assignment required Alter on the Topic? What if a user had
> > > the former permission, but not the latter?
> >
> > We've been trying to reserve ClusterAction on Cluster for
> > broker-initiated operations.  Alter on Cluster is the ACL for "root
> > stuff" and I would argue that it should be what we use here.
> >
> > For reconfiguring the broker, I think we should follow KIP-133 and use
> > AlterConfigs on the Broker resource.  (Of course, if you just use the
> > existing alterConfigs call, you get this without any special effort.)
> >
> 
> Yes, sorry, what I put in the email about authorisation wasn't what I put
> in the KIP (I revised the KIP after drafting the email and then forgot to
> update the email).
> 
> Although KIP-133 proposes a Broker resource, I don't see one in the code
> and KIP-133 was supposedly delivered in 0.11. Can anyone fill in the
> story
> here? Is it simply because the functionality to update broker configs
> hasn't been implemented yet?

Look in
./clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java,
for the BROKER resource.  I bet you're looking at the Resource class
used for ACLs, which is a different class.

> 
> As currently proposed, both reassignPartitions() and
> alterInterBrokerThrottle()
> require Alter on the Cluster. If we used alterConfigs() to set the
> throttles then we create a situation where the triggering of the
> reassignment required Alter(Cluster), but the throttling required
> Alter(Broker), and the user might have the former but not the latter. I
> don't think this is likely to be a big deal in practice, but maybe others
> disagree?

Alter:Cluster is essentially root, though.  If you have Alter:Cluster
and you don't have AlterConfigs:Broker, you can just create a new ACL
giving it to yourself (creating and deleting ACLs is one of the powers
of Alter:Cluster)

cheers,
Colin

> 
> 
> > >
> > > 2. Is reassignPartitions() really the best name? I find the distinction
> > > between reassigning and just assigning to be of limited value, and
> > > "reassign" is misleading when the API is now used for changing the
> > > replication factor too. Since the API is asynchronous/long running, it
> > > might be better to indicate that in the name somehow. What about
> > > startPartitionAssignment()?
> >
> > Good idea -- I like the idea of using "start" or "initiate" to indicate
> > that this is kicking off a long-running operation.
> >
> > "reassign" seemed like a natural choice to me since this is changing an
> > > existing assignment.  It was assigned (when the topic was created)--
> > now it's being re-assigned.  If you change it to just "assign" then it
> > feels confusing to me.  A new user might ask if "assigning partitions"
> > is a step that they should take after creating a topic, similar to how
> > subscribing to topics is a step you take after creating a consumer.
> >
> 
> Yeah, I find this new user argument persuasive, so I'm happy to stick
> with
> reassign.
> 
> Thanks again for the feedback,
> 
> Tom

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
Hi Ted and Colin,

Thanks for the comments.

It seems you're both happier with reassign rather than assign, so I'm happy
to stick with that.


On 5 September 2017 at 18:46, Colin McCabe <cm...@apache.org> wrote:

> ...


> Do we expect that reducing the number of partitions will ever be
> supported by this API?  It seems like decreasing would require a
> different API-- one which supported data movement, had a "check status
> of this operation" feature, etc. etc.  If this API is only ever going to
> be used to increase the number of partitions, I think we should just
> call it "increasePartitionCount" to avoid confusion.
>

I thought a little about the decrease possibility (hence the static factory
methods on PartitionCount), but not really in enough detail. I suppose a
decrease process could look like this:

1. Producers start partitioning based on the decreased partition count.
2. Consumers continue to consume from all partitions.
3. At some point all the records in the old partitions have expired and
they can be deleted.

This wouldn't work for compacted topics. Of course a more aggressive
strategy is also possible where we forcibly move data from the partitions
to be deleted.
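
To make step 1 concrete, here is a minimal sketch of key-based
partitioning before and after a decrease. It assumes a plain
hash-modulo partitioner (the real default partitioner uses a different
hash function), and the class and method names are hypothetical:

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class DecreaseSketch {
    // Hypothetical stand-in for a key-based partitioner: hash modulo partition count.
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the result is never negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // The set of partitions that would receive new records for the given keys.
    static Set<Integer> partitionsWritten(Iterable<String> keys, int numPartitions) {
        Set<Integer> used = new TreeSet<>();
        for (String key : keys) {
            used.add(partitionFor(key, numPartitions));
        }
        return used;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "c", "d", "e", "f", "g", "h");
        // Before the decrease, writes may land on any of the 6 partitions.
        System.out.println("6 partitions: " + partitionsWritten(keys, 6));
        // After producers switch to 4, partitions 4 and 5 receive no new records.
        System.out.println("4 partitions: " + partitionsWritten(keys, 4));
    }
}
```

Once producers use the smaller count, the retired partitions only ever
shrink as records expire, but a key may now map to a different
partition than before, which is why consumers in step 2 have to keep
reading all of them.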

Anyway, in either case the process would be long running, whereas the
increase case is fast, so the semantics are quite different. So I agree,
let's rename the method to make clear that it's only for increasing the
partition count.

>
> > Outstanding questions:
> >
> > 1. Is the proposed alterInterReplicaThrottle() API really better than
> > changing the throttle via alterConfigs()?
>
> That's a good point.  I would argue that we should just use alterConfigs
> to set the broker configuration, rather than having a special RPC just
> for this.
>

Yes, I'm minded to agree.

The reason I originally thought a special API might be better was if people
felt that the DynamicConfig mechanism (which seems to exist only to support
these throttles) was an implementation detail of the throttles. But I now
realise that they're visible via kafka-topics.sh, so they're effectively
already a public API.


>
> ...
> > Would it be a problem that
> > triggering the reassignment required ClusterAction on the Cluster, but
> > throttling the assignment required Alter on the Topic? What if a user had
> > the former permission, but not the latter?
>
> We've been trying to reserve ClusterAction on Cluster for
> broker-initiated operations.  Alter on Cluster is the ACL for "root
> stuff" and I would argue that it should be what we use here.
>
> For reconfiguring the broker, I think we should follow KIP-133 and use
> AlterConfigs on the Broker resource.  (Of course, if you just use the
> existing alterConfigs call, you get this without any special effort.)
>

Yes, sorry, what I put in the email about authorisation wasn't what I put
in the KIP (I revised the KIP after drafting the email and then forgot to
update the email).

Although KIP-133 proposes a Broker resource, I don't see one in the code
and KIP-133 was supposedly delivered in 0.11. Can anyone fill in the story
here? Is it simply because the functionality to update broker configs
hasn't been implemented yet?

As currently proposed, both reassignPartitions() and alterInterBrokerThrottle()
require Alter on the Cluster. If we used alterConfigs() to set the
throttles then we create a situation where the triggering of the
reassignment required Alter(Cluster), but the throttling required
Alter(Broker), and the user might have the former but not the latter. I
don't think this is likely to be a big deal in practice, but maybe others
disagree?


> >
> > 2. Is reassignPartitions() really the best name? I find the distinction
> > between reassigning and just assigning to be of limited value, and
> > "reassign" is misleading when the API is now used for changing the
> > replication factor too. Since the API is asynchronous/long running, it
> > might be better to indicate that in the name somehow. What about
> > startPartitionAssignment()?
>
> Good idea -- I like the idea of using "start" or "initiate" to indicate
> that this is kicking off a long-running operation.
>
> "reassign" seemed like a natural choice to me since this is changing an
> > existing assignment.  It was assigned (when the topic was created)--
> now it's being re-assigned.  If you change it to just "assign" then it
> feels confusing to me.  A new user might ask if "assigning partitions"
> is a step that they should take after creating a topic, similar to how
> subscribing to topics is a step you take after creating a consumer.
>

Yeah, I find this new user argument persuasive, so I'm happy to stick with
reassign.

Thanks again for the feedback,

Tom

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Colin McCabe <cm...@apache.org>.
On Tue, Sep 5, 2017, at 09:39, Tom Bentley wrote:
> I've revised this KIP again:
> 
> * Change the alterPartitionCounts() API to support passing an optional
> assignment for the new partitions (which is already supported by
> kafka-topics.sh). At the same time I didn't want the API to suggest it
> was
> possible to change the existing assignments in the same call (which isn't
> supported today).
> 
> * Removed alterReplicationFactor(), since it's really just a special case
> of reassignPartitions(): In both cases it is necessary to give a complete
> partition assignment and both can be long running due to data movement.

Thanks, Tom.

Do we expect that reducing the number of partitions will ever be
supported by this API?  It seems like decreasing would require a
different API-- one which supported data movement, had a "check status
of this operation" feature, etc. etc.  If this API is only ever going to
be used to increase the number of partitions, I think we should just
call it "increasePartitionCount" to avoid confusion.

>
> Outstanding questions:
> 
> 1. Is the proposed alterInterReplicaThrottle() API really better than
> changing the throttle via alterConfigs()?

That's a good point.  I would argue that we should just use alterConfigs
to set the broker configuration, rather than having a special RPC just
for this.  

> It wouldn't be necessary to
> support alterConfigs() for all broker configs, just DynamicConfigs (which
> can't be set via the config file anyway).

Hmm.  For this purpose, it's not even necessary to support alterConfigs
for all dynamic configurations.  You could just support alterConfigs on
this particular dynamic configuration key.  It might be better to be
conservative here, since exposing all of our internals could lead to
compatibility problems later on.
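
As a rough sketch of what "conservative" could look like, the handler
might check requested keys against a small allow-list before applying
them. The two key names below are the existing broker-level throttle
rates; the allow-list class itself, and the idea of validating this
way, are assumptions for illustration rather than anything in the KIP:

```java
import java.util.Map;
import java.util.Set;

public class ThrottleConfigFilter {
    // Hypothetical allow-list: only the replication throttle rates may be altered.
    static final Set<String> ALTERABLE_KEYS = Set.of(
            "leader.replication.throttled.rate",
            "follower.replication.throttled.rate");

    // Rejects the whole request if any key falls outside the allow-list.
    static void validate(Map<String, String> requested) {
        for (String key : requested.keySet()) {
            if (!ALTERABLE_KEYS.contains(key)) {
                throw new IllegalArgumentException(
                        "Config not alterable via this API: " + key);
            }
        }
    }

    public static void main(String[] args) {
        // Accepted: a throttle rate in bytes per second.
        validate(Map.of("leader.replication.throttled.rate", "10000000"));
        try {
            // Rejected: not one of the throttle keys.
            validate(Map.of("log.retention.ms", "1000"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Keys could be added to the list later without breaking anyone, whereas
removing a key that was exposed by default would be a compatibility
break.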

> Would it be a problem that
> triggering the reassignment required ClusterAction on the Cluster, but
> throttling the assignment required Alter on the Topic? What if a user had
> the former permission, but not the latter?

We've been trying to reserve ClusterAction on Cluster for
broker-initiated operations.  Alter on Cluster is the ACL for "root
stuff" and I would argue that it should be what we use here.

For reconfiguring the broker, I think we should follow KIP-133 and use
AlterConfigs on the Broker resource.  (Of course, if you just use the
existing alterConfigs call, you get this without any special effort.)

> 
> 2. Is reassignPartitions() really the best name? I find the distinction
> between reassigning and just assigning to be of limited value, and
> "reassign" is misleading when the API is now used for changing the
> replication factor too. Since the API is asynchronous/long running, it
> might be better to indicate that in the name somehow. What about
> startPartitionAssignment()?

Good idea -- I like the idea of using "start" or "initiate" to indicate
that this is kicking off a long-running operation.

"reassign" seemed like a natural choice to me since this is changing an
existing assignment.  It was assigned (when the topic was created)--
now it's being re-assigned.  If you change it to just "assign" then it
feels confusing to me.  A new user might ask if "assigning partitions"
is a step that they should take after creating a topic, similar to how
subscribing to topics is a step you take after creating a consumer.

Basically, "reassign" makes it clear that it is changing an assignment
that already exists.  "assign" leaves open the possibility that no
assignment exists (which of course we know is not true, but it would be
nice if the name reflected that.)

> On 30 August 2017 at 16:17, Tom Bentley <t....@gmail.com> wrote:
> 
> > Hi all,
> >
> > I've updated the KIP as follows:
> >
> > * remove the APIs supporting progress reporting in favour of the APIs
> > being implemented in KIP-113.
> > * added some APIs to cover the existing functionality around throttling
> > inter-broker transfers, which was previously a TODO.
> >
> > To respond to Colin's suggestion:
> >
> > > TopicAlterationResult alterTopics(TopicAlterationOptions options,
> > >                                   Map<String, TopicAlteration> alters)
> > >
> > > PartitionCountAlteration(int numPartitions) implements TopicAlteration
> > >
> > > ReplicationFactorAlteration(int repl) implements TopicAlteration
> > >
> > > ReassignPartitionsAlteration(...) implements TopicAlteration
> >
> > That is a workable alternative to providing 3 separate methods on the
> > AdminClient, but I don't see why it is objectively better. I don't see
> > clients commonly needing to do a mixture of alterations, and it assumes
> > that the options make sense for all alterations.

Yeah-- it was just an idea I had for an alternate API that grouped
things a little bit more.  I agree that it does constrain us to using
only options that make sense for all the operations, which is not ideal.
So your existing split into separate functions probably makes more
sense.
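
For reference, a minimal sketch of the grouped shape, to show where the
constraint comes from. The interface and class names follow the
suggestion quoted above; the timeoutMs option and the Map return type
(standing in for TopicAlterationResult) are hypothetical
simplifications:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AlterTopicsSketch {
    // Common supertype for every kind of alteration.
    interface TopicAlteration {
        String describe();
    }

    static final class PartitionCountAlteration implements TopicAlteration {
        final int numPartitions;
        PartitionCountAlteration(int numPartitions) { this.numPartitions = numPartitions; }
        public String describe() { return "partitions=" + numPartitions; }
    }

    static final class ReplicationFactorAlteration implements TopicAlteration {
        final int replicationFactor;
        ReplicationFactorAlteration(int repl) { this.replicationFactor = repl; }
        public String describe() { return "replicationFactor=" + replicationFactor; }
    }

    // A single options object covers the whole batch, whatever each entry does.
    static final class TopicAlterationOptions {
        final int timeoutMs; // hypothetical option
        TopicAlterationOptions(int timeoutMs) { this.timeoutMs = timeoutMs; }
    }

    // Applies the same options to every alteration, keyed by topic name.
    static Map<String, String> alterTopics(TopicAlterationOptions options,
                                           Map<String, TopicAlteration> alters) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, TopicAlteration> e : alters.entrySet()) {
            result.put(e.getKey(),
                    e.getValue().describe() + " (timeoutMs=" + options.timeoutMs + ")");
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, TopicAlteration> alters = new LinkedHashMap<>();
        alters.put("orders", new PartitionCountAlteration(12));
        alters.put("payments", new ReplicationFactorAlteration(3));
        System.out.println(alterTopics(new TopicAlterationOptions(5000), alters));
    }
}
```

An option that only made sense for one alteration type (say, something
specific to a long-running reassignment) would have nowhere natural to
live in TopicAlterationOptions.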

cheers,
Colin

> >
> > Cheers,
> >
> > Tom
> >
> > On 22 August 2017 at 23:49, Colin McCabe <cm...@apache.org> wrote:
> >
> >> On Wed, Aug 9, 2017, at 07:17, Tom Bentley wrote:
> >> > Hi Dong and Jun,
> >> >
> >> > Thanks again for your input in this discussion and on KIP-113. It's
> >> > difficult that discussion is split between this thread and the one for
> >> > KIP-113, but I'll try to respond on this thread to questions asked on
> >> > this
> >> > thread.
> >> >
> >> > It seems there is some consensus that the alterTopic() API is the wrong
> >> > thing, and it would make more sense to separate the different kinds of
> >> > alteration into separate APIs. It seems to me there is then a choice.
> >> >
> >> > 1. Have separate alterPartitionCount(), alterReplicationFactor() and
> >> > reassignPartitions() methods. This would most closely match the
> >> > facilities
> >> > currently offered by kafka-alter-topics and kafka-reassign-partitions.
> >> > 2. Just provide a reassignPartitions() method and infer from the shape
> >> of
> >> > the data passed to that that a change in replication factor and/or
> >> > partition count is required, as suggested by Dong.
> >> >
> >> > The choice we make here is also relevant to KIP-113 and KIP-178. By
> >> > choosing (1) we can change the replication factor or partition count
> >> > without providing an assignment and therefore are necessarily requiring
> >> > the
> >> > controller to make a decision for us about which broker (and, for 113,
> >> > which log directory and thus which disk) the replica should reside.
> >> > There is then the matter of what criteria the controller should use to
> >> > make
> >> > that decision (a decision is also required on topic creation of course,
> >> > but
> >> > I'll try not to go there right now).
> >>
> >> Hmm.  One approach would be to have something like
> >>
> >> > TopicAlterationResult alterTopics(TopicAlterationOptions options,
> >> >                                   Map<String, TopicAlteration> alters)
> >> >
> >> > PartitionCountAlteration(int numPartitions) implements TopicAlteration
> >> >
> >> > ReplicationFactorAlteration(int repl) implements TopicAlteration
> >> >
> >> > ReassignPartitionsAlteration(...) implements TopicAlteration
> >>
> >>
> >> >
> >> > Choosing (2), on the other hand, forces us to make an assignment, and
> >> > currently the AdminClient doesn't provide the APIs necessary to make a
> >> > very
> >> > informed decision. To do the job properly we'd need APIs to enumerate
> >> the
> >> > permissible log directories on each broker, know the current disk usage
> >> > etc. These just don't exist today, and I think it would be a lot of work
> >> > to
> >> > bite off to specify them. We've already got three KIPs on the go.
> >> >
> >> > So, for the moment, I think we have to choose 1, for pragmatic reasons.
> >> > There is nothing to stop us deprecating alterPartitionCount() and
> >> > alterReplicationFactor() and moving to (2) at a later date.
> >>
> >> Yeah, the API should be pragmatic.  An API that doesn't let us do what
> >> we want to do (like let Kafka assign us a new replica on the least
> >> loaded node) is not good.
> >>
> >> >
> >> > To answer a specific question of Jun's:
> >> >
> >> > 3. It's not very clear to me what status_time in ReplicaStatusResponse
> >> > > means.
> >> >
> >> >
> >> > It corresponds to the ReplicaStatus.statusTime, and the idea was it was
> >> > the
> >> > epoch offset at which the controller's notion of the lag was current
> >> > (which
> >> > I think is the epoch offset of the last FetchRequest from the follower).
> >> > At
> >> > one point I was considering some other ways of determining progress and
> >> > completion, and TBH I think it was more pertinent to those rejected
> >> > alternatives.
> >> >
> >> > I've already made some changes to KIP-179. As suggested in the thread
> >> for
> >> > KIP-113, I will be changing some more since I think we can use
> >> > DescribeDirsRequest instead of ReplicaStatusResponse to implement the
> >> > --progress option.
> >>
> >> That makes sense.  Whatever response is used, it would be nice to have
> >> an error message in addition to an error code.
> >>
> >> best,
> >> Colin
> >>
> >> >
> >> > Cheers,
> >> >
> >> > Tom
> >> >
> >> >
> >> >
> >> > On 9 August 2017 at 02:28, Jun Rao <ju...@confluent.io> wrote:
> >> >
> >> > > Hi, Tom,
> >> > >
> >> > > Thanks for the KIP. A few minor comments below.
> >> > >
> >> > > 1. Implementation wise, the broker handles adding partitions
> >> differently
> >> > > from changing replica assignment. For the former, we directly update
> >> the
> >> > > topic path in ZK with the new partitions. For the latter, we write
> >> the new
> >> > > partition reassignment in the partition reassignment path. Changing
> >> the
> >> > > replication factor is handled in the same way as changing replica
> >> > > assignment. So, it would be useful to document how the broker handles
> >> these
> >> > > different cases accordingly. I think it's simpler to just allow one
> >> change
> >> > > (partition, replication factor, or replica assignment) in a request.
> >> > >
> >> > > 2. Currently, we only allow adding partitions. We probably want to
> >> document
> >> > > the restriction in the api.
> >> > >
> >> > > 3. It's not very clear to me what status_time in ReplicaStatusResponse
> >> > > means.
> >> > >
> >> > > Jun
> >> > >
> >> > >
> >> > >
> >> > > On Fri, Aug 4, 2017 at 10:04 AM, Dong Lin <li...@gmail.com>
> >> wrote:
> >> > >
> >> > > > Hey Tom,
> >> > > >
> >> > > > Thanks for your reply. Here are my thoughts:
> >> > > >
> >> > > > 1) I think the DescribeDirsResponse can be used by AdminClient to
> >> query
> >> > > the
> >> > > > lag of follower replica as well. Here is how it works:
> >> > > >
> >> > > > - AdminClient sends DescribeDirsRequest to both the leader and the
> >> > > follower
> >> > > > of the partition.
> >> > > > - DescribeDirsResponse from both leader and follower shows the LEO
> >> of the
> >> > > > leader replica and the follower replica.
> >> > > > - Lag of the follower replica is the difference in the LEO between
> >> leader
> >> > > > and follower.
> >> > > >
> >> > > > In comparison to ReplicaStatusRequest, DescribeDirsRequest needs to
> >> > > > be sent to each replica of the partition whereas ReplicaStatusRequest
> >> > > > only needs to be sent to the leader of the partition. It doesn't seem
> >> > > > to make much difference though. In practice we probably want to query
> >> > > > the replica lag of many partitions and AdminClient needs to send
> >> > > > exactly one request to each broker with either solution. Does this
> >> > > > make sense?
> >> > > >
> >> > > >
> >> > > > 2) KIP-179 proposes to add the following AdminClient API:
> >> > > >
> >> > > > alterTopics(Collection<AlteredTopic> alteredTopics,
> >> AlterTopicsOptions
> >> > > > options)
> >> > > >
> >> > > > Where AlteredTopic includes the following fields
> >> > > >
> >> > > > AlteredTopic(String name, int numPartitions, int replicationFactor,
> >> > > > Map<Integer,List<Integer>> replicasAssignment)
> >> > > >
> >> > > > I have two comments on this:
> >> > > >
> >> > > > - The information in AlteredTopic seems a bit redundant. Both
> >> > > numPartitions
> >> > > > and replicationFactor can be derived from replicasAssignment. I
> >> think we
> >> > > > can probably just use the following API in AdminClient instead:
> >> > > >
> >> > > > AlterTopicsResult alterTopics(Map<TopicPartition, List<Integer>>
> >> > > > partitionAssignment, AlterTopicsOptions options)
> >> > > >
> >> > > > - Do you think "reassignPartitions" may be a better name than
> >> > > > "alterTopics"? This is more consistent with the existing name used
> >> in
> >> > > > kafka-reassign-partitions.sh and I also find it to be more
> >> accurate. On
> >> > > the
> >> > > > other hand, alterTopics seems to suggest that we can also alter
> >> topic
> >> > > > config.
> >> > > >
> >> > > >
> >> > > >
> >> > > > Thanks,
> >> > > > Dong
> >> > > >
> >> > > > On Fri, Aug 4, 2017 at 1:03 AM, Tom Bentley <t....@gmail.com>
> >> > > wrote:
> >> > > >
> >> > > > > Hi Dong,
> >> > > > >
> >> > > > > Thanks for your reply.
> >> > > > >
> >> > > > > You're right that your DescribeDirsResponse contains appropriate
> >> data.
> >> > > > The
> >> > > > > comment about the log_end_offset in the KIP says "Enable user to
> >> track
> >> > > > > movement progress by comparing LEO of the *.log and *.move". That
> >> makes
> >> > > > me
> >> > > > > wonder whether this would only work for replica movement on the
> >> same
> >> > > > > broker. In the case of reassigning partitions between brokers
> >> it's only
> >> > > > > really the leader for the partition that knows the max LEO of the
> >> ISR
> >> > > and
> >> > > > > the LEO of the target broker. Maybe the comment is misleading and
> >> it
> >> > > > would
> >> > > > > be the leader doing the same thing in both cases. Can you clarify
> >> this?
> >> > > >
> >> > > >
> >> > > > > At a conceptual level there's a lot of similarity between KIP-113
> >> and
> >> > > > > KIP-179 because they're both about moving replicas around.
> >> Concretely
> >> > > > > though, ChangeReplicaDirRequest assumes the movement is on the
> >> same
> >> > > > broker,
> >> > > > > while the equivalent APIs in KIP-179 (whichever alternative) lack
> >> the
> >> > > > > notion of disks. I wonder if a unified API would be better or not.
> >> > > > > Operationally, the reasons for changing replicas between brokers
> >> are
> >> > > > likely
> >> > > > > to be motivated by balancing load across the cluster, or
> >> > > adding/removing
> >> > > > > brokers. But the JBOD use case has different motivations. So I
> >> suspect
> >> > > > it's
> >> > > > > OK that the APIs are separate, but I'd love to hear what others
> >> think.
> >> > > >
> >> > > >
> >> > > > > I think you're right about TopicPartitionReplica. At the protocol
> >> level
> >> > > > we
> >> > > > > could group topics and partitions together so avoid having the
> >> same
> >> > > topic
> >> > > > > name multiple times when querying for the status of all the
> >> partitions
> >> > > > of a
> >> > > > > topic.
> >> > > > >
> >> > > > > Thanks again for taking the time to respond.
> >> > > > >
> >> > > > > Tom
> >> > > > >
> >> > > > > On 3 August 2017 at 23:07, Dong Lin <li...@gmail.com> wrote:
> >> > > > >
> >> > > > > > Hey Tom,
> >> > > > > >
> >> > > > > > Thanks for the KIP. It seems that the prior discussion in this
> >> thread
> >> > > > has
> >> > > > > > focused on reassigning partitions (or AlterTopics). I haven't
> >> looked
> >> > > > into
> >> > > > > > this yet. I have two comments on the replicaStatus() API and the
> >> > > > > > ReplicaStatusRequest:
> >> > > > > >
> >> > > > > > -  It seems that the use-case for ReplicaStatusRequest is covered by
> >> > > > > > the DescribeDirsRequest introduced in KIP-113
> >> > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>.
> >> > > > > > DescribeDirsResponse contains the logEndOffset for the specified
> >> > > > > > partitions. The lag of the follower replica can be derived as the
> >> > > > > > difference between leader's LEO and follower's LEO. Do you think we
> >> > > > > > can simply use DescribeDirsRequest without introducing
> >> > > > > > ReplicaStatusRequest?
> >> > > > > >
> >> > > > > > - According to the current KIP, replicaStatus() takes a list of
> >> > > > > partitions
> >> > > > > > as input. And its output maps the partition to a list of replica
> >> > > > status.
> >> > > > > Do
> >> > > > > > you think it would be better to have a new class called
> >> > > > > > TopicPartitionReplica, which identifies the topic, partition
> >> and the
> >> > > > > > brokerId. replicaStatus can then take a list of
> >> TopicPartitionReplica
> >> > > > as
> >> > > > > > input. And its output maps the replica to replica status. The
> >> latter
> >> > > > API
> >> > > > > > seems simpler and also matches the method name better. What do
> >> you
> >> > > > think?
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Dong
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > On Wed, Aug 2, 2017 at 5:12 AM, Tom Bentley <
> >> t.j.bentley@gmail.com>
> >> > > > > wrote:
> >> > > > > >
> >> > > > > > > Hi again Ismael,
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > 1. It's worth emphasising that reassigning partitions is a
> >> > > different
> >> > > > > > > >> process than what happens when a topic is created, so not
> >> sure
> >> > > > > trying
> >> > > > > > to
> >> > > > > > > >> make it symmetric is beneficial. In addition to what was
> >> already
> >> > > > > > > >> discussed,
> >> > > > > > > >> one should also enable replication throttling before
> >> moving the
> >> > > > > data.
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > Thanks for your responses. The only advantage I can see from
> >> > > having
> >> > > > > > > > separate APIs for partition count change vs. the other
> >> changes is
> >> > > > > that
> >> > > > > > we
> >> > > > > > > > expect the former to return synchronously, and the latter to
> >> > > > > (usually)
> >> > > > > > > > return asynchronously. Lumping them into the same API forces
> >> > > > clients
> >> > > > > of
> >> > > > > > > > that API to code for the two possible cases. Is this the
> >> > > particular
> >> > > > > > > concern
> >> > > > > > > > you had in mind, or do you have others?
> >> > > > > > > >
> >> > > > > > >
> >> > > > > > > Just to help see what this looks like I've created another version of
> >> > > > > > > KIP-179 (
> >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Copy+of+KIP-179+-+Change+ReassignPartitionsCommand+to+use+AdminClient
> >> > > > > > > ).
> >> > > > > > >
> >> > > > > > > The AdminClient APIs are more type safe (harder to misuse). The
> >> > > > > > > asynchronous nature of the alterReplicationFactors() and
> >> > > > > > > reassignPartitions() methods isn't really apparent (you'd still have
> >> > > > > > > to read the javadocs to know you had to call replicaStatus() to
> >> > > > > > > determine completion), but it's still better than the lumped-together
> >> > > > > > > API because the methods are _consistently_ async. The Protocol APIs
> >> > > > > > > are slightly nicer too, in that INVALID_REQUEST is less commonly
> >> > > > > > > returned. Another benefit is it would allow for these different
> >> > > > > > > alteration APIs to be modified independently in the future, should
> >> > > > > > > that be necessary.
> >> > > > > > >
> >> > > > > > > What do others think?
> >> > > > > > >
> >> > > > > > > You're right that the proposal doesn't cover setting and
> >> clearing a
> >> > > > > > > > throttle, and it should. I will study the code about how
> >> this
> >> > > works
> >> > > > > > > before
> >> > > > > > > > posting back about that.
> >> > > > > > > >
> >> > > > > > >
> >> > > > > > > AFAICS from the code the throttle is simply a dynamic config
> >> of the
> >> > > > > > broker.
> >> > > > > > > We already have the AdminClient.alterConfigs API for setting
> >> this,
> >> > > so
> >> > > > > the
> >> > > > > > > refactoring of kafka-reassign-partitions.sh could just use
> >> that
> >> > > > > existing
> >> > > > > > > API. We could still achieve your objective of setting the
> >> throttle
> >> > > > > before
> >> > > > > > > reassignment by making the --throttle option mandatory when
> >> > > --execute
> >> > > > > was
> >> > > > > > > present. Or did you have something else in mind?
> >> > > > > > >
> >> > > > > > > Thanks,
> >> > > > > > >
> >> > > > > > > Tom
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >>
> >
> >
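
For anyone following the DescribeDirs discussion quoted above, the lag
calculation Dong describes (leader LEO minus follower LEO) can be
sketched as below. This is only an illustration: the map of broker id
to log end offset stands in for values a client would collect from each
broker's DescribeDirsResponse, the class and method names are
hypothetical, and clamping at zero is an assumption added here because
the per-broker responses are not taken at the same instant:

```java
import java.util.HashMap;
import java.util.Map;

public class ReplicaLagSketch {
    // Returns lag per follower broker id for a single partition:
    // leader LEO minus follower LEO, never below zero.
    static Map<Integer, Long> followerLag(int leaderId, Map<Integer, Long> logEndOffsets) {
        Long leaderLeo = logEndOffsets.get(leaderId);
        if (leaderLeo == null) {
            throw new IllegalArgumentException("No LEO reported for leader " + leaderId);
        }
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> e : logEndOffsets.entrySet()) {
            if (e.getKey() != leaderId) {
                lag.put(e.getKey(), Math.max(0L, leaderLeo - e.getValue()));
            }
        }
        return lag;
    }

    public static void main(String[] args) {
        // Broker 1 is the leader; brokers 2 and 3 are followers.
        Map<Integer, Long> leos = Map.of(1, 100L, 2, 90L, 3, 100L);
        System.out.println(followerLag(1, leos));
    }
}
```

A reassignment could be treated as complete for a partition once every
target replica reports zero lag, though that completion rule is an
assumption here rather than something the thread settles.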