Posted to dev@kafka.apache.org by Tom Bentley <t....@gmail.com> on 2017/08/01 09:19:22 UTC

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

I have added the timeout I mentioned before, because it makes the
implementation of topic alteration more symmetric with the topic creation
APIs.

I have also added a section ("Policy") on retrofitting the
CreateTopicPolicy's rules to topic alteration requests, and made a few
other minor fixes.

I've still not factored out the topic name from the ReplicaStatusRequest
(Ismael, do you have an opinion about that?)

If anyone else has feedback on this KIP, that would be great.
Otherwise, I would like to start a vote on this KIP before the end of the
week.

Thanks,

Tom

On 26 July 2017 at 11:45, Tom Bentley <t....@gmail.com> wrote:

> I've updated the KIP to fix those niggles, but I've not factored out the
> topic name from the ReplicaStatusRequest, yet.
>
> Looking at the topic creation APIs in more detail, the CreateTopicsOptions
> has
>
> * `shouldValidateOnly()`, which would make a lot of sense for the alter
> topic APIs
> * `timeoutMs()`, which I'm not sure about...
>
> Topic creation doesn't require shifting replicas between brokers so it's
> reasonable to support timeout, because we don't expect it to take very long.
>
> Topic alteration usually takes a while because we are going to have to
> move replicas. Since we're adding a whole API to track the progress of that
> replication, I'm inclined to think that having a timeout is a bit pointless.
>
> But should the replicaStatus() API have a timeout? I suppose it probably
> should.
>
>
> On 26 July 2017 at 10:58, Tom Bentley <t....@gmail.com> wrote:
>
>> Thanks Paolo,
>>
>>   *   in the "Public Interfaces" section you wrote
>>> alterTopics(Set<AlterTopics>) but then a collection is used (instead of a
>>> set) in the Proposed Changes section. I'm ok with collection.
>>>
>>
>> Agree it should be Collection.
>>
>>
>>>   *   in the summary of the alterTopics method you say "The request can
>>> change the number of partitions, replication factor and/or the partition
>>> assignments." I think that the "and/or" is misleading (at least for me).
>>> For the TopicCommand tool you can specify partitions AND replication factor
>>> ... OR partition assignments (but not for example partitions AND
>>> replication factor AND partition assignments). Maybe it could be "The
>>> request can change the number of partitions and the related replication
>>> factor or specifying a partition assignments."
>>>
>>
>> Is there a reason why we can't support changing all three at once? It
>> certainly makes conceptual sense to, say, increase the number of partitions
>> and replication factor, and specify how you want the partitions assigned.
>> And doing two separate calls would be less efficient as we sync new
>> replicas on brokers only to then move them somewhere else.
>>
>> If there is a reason we don't want to support changing all three, then we
>> can return the error INVALID_REQUEST (42). That would allow us to support
>> changing all three at some time in the future, without having to change the
>> API.
>>
>>
>>>   *   I know that it would be a breaking change in the Admin Client API
>>> but why have an AlteredTopic class which is quite similar to the already
>>> existing NewTopic class ? I know that using NewTopic for the alter method
>>> could be misleading. What about renaming NewTopic to something like
>>> AdminTopic ? At the same time it means that the TopicDetails class (which you
>>> can get from the current NewTopic) should be outside the
>>> CreateTopicsRequest because it could be used in the AlterTopicsRequest as
>>> well.
>>>
>>
>> One problem with this is it tends to inhibit future API changes for
>> either createTopics() or alterTopics(), because any common class needs to make
>> sense in both contexts.
>>
>> For createTopics() we get to specify some configs (the
>> Map<String,String>), but since the AdminClient already has alterConfigs()
>> for changing topic configs after topic creation I don't think it's right to
>> also support changing those configs via alterTopics() as well. But having
>> them in a common AdminTopic class would suggest that that was supported.
>> Yes, alterTopics could return INVALID_REQUEST if it was given topic
>> configs, but this just makes the API harder to use, since it weakens
>> the type safety of the API.
>>
>> I suppose we *could* write a common TopicDetails class and make the
>> existing nested one extend the common one, with deprecations, to eventually
>> remove the nested one.
>>
>>
>>
>>>   *   A typo in the ReplicaStatus : gpartition() instead of partition()
>>>   *   In the AlterTopicsRequest
>>>      *   the replication factor should be INT16
>>>
>>
>> Ah, thanks!
>>
>>
>>>      *   I would use same fields name as CreateTopicsRequest (they are
>>> quite similar)
>>>   *   What's the broker id in the ReplicaStatusRequest ?
>>>
>>
>> It's the broker, which is expected to have a replica of the given
>> partition, that we're querying the status of. It is necessary because we're
>> asking the _leader_ for the partition about (a subset of) the status of the
>> followers. Put another way, to identify the replica of a particular
>> partition on a particular broker we need the tuple (topic, partition,
>> broker).
>>
>> If we were always interested in the status of the partition across all
>> brokers we could omit the broker part. But for reassignment we actually
>> only care about a subset of the brokers.
>>
>>
>>>   *   Thinking aloud, it could make sense to have "Partition" in the
>>> ReplicaStatusRequest as an array so that I can specify in only one request
>>> the status for partitions I'm interested in, in order to avoid a request
>>> for each partition for the same topic. Maybe empty array could mean ..
>>> "give me status for all partitions of this topic". Of course it means that
>>> the ReplicaStatusResponse should change because we should have an array
>>> with partition, broker, lag and so on
>>>
>>
>> You already can specify in one request the status for all the partitions
>> you're interested in (ReplicaStatus can be repeated/is an array field).
>>
>> We could factor out the topic to avoid repeating it, which would be more
>> efficient when we're querying the status of many partitions of a topic
>> and/or there are many brokers holding replicas. In other words, we could
>> factor it to look like this:
>>
>> ReplicaStatusRequest => [TopicReplicas]
>>   TopicReplicas => Topic [PartitionReplica]
>>     Topic => string
>>     PartitionReplica => Partition Broker
>>       Partition => int32
>>       Broker => int32
>>
>> Does this make sense to you?
>>
>>
>
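
The factored ReplicaStatusRequest layout quoted above can be illustrated with
a small client-side grouping step. This is only a sketch under assumptions:
the class and method names (ReplicaStatusGrouping, Replica, groupByTopic) are
hypothetical and are not part of Kafka or of the KIP. It shows how a flat list
of (topic, partition, broker) tuples collapses so that each topic name
appears once.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper, not a Kafka class: groups replica identifiers by topic. */
final class ReplicaStatusGrouping {

    /** One replica, identified by the tuple (topic, partition, broker). */
    static final class Replica {
        final String topic;
        final int partition;
        final int broker;

        Replica(String topic, int partition, int broker) {
            this.topic = topic;
            this.partition = partition;
            this.broker = broker;
        }
    }

    /**
     * Mirrors "TopicReplicas => Topic [PartitionReplica]": each topic maps to
     * its (partition, broker) pairs, each pair encoded as an int[2].
     */
    static Map<String, List<int[]>> groupByTopic(List<Replica> replicas) {
        Map<String, List<int[]>> byTopic = new LinkedHashMap<>();
        for (Replica r : replicas) {
            byTopic.computeIfAbsent(r.topic, t -> new ArrayList<>())
                   .add(new int[] {r.partition, r.broker});
        }
        return byTopic;
    }
}
```

With many partitions per topic, the topic string is then serialized once per
topic rather than once per replica, which is the saving discussed above.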

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Ted Yu <yu...@gmail.com>.
bq. What about startPartitionAssignment() ?

Makes sense.
startPartitionReassignment() seems to be better since the API deals with
reassignment.

Cheers

On Tue, Sep 5, 2017 at 9:39 AM, Tom Bentley <t....@gmail.com> wrote:

> I've revised this KIP again:
>
> * Change the alterPartitionCounts() API to support passing an optional
> assignment for the new partitions (which is already supported by
> kafka-topics.sh). At the same time I didn't want the API to suggest it was
> possible to change the existing assignments in the same call (which isn't
> supported today).
>
> * Removed alterReplicationFactor(), since it's really just a special case
> of reassignPartitions(): In both cases it is necessary to give a complete
> partition assignment and both can be long running due to data movement.
>
>
> Outstanding questions:
>
> 1. Is the proposed alterInterReplicaThrottle() API really better than
> changing the throttle via alterConfigs()? It wouldn't be necessary to
> support alterConfigs() for all broker configs, just DynamicConfigs (which
> can't be set via the config file anyway). Would it be a problem that
> triggering the reassignment required ClusterAction on the Cluster, but
> throttling the assignment required Alter on the Topic? What if a user had
> the former permission, but not the latter?
>
> 2. Is reassignPartitions() really the best name? I find the distinction
> between reassigning and just assigning to be of limited value, and
> "reassign" is misleading when the API is now used for changing the
> replication factor too. Since the API is asynchronous/long running, it
> might be better to indicate that in the name somehow. What about
> startPartitionAssignment()?
>
> I am, of course, interested in any other questions or comments people have.
>
>
>
>
> On 30 August 2017 at 16:17, Tom Bentley <t....@gmail.com> wrote:
>
> > Hi all,
> >
> > I've updated the KIP as follows:
> >
> > * remove the APIs supporting progress reporting in favour of the APIs
> > being implemented in KIP-113.
> > * added some APIs to cover the existing functionality around throttling
> > inter-broker transfers, which was previously a TODO.
> >
> > To respond to Colin's suggestion:
> >
> > > TopicAlterationResult alterTopics(TopicAlterationOptions options,
> > >                                   Map<String, TopicAlteration> alters)
> > >
> > > PartitionCountAlteration(int numPartitions) implements TopicAlteration
> > >
> > > ReplicationFactorAlteration(int repl) implements TopicAlteration
> > >
> > > ReassignPartitionsAlteration(...) implements TopicAlteration
> >
> > That is a workable alternative to providing 3 separate methods on the
> > AdminClient, but I don't see why it is objectively better. I don't see
> > clients commonly needing to do a mixture of alterations, and it assumes
> > that the options make sense for all alterations.
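
For readers weighing the two shapes, the single-method alternative quoted
above might look roughly like this in Java. It is a sketch only: the type
names come from the quoted suggestion, but the describe() method and the
AlterTopicsSketch.summarize() helper are hypothetical, added here just to make
the example runnable. ReassignPartitionsAlteration is left out because its
arguments are elided in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Common supertype from the quoted suggestion; describe() is a hypothetical addition. */
interface TopicAlteration {
    String describe();
}

final class PartitionCountAlteration implements TopicAlteration {
    final int numPartitions;

    PartitionCountAlteration(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    @Override
    public String describe() {
        return "partitions=" + numPartitions;
    }
}

final class ReplicationFactorAlteration implements TopicAlteration {
    final int repl;

    ReplicationFactorAlteration(int repl) {
        this.repl = repl;
    }

    @Override
    public String describe() {
        return "replicationFactor=" + repl;
    }
}

/** Hypothetical stand-in for the client side of alterTopics(options, alters). */
final class AlterTopicsSketch {
    /** One entry per topic: a single map can mix different kinds of alteration. */
    static List<String> summarize(Map<String, TopicAlteration> alters) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, TopicAlteration> e : alters.entrySet()) {
            out.add(e.getKey() + ": " + e.getValue().describe());
        }
        return out;
    }
}
```

The sketch makes the trade-off visible: one call can carry mixed alterations,
but a single options object then has to make sense for every alteration type.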
> >
> > Cheers,
> >
> > Tom
> >
> > On 22 August 2017 at 23:49, Colin McCabe <cm...@apache.org> wrote:
> >
> >> On Wed, Aug 9, 2017, at 07:17, Tom Bentley wrote:
> >> > Hi Dong and Jun,
> >> >
> >> > Thanks again for your input in this discussion and on KIP-113. It's
> >> > difficult that discussion is split between this thread and the one for
> >> > KIP-113, but I'll try to respond on this thread to questions asked on
> >> > this
> >> > thread.
> >> >
> >> > It seems there is some consensus that the alterTopic() API is the
> wrong
> >> > thing, and it would make more sense to separate the different kinds of
> >> > alteration into separate APIs. It seems to me there is then a choice.
> >> >
> >> > 1. Have separate alterPartitionCount(), alterReplicationFactor() and
> >> > reassignPartitions() methods. This would most closely match the
> >> > facilities
> >> > currently offered by kafka-alter-topics and kafka-reassign-partitions.
> >> > 2. Just provide a reassignPartitions() method and infer from the shape
> >> of
> >> > the data passed to that that a change in replication factor and/or
> >> > partition count is required, as suggested by Dong.
> >> >
> >> > The choice we make here is also relevant to KIP-113 and KIP-178. By
> >> > choosing (1) we can change the replication factor or partition count
> >> > without providing an assignment and therefore are necessarily
> requiring
> >> > the
> >> > controller to make a decision for us about which broker (and, for 113,
> >> > which log directory and thus which disk) the replica should reside.
> >> > There is then the matter of what criteria the controller should use to
> >> > make
> >> > that decision (a decision is also required on topic creation of
> course,
> >> > but
> >> > I'll try not to go there right now).
> >>
> >> Hmm.  One approach would be to have something like
> >>
> >> > TopicAlterationResult alterTopics(TopicAlterationOptions options,
> >> >                                   Map<String, TopicAlteration> alters)
> >> >
> >> > PartitionCountAlteration(int numPartitions) implements TopicAlteration
> >> >
> >> > ReplicationFactorAlteration(int repl) implements TopicAlteration
> >> >
> >> > ReassignPartitionsAlteration(...) implements TopicAlteration
> >>
> >>
> >> >
> >> > Choosing (2), on the other hand, forces us to make an assignment, and
> >> > currently the AdminClient doesn't provide the APIs necessary to make a
> >> > very
> >> > informed decision. To do the job properly we'd need APIs to enumerate
> >> the
> >> > permissible log directories on each broker, know the current disk
> usage
> >> > etc. These just don't exist today, and I think it would be a lot of
> work
> >> > to
> >> > bite off to specify them. We've already got three KIPs on the go.
> >> >
> >> > So, for the moment, I think we have to choose 1, for pragmatic
> reasons.
> >> > There is nothing to stop us deprecating alterPartitionCount() and
> >> > alterReplicationFactor() and moving to (2) at a later date.
> >>
> >> Yeah, the API should be pragmatic.  An API that doesn't let us do what
> >> we want to do (like let Kafka assign us a new replica on the least
> >> loaded node) is not good.
> >>
> >> >
> >> > To answer a specific question of Jun's:
> >> >
> >> > 3. It's not very clear to me what status_time in ReplicaStatusResponse
> >> > > means.
> >> >
> >> >
> >> > It corresponds to the ReplicaStatus.statusTime, and the idea was it
> was
> >> > the
> >> > epoch offset at which the controller's notion of the lag was current
> >> > (which
> >> > I think is the epoch offset of the last FetchRequest from the
> follower).
> >> > At
> >> > one point I was considering some other ways of determining progress
> and
> >> > completion, and TBH I think it was more pertinent to those rejected
> >> > alternatives.
> >> >
> >> > I've already made some changes to KIP-179. As suggested in the thread
> >> for
> >> > KIP-113, I will be changing some more since I think we can use
> >> > DescribeDirsRequest
> >> > instead of ReplicaStatusResponse to implement the --progress option.
> >>
> >> That makes sense.  Whatever response is used, it would be nice to have
> >> an error message in addition to an error code.
> >>
> >> best,
> >> Colin
> >>
> >> >
> >> > Cheers,
> >> >
> >> > Tom
> >> >
> >> >
> >> >
> >> > On 9 August 2017 at 02:28, Jun Rao <ju...@confluent.io> wrote:
> >> >
> >> > > Hi, Tom,
> >> > >
> >> > > Thanks for the KIP. A few minor comments below.
> >> > >
> >> > > 1. Implementation wise, the broker handles adding partitions
> >> differently
> >> > > from changing replica assignment. For the former, we directly update
> >> the
> >> > > topic path in ZK with the new partitions. For the latter, we write
> >> the new
> >> > > partition reassignment in the partition reassignment path. Changing
> >> the
> >> > > replication factor is handled in the same way as changing replica
> >> > > assignment. So, it would be useful to document how the broker
> handles
> >> these
> >> > > different cases accordingly. I think it's simpler to just allow one
> >> change
> >> > > (partition, replication factor, or replica assignment) in a request.
> >> > >
> >> > > 2. Currently, we only allow adding partitions. We probably want to
> >> document
> >> > > the restriction in the api.
> >> > >
> >> > > 3. It's not very clear to me what status_time in
> ReplicaStatusResponse
> >> > > means.
> >> > >
> >> > > Jun
> >> > >
> >> > >
> >> > >
> >> > > On Fri, Aug 4, 2017 at 10:04 AM, Dong Lin <li...@gmail.com>
> >> wrote:
> >> > >
> >> > > > Hey Tom,
> >> > > >
> >> > > > Thanks for your reply. Here are my thoughts:
> >> > > >
> >> > > > 1) I think the DescribeDirsResponse can be used by AdminClient to
> >> query
> >> > > the
> >> > > > lag of follower replica as well. Here is how it works:
> >> > > >
> >> > > > - AdminClient sends DescribeDirsRequest to both the leader and the
> >> > > follower
> >> > > > of the partition.
> >> > > > - DescribeDirsResponse from both leader and follower shows the LEO
> >> of the
> >> > > > leader replica and the follower replica.
> >> > > > - Lag of the follower replica is the difference in the LEO between
> >> leader
> >> > > > and follower.
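
The lag calculation described in the three steps above is simple arithmetic
on log end offsets (LEO). The sketch below assumes the LEO values have
already been collected from each broker's DescribeDirsResponse; ReplicaLag and
lagByFollower are hypothetical names, and clamping at zero is an added
assumption to cover responses sampled at slightly different times.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper, not a Kafka class. */
final class ReplicaLag {

    /**
     * @param leaderLeo    LEO reported by the leader replica
     * @param followerLeos broker id -> LEO reported by that follower replica
     * @return broker id -> lag, i.e. leader LEO minus follower LEO
     */
    static Map<Integer, Long> lagByFollower(long leaderLeo, Map<Integer, Long> followerLeos) {
        Map<Integer, Long> lag = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> e : followerLeos.entrySet()) {
            // Assumption: clamp at zero, since the leader and follower values
            // are not read at the same instant and the difference could be negative.
            lag.put(e.getKey(), Math.max(0L, leaderLeo - e.getValue()));
        }
        return lag;
    }
}
```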
> >> > > >
> >> > > > In comparison to ReplicaStatusRequest, DescribeDirsRequest needs
> to
> >> be
> >> > > sent
> >> > > > to each replica of the partition whereas ReplicaStatusRequest only
> >> needs
> >> > > to
> >> > > > be sent to the leader of the partition. It doesn't seem to make
> much
> >> > > > difference though. In practice we probably want to query the
> >> replica lag
> >> > > of
> >> > > > many partitions and AdminClient needs to send exactly one request
> to
> >> each
> >> > > > broker with either solution. Does this make sense?
> >> > > >
> >> > > >
> >> > > > 2) KIP-179 proposes to add the following AdminClient API:
> >> > > >
> >> > > > alterTopics(Collection<AlteredTopic> alteredTopics,
> >> AlterTopicsOptions
> >> > > > options)
> >> > > >
> >> > > > Where AlteredTopic includes the following fields
> >> > > >
> >> > > > AlteredTopic(String name, int numPartitions, int
> replicationFactor,
> >> > > > Map<Integer,List<Integer>> replicasAssignment)
> >> > > >
> >> > > > I have two comments on this:
> >> > > >
> >> > > > - The information in AlteredTopic seems a bit redundant. Both
> >> > > numPartitions
> >> > > > and replicationFactor can be derived from replicasAssignment. I
> >> think we
> >> > > > can probably just use the following API in AdminClient instead:
> >> > > >
> >> > > > AlterTopicsResult alterTopics(Map<TopicPartition, List<Integer>>
> >> > > > partitionAssignment, AlterTopicsOptions options)
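
The redundancy argument above can be made concrete: given a per-topic replica
assignment, both counts fall out of its shape. This sketch uses the
Map<Integer, List<Integer>> form from AlteredTopic (partition -> broker ids)
and assumes every partition has the same number of replicas. AssignmentShape
and its methods are hypothetical names.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper, not a Kafka class. */
final class AssignmentShape {

    /** The number of partitions is the number of entries in the assignment. */
    static int numPartitions(Map<Integer, List<Integer>> assignment) {
        return assignment.size();
    }

    /**
     * The replication factor is the common length of the replica lists.
     * Assumption: a non-uniform or empty assignment is treated as invalid.
     */
    static int replicationFactor(Map<Integer, List<Integer>> assignment) {
        int rf = -1;
        for (List<Integer> replicas : assignment.values()) {
            if (rf == -1) {
                rf = replicas.size();
            } else if (rf != replicas.size()) {
                throw new IllegalArgumentException("partitions differ in replica count");
            }
        }
        if (rf == -1) {
            throw new IllegalArgumentException("empty assignment");
        }
        return rf;
    }
}
```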
> >> > > >
> >> > > > - Do you think "reassignPartitions" may be a better name than
> >> > > > "alterTopics"? This is more consistent with the existing name used
> >> in
> >> > > > kafka-reassign-partitions.sh and I also find it to be more
> >> accurate. On
> >> > > the
> >> > > > other hand, alterTopics seems to suggest that we can also alter
> >> topic
> >> > > > config.
> >> > > >
> >> > > >
> >> > > >
> >> > > > Thanks,
> >> > > > Dong
> >> > > >
> >> > > > On Fri, Aug 4, 2017 at 1:03 AM, Tom Bentley <
> t.j.bentley@gmail.com>
> >> > > wrote:
> >> > > >
> >> > > > > Hi Dong,
> >> > > > >
> >> > > > > Thanks for your reply.
> >> > > > >
> >> > > > > You're right that your DescribeDirsResponse contains appropriate
> >> data.
> >> > > > The
> >> > > > > comment about the log_end_offset in the KIP says "Enable user to
> >> track
> >> > > > > movement progress by comparing LEO of the *.log and *.move".
> That
> >> makes
> >> > > > me
> >> > > > > wonder whether this would only work for replica movement on the
> >> same
> >> > > > > broker. In the case of reassigning partitions between brokers
> >> it's only
> >> > > > > really the leader for the partition that knows the max LEO of
> the
> >> ISR
> >> > > and
> >> > > > > the LEO of the target broker. Maybe the comment is misleading
> and
> >> it
> >> > > > would
> >> > > > > be the leader doing the same thing in both cases. Can you
> clarify
> >> this?
> >> > > >
> >> > > >
> >> > > > > At a conceptual level there's a lot of similarity between
> KIP-113
> >> and
> >> > > > > KIP-179 because they're both about moving replicas around.
> >> Concretely
> >> > > > > though, ChangeReplicaDirRequest assumes the movement is on the
> >> same
> >> > > > broker,
> >> > > > > while the equivalent APIs in KIP-179 (whichever alternative)
> lack
> >> the
> >> > > > > notion of disks. I wonder if a unified API would be better or
> not.
> >> > > > > Operationally, the reasons for changing replicas between brokers
> >> are
> >> > > > likely
> >> > > > > to be motivated by balancing load across the cluster, or
> >> > > adding/removing
> >> > > > > brokers. But the JBOD use case has different motivations. So I
> >> suspect
> >> > > > it's
> >> > > > > OK that the APIs are separate, but I'd love to hear what others
> >> think.
> >> > > >
> >> > > >
> >> > > > > I think you're right about TopicPartitionReplica. At the
> protocol
> >> level
> >> > > > we
> >> > > > > could group topics and partitions together to avoid having the
> >> same
> >> > > topic
> >> > > > > name multiple times when querying for the status of all the
> >> partitions
> >> > > > of a
> >> > > > > topic.
> >> > > > >
> >> > > > > Thanks again for taking the time to respond.
> >> > > > >
> >> > > > > Tom
> >> > > > >
> >> > > > > On 3 August 2017 at 23:07, Dong Lin <li...@gmail.com>
> wrote:
> >> > > > >
> >> > > > > > Hey Tom,
> >> > > > > >
> >> > > > > > Thanks for the KIP. It seems that the prior discussion in this
> >> thread
> >> > > > has
> >> > > > > > focused on reassigning partitions (or AlterTopics). I haven't
> >> looked
> >> > > > into
> >> > > > > > this yet. I have two comments on the replicaStatus() API and
> the
> >> > > > > > ReplicaStatusRequest:
> >> > > > > >
> >> > > > > > -  It seems that the use-case for ReplicaStatusRequest is
> >> covered by
> >> > > > > > the DescribeDirsRequest introduced in KIP-113
> >> > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> > > > > > 113%3A+Support+replicas+movement+between+log+directories>.
> >> > > > > > DescribeDirsResponse contains the logEndOffset for the
> specified
> >> > > > > > partitions. The lag of the follower replica can be derived
> >> as the
> >> > > > > > difference between leader's LEO and follower's LEO. Do you
> >> think we
> >> > > can
> >> > > > > > simply use DescribeDirsRequest without introducing
> >> > > > ReplicaStatusRequest?
> >> > > > > >
> >> > > > > > - According to the current KIP, replicaStatus() takes a list
> of
> >> > > > > partitions
> >> > > > > > as input. And its output maps the partition to a list of
> replica
> >> > > > status.
> >> > > > > Do
> >> > > > > > you think it would be better to have a new class called
> >> > > > > > TopicPartitionReplica, which identifies the topic, partition
> >> and the
> >> > > > > > brokerId. replicaStatus can then take a list of
> >> TopicPartitionReplica
> >> > > > as
> >> > > > > > input. And its output maps the replica to replica status. The
> >> latter
> >> > > > API
> >> > > > > > seems simpler and also matches the method name better. What do
> >> you
> >> > > > think?
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Dong
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > On Wed, Aug 2, 2017 at 5:12 AM, Tom Bentley <
> >> t.j.bentley@gmail.com>
> >> > > > > wrote:
> >> > > > > >
> >> > > > > > > Hi again Ismael,
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > 1. It's worth emphasising that reassigning partitions is a
> >> > > different
> >> > > > > > > >> process than what happens when a topic is created, so not
> >> sure
> >> > > > > trying
> >> > > > > > to
> >> > > > > > > >> make it symmetric is beneficial. In addition to what was
> >> already
> >> > > > > > > >> discussed,
> >> > > > > > > >> one should also enable replication throttling before
> >> moving the
> >> > > > > data.
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > Thanks for your responses. The only advantage I can see
> from
> >> > > having
> >> > > > > > > > separate APIs for partition count change vs. the other
> >> changes is
> >> > > > > that
> >> > > > > > we
> >> > > > > > > > expect the former to return synchronously, and the latter
> to
> >> > > > > (usually)
> >> > > > > > > > return asynchronously. Lumping them into the same API
> forces
> >> > > > clients
> >> > > > > of
> >> > > > > > > > that API to code for the two possible cases. Is this the
> >> > > particular
> >> > > > > > > concern
> >> > > > > > > > you had in mind, or do you have others?
> >> > > > > > > >
> >> > > > > > >
> >> > > > > > > Just to help see what this looks like I've created another
> >> version
> >> > > of
> >> > > > > > > KIP-179 (
> >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Copy+
> >> > > > > > > of+KIP-179+-+Change+ReassignPartitionsCommand+to+use+
> >> AdminClient
> >> > > > > > > ).
> >> > > > > > >
> >> > > > > > > The AdminClient APIs are more type safe (harder to misuse).
> >> The
> >> > > > > > > asynchronous nature of the alterReplicationFactors() and
> >> > > > > > > reassignPartitions() methods aren't really apparent (you'd
> >> still
> >> > > have
> >> > > > > to
> >> > > > > > > read the javadocs to know you had to call replicaStatus() to
> >> > > > determine
> >> > > > > > > completion), but it's still better than the lumped-together
> >> API
> >> > > > because
> >> > > > > > the
> >> > > > > > > methods are _consistently_ async. The Protocol APIs are
> >> slightly
> >> > > > nicer
> >> > > > > > too,
> >> > > > > > > in that INVALID_REQUEST is less commonly returned. Another
> >> benefit
> >> > > is
> >> > > > > it
> >> > > > > > > would allow for these different alteration APIs to be
> modified
> >> > > > > > > independently in the future, should that be necessary.
> >> > > > > > >
> >> > > > > > > What do others think?
> >> > > > > > >
> >> > > > > > > You're right that the proposal doesn't cover setting and
> >> clearing a
> >> > > > > > > > throttle, and it should. I will study the code about how
> >> this
> >> > > works
> >> > > > > > > before
> >> > > > > > > > posting back about that.
> >> > > > > > > >
> >> > > > > > >
> >> > > > > > > AFAICS from the code the throttle is simply a dynamic config
> >> of the
> >> > > > > > broker.
> >> > > > > > > We already have the AdminClient.alterConfigs API for setting
> >> this,
> >> > > so
> >> > > > > the
> >> > > > > > > refactoring of kafka-reassign-partitions.sh could just use
> >> that
> >> > > > > existing
> >> > > > > > > API. We could still achieve your objective of setting the
> >> throttle
> >> > > > > before
> >> > > > > > > reassignment by making the --throttle option mandatory when
> >> > > --execute
> >> > > > > was
> >> > > > > > > present. Or did you have something else in mind?
> >> > > > > > >
> >> > > > > > > Thanks,
> >> > > > > > >
> >> > > > > > > Tom
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >>
> >
> >
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
Hi all,

I've been thinking about the proposed changes in KIP-179 and, on reflection,
I don't think the API presented is really ideal. Some of the limitations it
has include:

1. It sticks to the current, batch oriented (i.e. a single set of
reassignments at a time), model.
2. It still doesn't really provide a nice way of knowing that a
reassignment is complete.
3. As presented, the automatic removal of throttles only happens at the end
of the reassignment batch. But individual brokers could be unthrottled before
then.

As an illustration of this, https://issues.apache.org/jira/browse/KAFKA-6304
provides a use case for wanting to cancel a reassignment because one of the
brokers in the new assignment has failed. With the proposed API:

1. We can't identify the subset of the reassignment batch which we want to
cancel.
2. All we could do would be to revise the proposed API to allow calling
   reassignPartitions() while a reassignment was in progress. This second
   call could revert the subset of reassignments involving the failed broker.
3. But the API has no way to express that the original reassignment was
cancelled.

Another illustration of the problem: An advanced cluster balancer
(such as LinkedIn's cruise control) has to batch large reassignments
(partly so as to make cancellation easier). This batching itself leads to
inefficiency because some of the partitions in the batch will finish before
others, so time is wasted with the cluster only moving a small number of
partitions (when most in the batch have finished).

In hindsight, I think I was too influenced by reproducing what the
kafka-reassign-partitions.sh tool does today. I think what's actually
needed (for things like cruise control) is an API that's more fine-grained,
and less batch oriented. I am therefore withdrawing KIP-179 and
intend to start a new KIP to propose a different API for partition
reassignment.

I'm still interested in hearing about other deficiencies of the KIP-179
proposal, so I can avoid them in the new proposal. Similarly, if there are
features you'd like to see in the API, please let me know.

I won't go into details of the new API here, but the basic idea I'd like
to use is to give the reassignment of each partition an identity (though
this wouldn't be exposed directly in the API). This is necessary to allow
new reassignments to be added while some are already running. API methods
would then be provided to discover all the currently running reassignments,
determine if a reassignment is still running etc.
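
To make the per-partition idea a little more concrete, here is a minimal
in-memory sketch. It is not the proposed API: ReassignmentRegistry and all of
its methods are hypothetical, and keying each reassignment by topic and
partition is an assumption about what the internal identity could be.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; tracks reassignments one partition at a time. */
final class ReassignmentRegistry {
    // Internal identity (assumed here to be "topic-partition") -> target replicas.
    private final Map<String, List<Integer>> running = new LinkedHashMap<>();

    private static String id(String topic, int partition) {
        return topic + "-" + partition;
    }

    /** Adds a reassignment while others are running; false if this partition is already moving. */
    boolean start(String topic, int partition, List<Integer> targetReplicas) {
        return running.putIfAbsent(id(topic, partition), new ArrayList<>(targetReplicas)) == null;
    }

    /** Marks one partition's reassignment as finished or cancelled; false if none was running. */
    boolean finish(String topic, int partition) {
        return running.remove(id(topic, partition)) != null;
    }

    boolean isRunning(String topic, int partition) {
        return running.containsKey(id(topic, partition));
    }

    /** Identities of all currently running reassignments, in start order. */
    List<String> listRunning() {
        return new ArrayList<>(running.keySet());
    }
}
```

Because each partition is tracked separately, a single move can be cancelled
(for example when a target broker fails) without touching the others.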

Cheers,

Tom

On 1 November 2017 at 10:20, Tom Bentley <t....@gmail.com> wrote:

> This thread has been very quiet for a while now. It's unclear whether this
> is because no one has anything more to say, or whether no one has taken a
> look at it in its current form. I suspect the latter, so I'm not calling
> the vote today, but instead asking for more review.
>
> What's currently proposed – in addition to the reassignPartitions() API
> itself – is to have a pair of RPCs for managing throttles. This is quite
> different from the earlier proposal to reuse alterConfigs(). The benefits
> of this specific API include:
>
> * being more typesafe,
> * allowing for the automatic removal of throttles when reassignment has
> completed,
> * being careful about correct management of the throttles wrt controller
> failover
>
> Surely someone has something to say about this, before we reach the vote
> stage?
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-179+-+
> Change+ReassignPartitionsCommand+to+use+AdminClient
>
> Thanks,
>
> Tom
>
>
> On 25 October 2017 at 10:33, Tom Bentley <t....@gmail.com> wrote:
>
>> If there are no further comments, I will start a vote on this next week.
>>
>> Thanks,
>>
>> Tom
>>
>> On 20 October 2017 at 08:33, Tom Bentley <t....@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I've made a fairly major update to KIP-179 to propose APIs for setting
>>> throttled rates and throttled replicas with the ability to remove these
>>> automatically at the end of reassignment.
>>>
>>> I'd be grateful for your feedback:
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-179+-+
>>> Change+ReassignPartitionsCommand+to+use+AdminClient
>>>
>>> Thanks,
>>>
>>> Tom
>>>
>>> On 2 October 2017 at 13:15, Tom Bentley <t....@gmail.com> wrote:
>>>
>>>> One question I have is about whether/how to scope throttling to a
>>>> reassignment. Currently throttles are only loosely associated with
>>>> reassignment: You can start a reassignment without any throttling, add
>>>> throttling to an in-flight reassignment, and remember/forget to remove
>>>> throttling after the reassignment is complete. There is great flexibility
>>>> in that, but also the risk that you forget to remove the throttle(s).
>>>>
>>>> Just adding an API for setting the throttled rate makes this situation
>>>> worse: While it's nice to be able to auto-remove the throttled rate, what
>>>> about the config for the throttled replicas? Also you might add a throttle
>>>> thinking a reassignment is in-flight, but it has in fact just finished:
>>>> Those throttles will now hang around until reset or the end of the next
>>>> reassignment. For these reasons it would be good if the throttle were more
>>>> directly scoped to the reassignment.
>>>>
>>>> On the other hand, taking LinkedIn's Cruise Control as an example,
>>>> there they seem to modify the reassignment znode directly and incrementally
>>>> and so there is no notion of "the reassignment". Reassignments will be
>>>> running continuously, with partitions added before all of the current
>>>> partitions have completed. If there is no meaningful cluster-wide
>>>> "reassignment" then it would be better to remove the throttle by
>>>> changing the list of replicas as each replica catches up.
>>>>
>>>> I'm interested in any use cases people can share on this, as I'd like
>>>> the throttle API to be useful for a broad range of use cases, rather than
>>>> being too narrowly focussed on what's needed by the existing CLI tools.
>>>>
>>>> Thanks,
>>>>
>>>> Tom
>>>>
>>>>
>>>>
>>>>
>>>> On 28 September 2017 at 17:22, Tom Bentley <t....@gmail.com>
>>>> wrote:
>>>>
>>>>> I'm starting to think about KIP-179 again. In order to have more
>>>>> manageably-scoped KIPs and PRs I think it might be worth factoring-out the
>>>>> throttling part into a separate KIP. Wdyt?
>>>>>
>>>>> Keeping the throttling discussion in this thread for the moment...
>>>>>
>>>>> The throttling behaviour is currently spread across the
>>>>> `(leader|follower).replication.throttled.replicas` topic config and
>>>>> the `(leader|follower).replication.throttled.rate` dynamic broker
>>>>> config. It's not really clear to me exactly what "removing the throttle" is
>>>>> supposed to mean. I mean we could reset the rate to Long.MAX_VALUE or we
>>>>> could change the list of replicas to an empty list. The
>>>>> ReassignPartitionsCommand does both, but there is some small utility in
>>>>> leaving the rate, but clearing the list, if you've discovered the "right"
>>>>> rate for your cluster/workload and want it to be sticky for next time.
>>>>> Does anyone do this in practice?
>>>>>
>>>>>>> With regards to throttling, it would be
>>>>>>> worth thinking about a way where the throttling configs can be
>>>>>>> automatically removed without the user having to re-run the tool.
>>>>>>>
>>>>>>
>>>>>> Isn't that just a matter of updating the topic configs for
>>>>>> (leader|follower).replication.throttled.replicas at the same time we
>>>>>> remove the reassignment znode? That leaves open the question about whether
>>>>>> to reset the rates at the same time.
>>>>>>
>>>>>
>>>>> Thinking some more about my "update the configs at the same time we
>>>>> remove the reassignment znode" suggestion. The reassignment znode is
>>>>> persistent, so the reassignment will survive a zookeeper restart. If there
>>>>> was a flag for the auto-removal of the throttle it would likewise need to
>>>>> be persistent. Otherwise a ZK restart would remember the reassignment, but
>>>>> forget about the preference for auto removal of throttles. So, we would use
>>>>> a persistent znode (a child of the reassignment path, perhaps) to store a
>>>>> flag for throttle removal.
>>>>>
>>>>> Thoughts?
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Tom
>>>>>
>>>>
>>>>
>>>
>>
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
This thread has been very quiet for a while now. It's unclear whether this
is because no one has anything more to say, or whether no one has taken a
look at it in its current form. I suspect the latter, so I'm not calling
the vote today, but instead asking for more review.

What's currently proposed – in addition to the reassignPartitions() API
itself – is to have a pair of RPCs for managing throttles. This is quite
different from the earlier proposal to reuse alterConfigs(). The benefits
of this specific API include:

* being more typesafe,
* allowing for the automatic removal of throttles when reassignment has
completed,
* being careful about correct management of the throttles wrt controller
failover
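
To illustrate the "more typesafe" point, here is a minimal sketch in plain
Java. It is not the API proposed in the KIP: the class names
TypedThrottleSketch and ThrottledRate are hypothetical, and nothing here
talks to a broker. It only contrasts a string-keyed config, where a bad
value is found when it is parsed, with a typed value that is validated
when it is constructed.

```java
import java.util.Collections;
import java.util.Map;

/** Illustration only: hypothetical names, no broker or AdminClient involved. */
final class TypedThrottleSketch {
    static final String RATE_KEY = "leader.replication.throttled.rate";

    /** String-keyed style: a malformed value is only detected when it is parsed. */
    static long rateFromConfig(Map<String, String> configs) {
        return Long.parseLong(configs.get(RATE_KEY));
    }

    /** Typed style (hypothetical class): invalid rates are rejected on construction. */
    static final class ThrottledRate {
        final long bytesPerSecond;

        ThrottledRate(long bytesPerSecond) {
            if (bytesPerSecond <= 0) {
                throw new IllegalArgumentException("rate must be positive: " + bytesPerSecond);
            }
            this.bytesPerSecond = bytesPerSecond;
        }
    }

    public static void main(String[] args) {
        System.out.println(new ThrottledRate(10_000_000L).bytesPerSecond);
        try {
            // "10MB" is accepted as a string and fails only when parsed.
            rateFromConfig(Collections.singletonMap(RATE_KEY, "10MB"));
        } catch (NumberFormatException e) {
            System.out.println("string config rejected at parse time: " + e.getMessage());
        }
    }
}
```

With alterConfigs() the caller is in the first situation; a dedicated RPC
could put the validation in the request type itself.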

Surely someone has something to say about this, before we reach the vote
stage?

https://cwiki.apache.org/confluence/display/KAFKA/KIP-179+-+Change+
ReassignPartitionsCommand+to+use+AdminClient

Thanks,

Tom


On 25 October 2017 at 10:33, Tom Bentley <t....@gmail.com> wrote:

> If there are no further comments, I will start a vote on this next week.
>
> Thanks,
>
> Tom
>
> On 20 October 2017 at 08:33, Tom Bentley <t....@gmail.com> wrote:
>
>> Hi,
>>
>> I've made a fairly major update to KIP-179 to propose APIs for setting
>> throttled rates and throttled replicas with the ability to remove these
>> automatically at the end of reassignment.
>>
>> I'd be grateful for your feedback:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-179+-+
>> Change+ReassignPartitionsCommand+to+use+AdminClient
>>
>> Thanks,
>>
>> Tom
>>
>> On 2 October 2017 at 13:15, Tom Bentley <t....@gmail.com> wrote:
>>
>>> One question I have is about whether/how to scope throttling to a
>>> reassignment. Currently throttles are only loosely associated with
>>> reassignment: You can start a reassignment without any throttling, add
>>> throttling to an in-flight reassignment, and remember/forget to remove
>>> throttling after the reassignment is complete. There is great flexibility
>>> in that, but also the risk that you forget to remove the throttle(s).
>>>
>>> Just adding an API for setting the throttled rate makes this situation
>>> worse: While it's nice to be able to auto-remove the throttled rate, what
>>> about the config for the throttled replicas? Also you might add a throttle
>>> thinking a reassignment is in-flight, but it has in fact just finished:
>>> Those throttles will now hang around until reset or the end of the next
>>> reassignment. For these reasons it would be good if the throttle were more
>>> directly scoped to the reassignment.
>>>
>>> On the other hand, taking LinkedIn's Cruise Control as an example, there
>>> they seem to modify the reassignment znode directly and incrementally and
>>> so there is no notion of "the reassignment". Reassignments will be running
>>> continuously, with partitions added before all of the current partitions
>>> have completed. If there is no meaningful cluster-wide "reassignment" then
>>> it would be better to remove the throttle by changing the list of
>>> replicas as each replica catches up.
>>>
>>> I'm interested in any use cases people can share on this, as I'd like
>>> the throttle API to be useful for a broad range of use cases, rather than
>>> being too narrowly focussed on what's needed by the existing CLI tools.
>>>
>>> Thanks,
>>>
>>> Tom
>>>
>>>
>>>
>>>
>>> On 28 September 2017 at 17:22, Tom Bentley <t....@gmail.com>
>>> wrote:
>>>
>>>> I'm starting to think about KIP-179 again. In order to have more
>>>> manageably-scoped KIPs and PRs I think it might be worth factoring-out the
>>>> throttling part into a separate KIP. Wdyt?
>>>>
>>>> Keeping the throttling discussion in this thread for the moment...
>>>>
>>>> The throttling behaviour is currently spread across the
>>>> `(leader|follower).replication.throttled.replicas` topic config and
>>>> the `(leader|follower).replication.throttled.rate` dynamic broker
>>>> config. It's not really clear to me exactly what "removing the throttle" is
>>>> supposed to mean. I mean we could reset the rate to Long.MAX_VALUE or we
>>>> could change the list of replicas to an empty list. The
>>>> ReassignPartitionsCommand does both, but there is some small utility in
>>>> leaving the rate, but clearing the list, if you've discovered the "right"
>>>> rate for your cluster/workload and want it to be sticky for next time.
>>>> Does anyone do this in practice?
>>>>
>>>>>> With regards to throttling, it would be
>>>>>> worth thinking about a way where the throttling configs can be
>>>>>> automatically removed without the user having to re-run the tool.
>>>>>>
>>>>>
>>>>> Isn't that just a matter of updating the topic configs for
>>>>> (leader|follower).replication.throttled.replicas at the same time we
>>>>> remove the reassignment znode? That leaves open the question about whether
>>>>> to reset the rates at the same time.
>>>>>
>>>>
>>>> Thinking some more about my "update the configs at the same time we
>>>> remove the reassignment znode" suggestion. The reassignment znode is
>>>> persistent, so the reassignment will survive a zookeeper restart. If there
>>>> was a flag for the auto-removal of the throttle it would likewise need to
>>>> be persistent. Otherwise a ZK restart would remember the reassignment, but
>>>> forget about the preference for auto removal of throttles. So, we would use
>>>> a persistent znode (a child of the reassignment path, perhaps) to store a
>>>> flag for throttle removal.
>>>>
>>>> Thoughts?
>>>>
>>>> Cheers,
>>>>
>>>> Tom
>>>>
>>>
>>>
>>
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
If there are no further comments, I will start a vote on this next week.

Thanks,

Tom

On 20 October 2017 at 08:33, Tom Bentley <t....@gmail.com> wrote:

> Hi,
>
> I've made a fairly major update to KIP-179 to propose APIs for setting
> throttled rates and throttled replicas with the ability to remove these
> automatically at the end of reassignment.
>
> I'd be grateful for your feedback:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-179+-+Change+
> ReassignPartitionsCommand+to+use+AdminClient
>
> Thanks,
>
> Tom
>
> On 2 October 2017 at 13:15, Tom Bentley <t....@gmail.com> wrote:
>
>> One question I have is about whether/how to scope throttling to a
>> reassignment. Currently throttles are only loosely associated with
>> reassignment: You can start a reassignment without any throttling, add
>> throttling to an in-flight reassignment, and remember/forget to remove
>> throttling after the reassignment is complete. There is great flexibility
>> in that, but also the risk that you forget to remove the throttle(s).
>>
>> Just adding an API for setting the throttled rate makes this situation
>> worse: While it's nice to be able to auto-remove the throttled rate, what
>> about the config for the throttled replicas? Also you might add a throttle
>> thinking a reassignment is in-flight, but it has in fact just finished:
>> Those throttles will now hang around until reset or the end of the next
>> reassignment. For these reasons it would be good if the throttle were more
>> directly scoped to the reassignment.
>>
>> On the other hand, taking LinkedIn's Cruise Control as an example, there
>> they seem to modify the reassignment znode directly and incrementally and
>> so there is no notion of "the reassignment". Reassignments will be running
>> continuously, with partitions added before all of the current partitions
>> have completed. If there is no meaningful cluster-wide "reassignment" then
>> it would be better to remove the throttle by changing the list of
>> replicas as each replica catches up.
>>
>> I'm interested in any use cases people can share on this, as I'd like the
>> throttle API to be useful for a broad range of use cases, rather than being
>> too narrowly focussed on what's needed by the existing CLI tools.
>>
>> Thanks,
>>
>> Tom
>>
>>
>>
>>
>> On 28 September 2017 at 17:22, Tom Bentley <t....@gmail.com> wrote:
>>
>>> I'm starting to think about KIP-179 again. In order to have more
>>> manageably-scoped KIPs and PRs I think it might be worth factoring-out the
>>> throttling part into a separate KIP. Wdyt?
>>>
>>> Keeping the throttling discussion in this thread for the moment...
>>>
>>> The throttling behaviour is currently spread across the
>>> `(leader|follower).replication.throttled.replicas` topic config and the
>>> `(leader|follower).replication.throttled.rate` dynamic broker config.
>>> It's not really clear to me exactly what "removing the throttle" is
>>> supposed to mean. I mean we could reset the rate to Long.MAX_VALUE or we
>>> could change the list of replicas to an empty list. The
>>> ReassignPartitionsCommand does both, but there is some small utility in
>>> leaving the rate, but clearing the list, if you've discovered the "right"
>>> rate for your cluster/workload and want it to be sticky for next time.
>>> Does anyone do this in practice?
>>>
>>>>> With regards to throttling, it would be
>>>>> worth thinking about a way where the throttling configs can be
>>>>> automatically removed without the user having to re-run the tool.
>>>>>
>>>>
>>>> Isn't that just a matter of updating the topic configs for
>>>> (leader|follower).replication.throttled.replicas at the same time we
>>>> remove the reassignment znode? That leaves open the question about whether
>>>> to reset the rates at the same time.
>>>>
>>>
>>> Thinking some more about my "update the configs at the same time we
>>> remove the reassignment znode" suggestion. The reassignment znode is
>>> persistent, so the reassignment will survive a zookeeper restart. If there
>>> was a flag for the auto-removal of the throttle it would likewise need to
>>> be persistent. Otherwise a ZK restart would remember the reassignment, but
>>> forget about the preference for auto removal of throttles. So, we would use
>>> a persistent znode (a child of the reassignment path, perhaps) to store a
>>> flag for throttle removal.
>>>
>>> Thoughts?
>>>
>>> Cheers,
>>>
>>> Tom
>>>
>>
>>
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
Hi,

I've made a fairly major update to KIP-179 to propose APIs for setting
throttled rates and throttled replicas with the ability to remove these
automatically at the end of reassignment.

I'd be grateful for your feedback:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-179+-+Change+ReassignPartitionsCommand+to+use+AdminClient

Thanks,

Tom

On 2 October 2017 at 13:15, Tom Bentley <t....@gmail.com> wrote:

> One question I have is about whether/how to scope throttling to a
> reassignment. Currently throttles are only loosely associated with
> reassignment: You can start a reassignment without any throttling, add
> throttling to an in-flight reassignment, and remember/forget to remove
> throttling after the reassignment is complete. There is great flexibility
> in that, but also the risk that you forget to remove the throttle(s).
>
> Just adding an API for setting the throttled rate makes this situation
> worse: While it's nice to be able to auto-remove the throttled rate, what
> about the config for the throttled replicas? Also you might add a throttle
> thinking a reassignment is in-flight, but it has in fact just finished:
> Those throttles will now hang around until reset or the end of the next
> reassignment. For these reasons it would be good if the throttle were more
> directly scoped to the reassignment.
>
> On the other hand, taking LinkedIn's Cruise Control as an example, there
> they seem to modify the reassignment znode directly and incrementally and
> so there is no notion of "the reassignment". Reassignments will be running
> continuously, with partitions added before all of the current partitions
> have completed. If there is no meaningful cluster-wide "reassignment" then
> it would be better to remove the throttle by changing the list of
> replicas as each replica catches up.
>
> I'm interested in any use cases people can share on this, as I'd like the
> throttle API to be useful for a broad range of use cases, rather than being
> too narrowly focussed on what's needed by the existing CLI tools.
>
> Thanks,
>
> Tom
>
>
>
>
> On 28 September 2017 at 17:22, Tom Bentley <t....@gmail.com> wrote:
>
>> I'm starting to think about KIP-179 again. In order to have more
>> manageably-scoped KIPs and PRs I think it might be worth factoring-out the
>> throttling part into a separate KIP. Wdyt?
>>
>> Keeping the throttling discussion in this thread for the moment...
>>
>> The throttling behaviour is currently spread across the
>> `(leader|follower).replication.throttled.replicas` topic config and the
>> `(leader|follower).replication.throttled.rate` dynamic broker config.
>> It's not really clear to me exactly what "removing the throttle" is
>> supposed to mean. I mean we could reset the rate to Long.MAX_VALUE or we
>> could change the list of replicas to an empty list. The
>> ReassignPartitionsCommand does both, but there is some small utility in
>> leaving the rate, but clearing the list, if you've discovered the "right"
>> rate for your cluster/workload and want it to be sticky for next time.
>> Does anyone do this in practice?
>>
>>>> With regards to throttling, it would be
>>>> worth thinking about a way where the throttling configs can be
>>>> automatically removed without the user having to re-run the tool.
>>>>
>>>
>>> Isn't that just a matter of updating the topic configs for
>>> (leader|follower).replication.throttled.replicas at the same time we
>>> remove the reassignment znode? That leaves open the question about whether
>>> to reset the rates at the same time.
>>>
>>
>> Thinking some more about my "update the configs at the same time we
>> remove the reassignment znode" suggestion. The reassignment znode is
>> persistent, so the reassignment will survive a zookeeper restart. If there
>> was a flag for the auto-removal of the throttle it would likewise need to
>> be persistent. Otherwise a ZK restart would remember the reassignment, but
>> forget about the preference for auto removal of throttles. So, we would use
>> a persistent znode (a child of the reassignment path, perhaps) to store a
>> flag for throttle removal.
>>
>> Thoughts?
>>
>> Cheers,
>>
>> Tom
>>
>
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
One question I have is about whether/how to scope throttling to a
reassignment. Currently throttles are only loosely associated with
reassignment: You can start a reassignment without any throttling, add
throttling to an in-flight reassignment, and remember/forget to remove
throttling after the reassignment is complete. There is great flexibility
in that, but also the risk that you forget to remove the throttle(s).

Just adding an API for setting the throttled rate makes this situation
worse: While it's nice to be able to auto-remove the throttled rate, what
about the config for the throttled replicas? Also you might add a throttle
thinking a reassignment is in-flight, but it has in fact just finished:
Those throttles will now hang around until reset or the end of the next
reassignment. For these reasons it would be good if the throttle were more
directly scoped to the reassignment.

On the other hand, taking LinkedIn's Cruise Control as an example, there
they seem to modify the reassignment znode directly and incrementally and
so there is no notion of "the reassignment". Reassignments will be running
continuously, with partitions added before all of the current partitions
have completed. If there is no meaningful cluster-wide "reassignment" then
it would be better to remove the throttle by changing the list of
replicas as each replica catches up.
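
To make the incremental case concrete, here is a rough sketch in plain
Java. It is an in-memory model only, and the class and method names
(IncrementalThrottleSketch, replicaMoving, replicaCaughtUp) are made up
for illustration. It assumes the throttled replicas list is a
comma-separated set of partition:broker pairs, and shows each entry being
dropped as that replica catches up, rather than everything being cleared
at the end of one cluster-wide "reassignment".

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** In-memory model only; hypothetical names, no ZooKeeper or broker access. */
final class IncrementalThrottleSketch {
    // Entries in insertion order, each of the assumed form "partition:broker".
    private final Set<String> throttled = new LinkedHashSet<>();

    /** A replica starts moving, so it joins the throttled list. */
    void replicaMoving(int partition, int broker) {
        throttled.add(partition + ":" + broker);
    }

    /** A replica has caught up, so only its own entry is removed. */
    void replicaCaughtUp(int partition, int broker) {
        throttled.remove(partition + ":" + broker);
    }

    /** The value that would be written to a *.replication.throttled.replicas config. */
    String configValue() {
        return String.join(",", throttled);
    }

    public static void main(String[] args) {
        IncrementalThrottleSketch sketch = new IncrementalThrottleSketch();
        sketch.replicaMoving(0, 101);
        sketch.replicaMoving(1, 102);
        sketch.replicaCaughtUp(0, 101);
        System.out.println(sketch.configValue());
    }
}
```

In this model the list becomes empty on its own once the last in-flight
replica catches up, so no separate "reassignment finished" event is needed.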

I'm interested in any use cases people can share on this, as I'd like the
throttle API to be useful for a broad range of use cases, rather than being
too narrowly focussed on what's needed by the existing CLI tools.

Thanks,

Tom




On 28 September 2017 at 17:22, Tom Bentley <t....@gmail.com> wrote:

> I'm starting to think about KIP-179 again. In order to have more
> manageably-scoped KIPs and PRs I think it might be worth factoring-out the
> throttling part into a separate KIP. Wdyt?
>
> Keeping the throttling discussion in this thread for the moment...
>
> The throttling behaviour is currently spread across the
> `(leader|follower).replication.throttled.replicas` topic config and the
> `(leader|follower).replication.throttled.rate` dynamic broker config.
> It's not really clear to me exactly what "removing the throttle" is
> supposed to mean. I mean we could reset the rate to Long.MAX_VALUE or we
> could change the list of replicas to an empty list. The
> ReassignPartitionsCommand does both, but there is some small utility in
> leaving the rate, but clearing the list, if you've discovered the "right"
> rate for your cluster/workload and want it to be sticky for next time.
> Does anyone do this in practice?
>
>>> With regards to throttling, it would be
>>> worth thinking about a way where the throttling configs can be
>>> automatically removed without the user having to re-run the tool.
>>>
>>
>> Isn't that just a matter of updating the topic configs for
>> (leader|follower).replication.throttled.replicas at the same time we
>> remove the reassignment znode? That leaves open the question about whether
>> to reset the rates at the same time.
>>
>
> Thinking some more about my "update the configs at the same time we remove
> the reassignment znode" suggestion. The reassignment znode is persistent,
> so the reassignment will survive a zookeeper restart. If there was a flag
> for the auto-removal of the throttle it would likewise need to be
> persistent. Otherwise a ZK restart would remember the reassignment, but
> forget about the preference for auto removal of throttles. So, we would use
> a persistent znode (a child of the reassignment path, perhaps) to store a
> flag for throttle removal.
>
> Thoughts?
>
> Cheers,
>
> Tom
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
I'm starting to think about KIP-179 again. In order to have more
manageably-scoped KIPs and PRs I think it might be worth factoring-out the
throttling part into a separate KIP. Wdyt?

Keeping the throttling discussion in this thread for the moment...

The throttling behaviour is currently spread across the
`(leader|follower).replication.throttled.replicas` topic config and the
`(leader|follower).replication.throttled.rate` dynamic broker config.
It's not really clear to me exactly what "removing the throttle" is
supposed to mean. I mean we could reset the rate to Long.MAX_VALUE or we
could change the list of replicas to an empty list. The
ReassignPartitionsCommand does both, but there is some small utility in
leaving the rate, but clearing the list, if you've discovered the "right"
rate for your cluster/workload and want it to be sticky for next time.
Does anyone do this in practice?
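
To spell out the two readings of "removing the throttle", here is a small
sketch in plain Java. It models the topic and broker configs as maps and
does not call any Kafka API; the class name ThrottleRemovalSketch and the
sample values are assumptions for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model only; it does not talk to a broker or ZooKeeper. */
final class ThrottleRemovalSketch {
    // Config names as discussed in this thread.
    static final String LEADER_REPLICAS = "leader.replication.throttled.replicas";
    static final String FOLLOWER_REPLICAS = "follower.replication.throttled.replicas";
    static final String LEADER_RATE = "leader.replication.throttled.rate";
    static final String FOLLOWER_RATE = "follower.replication.throttled.rate";

    /** Reading 1: empty the throttled replica lists on the topic, leave the rates alone. */
    static void clearReplicaLists(Map<String, String> topicConfig) {
        topicConfig.put(LEADER_REPLICAS, "");
        topicConfig.put(FOLLOWER_REPLICAS, "");
    }

    /** Reading 2: make the broker rates effectively unlimited. */
    static void resetRates(Map<String, String> brokerConfig) {
        brokerConfig.put(LEADER_RATE, Long.toString(Long.MAX_VALUE));
        brokerConfig.put(FOLLOWER_RATE, Long.toString(Long.MAX_VALUE));
    }

    public static void main(String[] args) {
        Map<String, String> topic = new HashMap<>();
        topic.put(LEADER_REPLICAS, "0:101,1:102");   // illustrative values
        topic.put(FOLLOWER_REPLICAS, "0:103,1:104");
        Map<String, String> broker = new HashMap<>();
        broker.put(LEADER_RATE, "10000000");
        broker.put(FOLLOWER_RATE, "10000000");

        // The "sticky rate" variant: only the lists are cleared.
        clearReplicaLists(topic);
        System.out.println(topic + " " + broker);
    }
}
```

Calling only clearReplicaLists() is the "sticky" case described above:
nothing is throttled any more, but the rate is still there for next time.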

>> With regards to throttling, it would be
>> worth thinking about a way where the throttling configs can be
>> automatically removed without the user having to re-run the tool.
>>
>
> Isn't that just a matter of updating the topic configs for
> (leader|follower).replication.throttled.replicas at the same time we
> remove the reassignment znode? That leaves open the question about whether
> to reset the rates at the same time.
>

Thinking some more about my "update the configs at the same time we remove
the reassignment znode" suggestion. The reassignment znode is persistent,
so the reassignment will survive a zookeeper restart. If there was a flag
for the auto-removal of the throttle it would likewise need to be
persistent. Otherwise a ZK restart would remember the reassignment, but
forget about the preference for auto removal of throttles. So, we would use
a persistent znode (a child of the reassignment path, perhaps) to store a
flag for throttle removal.
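
Roughly what I have in mind, as a plain Java model rather than real
ZooKeeper code. The child path name "remove_throttle" and the class
ThrottleFlagSketch are hypothetical, and "restart" here just means that
anything not stored persistently is lost.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of persistent vs. non-persistent state; no ZooKeeper client involved. */
final class ThrottleFlagSketch {
    static final String REASSIGN_PATH = "/admin/reassign_partitions";
    // Hypothetical child znode holding the auto-removal preference.
    static final String FLAG_PATH = REASSIGN_PATH + "/remove_throttle";

    private final Map<String, String> persistent = new HashMap<>(); // survives restart()
    private final Map<String, String> inMemory = new HashMap<>();   // lost on restart()

    void startReassignment(String json, boolean removeThrottle, boolean persistFlag) {
        persistent.put(REASSIGN_PATH, json);
        Map<String, String> target = persistFlag ? persistent : inMemory;
        target.put(FLAG_PATH, Boolean.toString(removeThrottle));
    }

    /** Simulates a restart: only persistent state is kept. */
    void restart() {
        inMemory.clear();
    }

    boolean reassignmentKnown() {
        return persistent.containsKey(REASSIGN_PATH);
    }

    boolean shouldRemoveThrottle() {
        String value = persistent.getOrDefault(FLAG_PATH, inMemory.get(FLAG_PATH));
        return Boolean.parseBoolean(value); // a missing flag reads as false
    }

    public static void main(String[] args) {
        ThrottleFlagSketch state = new ThrottleFlagSketch();
        state.startReassignment("{}", true, false); // flag not persisted
        state.restart();
        System.out.println(state.reassignmentKnown() + " " + state.shouldRemoveThrottle());
    }
}
```

With persistFlag set to false the reassignment is still known after the
restart but the removal preference is gone, which is the mismatch I'd like
to avoid.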

Thoughts?

Cheers,

Tom

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
Hi Ismael,

OK, KIP-195 has been factored out.

> Regarding the dynamic configs, I personally still think we should have a
> specific protocol API for that


Can you explain a little more why?

> With regards to throttling, it would be
> worth thinking about a way where the throttling configs can be
> automatically removed without the user having to re-run the tool.
>

Isn't that just a matter of updating the topic configs for
(leader|follower).replication.throttled.replicas
at the same time we remove the reassignment znode? That leaves open the
question about whether to reset the rates at the same time.

But now I wonder why the broker configs
"(leader|follower).replication.throttled.rate"
are DynamicConfigs but the topic configs
"(leader|follower).replication.throttled.replicas"
are normal configs. Aren't the topic ones for the replicas just as dynamic
as the broker ones for the rate?

Thanks,

Tom


On 7 September 2017 at 17:24, Ismael Juma <is...@juma.me.uk> wrote:

> Hi Tom,
>
> It won't be used within Kafka, but it's a public API that can be used by
> other projects. And the protocol can be used by non-Java clients. So, there
> is still value in including it.
>
> Regarding the dynamic configs, I personally still think we should have a
> specific protocol API for that. With regards to throttling, it would be
> worth thinking about a way where the throttling configs can be
> automatically removed without the user having to re-run the tool. So, yes,
> maybe it should be a separate KIP as well.
>
> Not sure if we need it in the template, but you're welcome to mention any
> dependencies when there are some.
>
> Thanks,
> Ismael
>
> On Thu, Sep 7, 2017 at 5:15 PM, Tom Bentley <t....@gmail.com> wrote:
>
> > Hi Ismael,
> >
> > It would be good to get at least some of this into 1.0.0.
> >
> > We could put the increasePartitions() work into another KIP, but it would
> > be an unused AdminClient API in that release. The consumer of this API
> will
> > be the TopicsCommand when that gets refactored to use the AdminClient.
> > That's something Paolo Patierno is proposing to do but afaik not in time
> > for 1.0.0. I don't think that's an issue, though, so I'll split out a
> > separate KIP.
> >
> > FWIW, we could also split out the proposal to support describeConfigs()
> and
> > alterConfigs() for dynamic configs into a separate KIP too. But that's
> also
> > a decision we can defer until we're looking at the remainder of KIP-179.
> >
> > Aside: I wonder if it would be a good idea for the KIP template to have a
> > "Depends on" field so people can more easily keep track of how multiple
> > in-flight KIPs depend on one another?
> >
> > Cheers,
> >
> > Tom
> >
> > On 7 September 2017 at 16:42, Ismael Juma <is...@juma.me.uk> wrote:
> >
> > > Hi Tom,
> > >
> > > What do you think of moving `increasePartitionsCount` (or
> > > `increaseNumPartitions`) to a separate KIP? That is simple enough that
> we
> > > could potentially include it in 1.0.0. I'd be happy to review it.
> > > ReassignPartitions is more complex and we can probably aim to include
> > that
> > > in the January release. What do you think?
> > >
> > > Ismael
> > >
> > > On Wed, Sep 6, 2017 at 11:42 PM, Colin McCabe <cm...@apache.org>
> > wrote:
> > >
> > > > On Wed, Sep 6, 2017, at 00:20, Tom Bentley wrote:
> > > > > Hi Ted and Colin,
> > > > >
> > > > > Thanks for the comments.
> > > > >
> > > > > It seems you're both happier with reassign rather than assign, so
> I'm
> > > > > happy
> > > > > to stick with that.
> > > > >
> > > > >
> > > > > On 5 September 2017 at 18:46, Colin McCabe <cm...@apache.org>
> > wrote:
> > > > >
> > > > > > ...
> > > > >
> > > > >
> > > > > > Do we expect that reducing the number of partitions will ever be
> > > > > > supported by this API?  It seems like decreasing would require a
> > > > > > different API-- one which supported data movement, had a "check
> > > status
> > > > > > of this operation" feature, etc. etc.  If this API is only ever
> > going
> > > > to
> > > > > > be used to increase the number of partitions, I think we should
> > just
> > > > > > call it "increasePartitionCount" to avoid confusion.
> > > > > >
> > > > >
> > > > > I thought a little about the decrease possibility (hence the static
> > > > > factory
> > > > > methods on PartitionCount), but not really in enough detail. I
> > suppose
> > > a
> > > > > decrease process could look like this:
> > > > >
> > > > > 1. Producers start partitioning based on the decreased partition
> > count.
> > > > > 2. Consumers continue to consume from all partitions.
> > > > > 3. At some point all the records in the old partitions have expired
> > and
> > > > > they can be deleted.
> > > > >
> > > > > This wouldn't work for compacted topics. Of course a more
> aggressive
> > > > > strategy is also possible where we forcibly move data from the
> > > partitions
> > > > > to be deleted.
> > > > >
> > > > > Anyway, in either case the process would be long running, whereas
> the
> > > > > increase case is fast, so the semantics are quite different. So I
> > > agree,
> > > > > > let's rename the method to make clear that it's only for increasing
> > the
> > > > > partition count.
> > > > >
> > > > > >
> > > > > > > Outstanding questions:
> > > > > > >
> > > > > > > 1. Is the proposed alterInterReplicaThrottle() API really
> better
> > > than
> > > > > > > changing the throttle via alterConfigs()?
> > > > > >
> > > > > > That's a good point.  I would argue that we should just use
> > > > alterConfigs
> > > > > > to set the broker configuration, rather than having a special RPC
> > > just
> > > > > > for this.
> > > > > >
> > > > >
> > > > > Yes, I'm minded to agree.
> > > > >
> > > > > The reason I originally thought a special API might be better was
> if
> > > > > people
> > > > > felt that the DynamicConfig mechanism (which seems to exist only to
> > > > > support
> > > > > these throttles) was an implementation detail of the throttles.
> But I
> > > now
> > > > > realise that they're visible via kafka-topics.sh, so they're
> > > effectively
> > > > > already a public API.
> > > > >
> > > > >
> > > > > >
> > > > > > ...
> > > > > > > Would it be a problem that
> > > > > > > triggering the reassignment required ClusterAction on the
> > Cluster,
> > > > but
> > > > > > > throttling the assignment required Alter on the Topic? What if
> a
> > > > user had
> > > > > > > the former permission, but not the latter?
> > > > > >
> > > > > > We've been trying to reserve ClusterAction on Cluster for
> > > > > > broker-initiated operations.  Alter on Cluster is the ACL for
> "root
> > > > > > stuff" and I would argue that it should be what we use here.
> > > > > >
> > > > > > For reconfiguring the broker, I think we should follow KIP-133
> and
> > > use
> > > > > > AlterConfigs on the Broker resource.  (Of course, if you just use
> > the
> > > > > > existing alterConfigs call, you get this without any special
> > effort.)
> > > > > >
> > > > >
> > > > > Yes, sorry, what I put in the email about authorisation wasn't
> what I
> > > put
> > > > > in the KIP (I revised the KIP after drafting the email and then
> > forgot
> > > to
> > > > > update the email).
> > > > >
> > > > > Although KIP-133 proposes a Broker resource, I don't see one in the
> > > code
> > > > > and KIP-133 was supposedly delivered in 0.11. Can anyone fill in
> the
> > > > > story
> > > > > here? Is it simply because the functionality to update broker
> configs
> > > > > hasn't been implemented yet?
> > > >
> > > > Look in
> > > > ./clients/src/main/java/org/apache/kafka/common/config/
> > > > ConfigResource.java,
> > > > for the BROKER resource.  I bet you're looking at the Resource class
> > > > used for ACLs, which is a different class.
> > > >
> > > > >
> > > > > As currently proposed, both reassignPartitions() and
> > > > > alterInterBrokerThrottle()
> > > > > require Alter on the Cluster. If we used alterConfigs() to set the
> > > > > throttles then we create a situation where the triggering of the
> > > > > reassignment required Alter(Cluster), but the throttling required
> > > > > Alter(Broker), and the user might have the former but not the
> > latter. I
> > > > > don't think this is likely to be a big deal in practice, but maybe
> > > others
> > > > > disagree?
> > > >
> > > > Alter:Cluster is essentially root, though.  If you have Alter:Cluster
> > > > and you don't have AlterConfigs:Broker, you can just create a new ACL
> > > > giving it to yourself (creating and deleting ACLs is one of the
> powers
> > > > of Alter:Cluster)
> > > >
> > > > cheers,
> > > > Colin
> > > >
> > > > >
> > > > >
> > > > > > >
> > > > > > > 2. Is reassignPartitions() really the best name? I find the
> > > > distinction
> > > > > > > between reassigning and just assigning to be of limited value,
> > and
> > > > > > > "reassign" is misleading when the APIs now used for changing
> the
> > > > > > > replication factor too. Since the API is asynchronous/long
> > running,
> > > > it
> > > > > > > > might be better to indicate that in the name somehow. What
> about
> > > > > > > startPartitionAssignment()?
> > > > > >
> > > > > > Good idea -- I like the idea of using "start" or "initiate" to
> > > indicate
> > > > > > that this is kicking off a long-running operation.
> > > > > >
> > > > > > "reassign" seemed like a natural choice to me since this is
> > changing
> > > an
> > > > > > existing assignment.  It was assigned (when the topic it was
> > > created)--
> > > > > > now it's being re-assigned.  If you change it to just "assign"
> then
> > > it
> > > > > > feels confusing to me.  A new user might ask if "assigning
> > > partitions"
> > > > > > is a step that they should take after creating a topic, similar
> to
> > > how
> > > > > > subscribing to topics is a step you take after creating a
> consumer.
> > > > > >
> > > > >
> > > > > Yeah, I find this new user argument persuasive, so I'm happy to
> stick
> > > > > with
> > > > > reassign.
> > > > >
> > > > > Thanks again for the feedback,
> > > > >
> > > > > Tom
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Ismael Juma <is...@juma.me.uk>.
Hi Tom,

It won't be used within Kafka, but it's a public API that can be used by
other projects. And the protocol can be used by non-Java clients. So, there
is still value in including it.

Regarding the dynamic configs, I personally still think we should have a
specific protocol API for that. With regards to throttling, it would be
worth thinking about a way where the throttling configs can be
automatically removed without the user having to re-run the tool. So, yes,
maybe it should be a separate KIP as well.

Not sure if we need it in the template, but you're welcome to mention any
dependencies when there are some.

Thanks,
Ismael


Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
Hi Ismael,

It would be good to get at least some of this into 1.0.0.

We could put the increasePartitions() work into another KIP, but it would
be an unused AdminClient API in that release. The consumer of this API will
be the TopicsCommand when that gets refactored to use the AdminClient.
That's something Paolo Patierno is proposing to do but afaik not in time
for 1.0.0. I don't think that's an issue, though, so I'll split out a
separate KIP.

FWIW, we could also split out the proposal to support describeConfigs() and
alterConfigs() for dynamic configs into a separate KIP too. But that's also
a decision we can defer until we're looking at the remainder of KIP-179.

Aside: I wonder if it would be a good idea for the KIP template to have a
"Depends on" field so people can more easily keep track of how multiple
in-flight KIPs depend on one another?

Cheers,

Tom


Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Ismael Juma <is...@juma.me.uk>.
Hi Tom,

What do you think of moving `increasePartitionsCount` (or
`increaseNumPartitions`) to a separate KIP? That is simple enough that we
could potentially include it in 1.0.0. I'd be happy to review it.
ReassignPartitions is more complex and we can probably aim to include that
in the January release. What do you think?

Ismael


Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Colin McCabe <cm...@apache.org>.
On Wed, Sep 6, 2017, at 00:20, Tom Bentley wrote:
> Hi Ted and Colin,
> 
> Thanks for the comments.
> 
> It seems you're both happier with reassign rather than assign, so I'm
> happy
> to stick with that.
> 
> 
> On 5 September 2017 at 18:46, Colin McCabe <cm...@apache.org> wrote:
> 
> > ...
> 
> 
> > Do we expect that reducing the number of partitions will ever be
> > supported by this API?  It seems like decreasing would require a
> > different API-- one which supported data movement, had a "check status
> > of this operation" feature, etc. etc.  If this API is only ever going to
> > be used to increase the number of partitions, I think we should just
> > call it "increasePartitionCount" to avoid confusion.
> >
> 
> I thought a little about the decrease possibility (hence the static
> factory
> methods on PartitionCount), but not really in enough detail. I suppose a
> decrease process could look like this:
> 
> 1. Producers start partitioning based on the decreased partition count.
> 2. Consumers continue to consume from all partitions.
> 3. At some point all the records in the old partitions have expired and
> they can be deleted.
> 
> This wouldn't work for compacted topics. Of course a more aggressive
> strategy is also possible where we forcibly move data from the partitions
> to be deleted.
> 
> Anyway, in either case the process would be long running, whereas the
> increase case is fast, so the semantics are quite different. So I agree,
> lets rename the method to make clear that it's only for increasing the
> partition count.
> 
> >
> > > Outstanding questions:
> > >
> > > 1. Is the proposed alterInterReplicaThrottle() API really better than
> > > changing the throttle via alterConfigs()?
> >
> > That's a good point.  I would argue that we should just use alterConfigs
> > to set the broker configuration, rather than having a special RPC just
> > for this.
> >
> 
> Yes, I'm minded to agree.
> 
> The reason I originally thought a special API might be better was if
> people
> felt that the DynamicConfig mechanism (which seems to exist only to
> support
> these throttles) was an implementation detail of the throttles. But I now
> realise that they're visible via kafka-topics.sh, so they're effectively
> already a public API.
> 
> 
> >
> > ...
> > > Would it be a problem that
> > > triggering the reassignment required ClusterAction on the Cluster, but
> > > throttling the assignment required Alter on the Topic? What if a user had
> > > the former permission, but not the latter?
> >
> > We've been trying to reserve ClusterAction on Cluster for
> > broker-initiated operations.  Alter on Cluster is the ACL for "root
> > stuff" and I would argue that it should be what we use here.
> >
> > For reconfiguring the broker, I think we should follow KIP-133 and use
> > AlterConfigs on the Broker resource.  (Of course, if you just use the
> > existing alterConfigs call, you get this without any special effort.)
> >
> 
> Yes, sorry, what I put in the email about authorisation wasn't what I put
> in the KIP (I revised the KIP after drafting the email and then forgot to
> update the email).
> 
> Although KIP-133 proposes a Broker resource, I don't see one in the code
> and KIP-133 was supposedly delivered in 0.11. Can anyone fill in the
> story
> here? Is it simply because the functionality to update broker configs
> hasn't been implemented yet?

Look in
./clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java,
for the BROKER resource.  I bet you're looking at the Resource class
used for ACLs, which is a different class.

> 
> As currently proposed, both reassignPartitions() and
> alterInterBrokerThrottle()
> require Alter on the Cluster. If we used alterConfigs() to set the
> throttles then we create a situation where the triggering of the
> reassignment required Alter(Cluster), but the throttling required
> Alter(Broker), and the user might have the former but not the latter. I
> don't think this is likely to be a big deal in practice, but maybe others
> disagree?

Alter:Cluster is essentially root, though.  If you have Alter:Cluster
and you don't have AlterConfigs:Broker, you can just create a new ACL
giving it to yourself (creating and deleting ACLs is one of the powers
of Alter:Cluster).
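
As a toy illustration of that escalation, here is a minimal sketch. It is
not Kafka's authorizer: the class, the string-based ACL entries, the seed()
helper and the principal names are all assumptions made up for this
example. The only rule it models is the one described above, that creating
an ACL requires Alter on the Cluster.

```java
import java.util.HashSet;
import java.util.Set;

// Toy ACL model (hypothetical, not Kafka's authorizer API).
class AclEscalationSketch {
    // Each ACL is stored as "principal|operation|resource".
    private final Set<String> acls = new HashSet<>();

    private static String entry(String principal, String operation, String resource) {
        return principal + "|" + operation + "|" + resource;
    }

    // Stands in for whatever an operator did to set up the initial ACLs.
    void seed(String principal, String operation, String resource) {
        acls.add(entry(principal, operation, resource));
    }

    boolean isAllowed(String principal, String operation, String resource) {
        return acls.contains(entry(principal, operation, resource));
    }

    // Assumption from the thread: creating ACLs requires Alter on Cluster.
    boolean createAcl(String caller, String principal, String operation, String resource) {
        if (!isAllowed(caller, "Alter", "Cluster")) {
            return false;
        }
        acls.add(entry(principal, operation, resource));
        return true;
    }

    public static void main(String[] args) {
        AclEscalationSketch authz = new AclEscalationSketch();
        authz.seed("alice", "Alter", "Cluster");
        System.out.println("before: " + authz.isAllowed("alice", "AlterConfigs", "Broker"));
        authz.createAcl("alice", "alice", "AlterConfigs", "Broker");
        System.out.println("after:  " + authz.isAllowed("alice", "AlterConfigs", "Broker"));
    }
}
```

In this model a principal holding only Alter:Cluster can always grant
itself AlterConfigs:Broker, so keeping the two permissions separate does
not restrict that principal in any meaningful way.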

cheers,
Colin

> 
> 
> > >
> > > 2. Is reassignPartitions() really the best name? I find the distinction
> > > between reassigning and just assigning to be of limited value, and
> > > "reassign" is misleading when the APIs now used for changing the
> > > replication factor too. Since the API is asynchronous/long running, it
> > > might be better to indicate that in the name some how. What about
> > > startPartitionAssignment()?
> >
> > Good idea -- I like the idea of using "start" or "initiate" to indicate
> > that this is kicking off a long-running operation.
> >
> > "reassign" seemed like a natural choice to me since this is changing an
> > existing assignment.  It was assigned (when the topic it was created)--
> > now it's being re-assigned.  If you change it to just "assign" then it
> > feels confusing to me.  A new user might ask if "assigning partitions"
> > is a step that they should take after creating a topic, similar to how
> > subscribing to topics is a step you take after creating a consumer.
> >
> 
> Yeah, I find this new user argument persuasive, so I'm happy to stick
> with
> reassign.
> 
> Thanks again for the feedback,
> 
> Tom

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
Hi Ted and Colin,

Thanks for the comments.

It seems you're both happier with reassign rather than assign, so I'm happy
to stick with that.


On 5 September 2017 at 18:46, Colin McCabe <cm...@apache.org> wrote:

> ...


> Do we expect that reducing the number of partitions will ever be
> supported by this API?  It seems like decreasing would require a
> different API-- one which supported data movement, had a "check status
> of this operation" feature, etc. etc.  If this API is only ever going to
> be used to increase the number of partitions, I think we should just
> call it "increasePartitionCount" to avoid confusion.
>

I thought a little about the decrease possibility (hence the static factory
methods on PartitionCount), but not really in enough detail. I suppose a
decrease process could look like this:

1. Producers start partitioning based on the decreased partition count.
2. Consumers continue to consume from all partitions.
3. At some point all the records in the old partitions have expired and
they can be deleted.

This wouldn't work for compacted topics. Of course a more aggressive
strategy is also possible where we forcibly move data from the partitions
to be deleted.

Anyway, in either case the process would be long running, whereas the
increase case is fast, so the semantics are quite different. So I agree,
let's rename the method to make clear that it's only for increasing the
partition count.
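
To make step 1 of the decrease idea concrete, here is a minimal sketch. It
is not Kafka code: it assumes a simple hash-mod partitioner, with
String.hashCode() standing in for the hash the real default partitioner
applies to the serialized key, and the class and method names are made up
for illustration. It shows that once producers partition on the smaller
count, the partitions above that count stop receiving new records, which
is what would let them drain and eventually be deleted.

```java
import java.util.Set;
import java.util.TreeSet;

// Toy model of keyed partitioning (hypothetical, not Kafka's partitioner).
class DecreasePartitionsSketch {
    // Map a key to a partition in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Which partitions receive at least one record for keys key-0 .. key-(numKeys-1)?
    static Set<Integer> partitionsWrittenTo(int numKeys, int numPartitions) {
        Set<Integer> used = new TreeSet<>();
        for (int i = 0; i < numKeys; i++) {
            used.add(partitionFor("key-" + i, numPartitions));
        }
        return used;
    }

    public static void main(String[] args) {
        // Before the decrease: producers spread keys over 6 partitions.
        System.out.println("count=6: " + partitionsWrittenTo(1000, 6));
        // After step 1: producers use a count of 4, so partitions 4 and 5 get nothing new.
        System.out.println("count=4: " + partitionsWrittenTo(1000, 4));
    }
}
```

Note that the sketch also hints at why this is awkward for compacted
topics: a key that used to live in a partition to be deleted now maps to
a different partition, while its older records never expire.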

>
> > Outstanding questions:
> >
> > 1. Is the proposed alterInterReplicaThrottle() API really better than
> > changing the throttle via alterConfigs()?
>
> That's a good point.  I would argue that we should just use alterConfigs
> to set the broker configuration, rather than having a special RPC just
> for this.
>

Yes, I'm minded to agree.

The reason I originally thought a special API might be better was if people
felt that the DynamicConfig mechanism (which seems to exist only to support
these throttles) was an implementation detail of the throttles. But I now
realise that they're visible via kafka-topics.sh, so they're effectively
already a public API.


>
> ...
> > Would it be a problem that
> > triggering the reassignment required ClusterAction on the Cluster, but
> > throttling the assignment required Alter on the Topic? What if a user had
> > the former permission, but not the latter?
>
> We've been trying to reserve ClusterAction on Cluster for
> broker-initiated operations.  Alter on Cluster is the ACL for "root
> stuff" and I would argue that it should be what we use here.
>
> For reconfiguring the broker, I think we should follow KIP-133 and use
> AlterConfigs on the Broker resource.  (Of course, if you just use the
> existing alterConfigs call, you get this without any special effort.)
>

Yes, sorry, what I put in the email about authorisation wasn't what I put
in the KIP (I revised the KIP after drafting the email and then forgot to
update the email).

Although KIP-133 proposes a Broker resource, I don't see one in the code
and KIP-133 was supposedly delivered in 0.11. Can anyone fill in the story
here? Is it simply because the functionality to update broker configs
hasn't been implemented yet?

As currently proposed, both reassignPartitions() and alterInterBrokerThrottle()
require Alter on the Cluster. If we used alterConfigs() to set the
throttles then we would create a situation where triggering the
reassignment requires Alter(Cluster) but throttling requires
Alter(Broker), and a user might have the former but not the latter. I
don't think this is likely to be a big deal in practice, but maybe others
disagree?
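
To make the permission gap concrete, here is a minimal sketch. It models
grants as plain "operation:resourceType" strings; AclGapSketch, canReassign
and canThrottle are hypothetical names for illustration only and are not
the Kafka authorizer API.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class AclGapSketch {
    // Hypothetical model: a grant is the string "<operation>:<resourceType>".
    static boolean allowed(Set<String> grants, String operation, String resourceType) {
        return grants.contains(operation + ":" + resourceType);
    }

    // As proposed in this thread, triggering a reassignment needs Alter on the Cluster.
    static boolean canReassign(Set<String> grants) {
        return allowed(grants, "Alter", "Cluster");
    }

    // If throttles were set via alterConfigs(), a Broker-level grant would be checked instead.
    static boolean canThrottle(Set<String> grants) {
        return allowed(grants, "Alter", "Broker");
    }

    public static void main(String[] args) {
        // A user who holds only the Cluster-level grant.
        Set<String> grants = new HashSet<>(Arrays.asList("Alter:Cluster"));
        System.out.println("can reassign: " + canReassign(grants));
        System.out.println("can throttle: " + canThrottle(grants));
    }
}
```

With only the Cluster-level grant, such a user could start a reassignment
but could not throttle it, which is the situation described above.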


> >
> > 2. Is reassignPartitions() really the best name? I find the distinction
> > between reassigning and just assigning to be of limited value, and
> > "reassign" is misleading when the API is now used for changing the
> > replication factor too. Since the API is asynchronous/long running, it
> > might be better to indicate that in the name somehow. What about
> > startPartitionAssignment()?
>
> Good idea -- I like the idea of using "start" or "initiate" to indicate
> that this is kicking off a long-running operation.
>
> "reassign" seemed like a natural choice to me since this is changing an
> existing assignment.  It was assigned (when the topic was created)--
> now it's being re-assigned.  If you change it to just "assign" then it
> feels confusing to me.  A new user might ask if "assigning partitions"
> is a step that they should take after creating a topic, similar to how
> subscribing to topics is a step you take after creating a consumer.
>

Yeah, I find this new user argument persuasive, so I'm happy to stick with
reassign.

Thanks again for the feedback,

Tom

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Colin McCabe <cm...@apache.org>.
On Tue, Sep 5, 2017, at 09:39, Tom Bentley wrote:
> I've revised this KIP again:
> 
> * Change the alterPartitionCounts() API to support passing an optional
> assignment for the new partitions (which is already supported by
> kafka-topics.sh). At the same time I didn't want the API to suggest it
> was possible to change the existing assignments in the same call (which
> isn't supported today).
> 
> * Removed alterReplicationFactor(), since it's really just a special case
> of reassignPartitions(): In both cases it is necessary to give a complete
> partition assignment and both can be long running due to data movement.

Thanks, Tom.

Do we expect that reducing the number of partitions will ever be
supported by this API?  It seems like decreasing would require a
different API-- one which supported data movement, had a "check status
of this operation" feature, etc. etc.  If this API is only ever going to
be used to increase the number of partitions, I think we should just
call it "increasePartitionCount" to avoid confusion.

>
> Outstanding questions:
> 
> 1. Is the proposed alterInterReplicaThrottle() API really better than
> changing the throttle via alterConfigs()?

That's a good point.  I would argue that we should just use alterConfigs
to set the broker configuration, rather than having a special RPC just
for this.

> It wouldn't be necessary to
> support alterConfigs() for all broker configs, just DynamicConfigs (which
> can't be set via the config file anyway).

Hmm.  For this purpose, it's not even necessary to support alterConfigs
for all dynamic configurations.  You could just support alterConfigs on
this particular dynamic configuration key.  It might be better to be
conservative here, since exposing all of our internals could lead to
compatibility problems later on.
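
A rough sketch of what that conservative approach might look like: only an
explicit allowlist of keys is accepted. The two key names are assumed here
to be the broker-level replication throttle configs; the class and method
names are hypothetical and not part of any existing Kafka API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class ThrottleConfigAllowlist {
    // Assumption: these are the only dynamic broker keys exposed via alterConfigs.
    static final Set<String> ALLOWED_KEYS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            "leader.replication.throttled.rate",
            "follower.replication.throttled.rate")));

    // Returns the requested keys that are not on the allowlist (sorted); empty means acceptable.
    static Set<String> rejectedKeys(Map<String, String> requested) {
        Set<String> rejected = new TreeSet<>(requested.keySet());
        rejected.removeAll(ALLOWED_KEYS);
        return rejected;
    }

    public static void main(String[] args) {
        Map<String, String> request = new HashMap<>();
        request.put("leader.replication.throttled.rate", "10000000");
        request.put("log.retention.hours", "24");
        // Any key listed here would cause the whole request to be refused.
        System.out.println("rejected: " + rejectedKeys(request));
    }
}
```

Keeping the allowlist explicit means that exposing another key later is a
deliberate, reviewable change rather than a side effect.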

> Would it be a problem that
> triggering the reassignment required ClusterAction on the Cluster, but
> throttling the assignment required Alter on the Topic? What if a user had
> the former permission, but not the latter?

We've been trying to reserve ClusterAction on Cluster for
broker-initiated operations.  Alter on Cluster is the ACL for "root
stuff" and I would argue that it should be what we use here.

For reconfiguring the broker, I think we should follow KIP-133 and use
AlterConfigs on the Broker resource.  (Of course, if you just use the
existing alterConfigs call, you get this without any special effort.)

> 
> 2. Is reassignPartitions() really the best name? I find the distinction
> between reassigning and just assigning to be of limited value, and
> "reassign" is misleading when the API is now used for changing the
> replication factor too. Since the API is asynchronous/long running, it
> might be better to indicate that in the name somehow. What about
> startPartitionAssignment()?

Good idea -- I like the idea of using "start" or "initiate" to indicate
that this is kicking off a long-running operation.

"reassign" seemed like a natural choice to me since this is changing an
existing assignment.  It was assigned (when the topic was created)--
now it's being re-assigned.  If you change it to just "assign" then it
feels confusing to me.  A new user might ask if "assigning partitions"
is a step that they should take after creating a topic, similar to how
subscribing to topics is a step you take after creating a consumer.

Basically, "reassign" makes it clear that it is changing an assignment
that already exists.  "assign" leaves open the possibility that no
assignment exists (which of course we know is not true, but it would be
nice if the name reflected that.)

> On 30 August 2017 at 16:17, Tom Bentley <t....@gmail.com> wrote:
> 
> > Hi all,
> >
> > I've updated the KIP as follows:
> >
> > * remove the APIs supporting progress reporting in favour of the APIs
> > being implemented in KIP-113.
> > * added some APIs to cover the existing functionality around throttling
> > inter-broker transfers, which was previously a TODO.
> >
> > To respond to Colin's suggestion:
> >
> > > TopicAlterationResult alterTopics(TopicAlterationOptions options,
> > >                                   Map<String, TopicAlteration> alters)
> > >
> > > PartitionCountAlteration(int numPartitions) implements TopicAlteration
> > >
> > > ReplicationFactorAlteration(int repl) implements TopicAlteration
> > >
> > > ReassignPartitionsAlteration(...) implements TopicAlteration
> >
> > That is a workable alternative to providing 3 separate methods on the
> > AdminClient, but I don't see why it is objectively better. I don't see
> > clients commonly needing to do a mixture of alterations, and it assumes
> > that the options make sense for all alterations.

Yeah-- it was just an idea I had for an alternate API that grouped
things a little bit more.  I agree that it does constrain us to using
only options that make sense for all the operations, which is not ideal.
So your existing split into separate functions probably makes more
sense.

cheers,
Colin

> >
> > Cheers,
> >
> > Tom
> >
> > On 22 August 2017 at 23:49, Colin McCabe <cm...@apache.org> wrote:
> >
> >> On Wed, Aug 9, 2017, at 07:17, Tom Bentley wrote:
> >> > Hi Dong and Jun,
> >> >
> >> > Thanks again for your input in this discussion and on KIP-113. It's
> >> > difficult that discussion is split between this thread and the one for
> >> > KIP-113, but I'll try to respond on this thread to questions asked on
> >> > this
> >> > thread.
> >> >
> >> > It seems there is some consensus that the alterTopic() API is the wrong
> >> > thing, and it would make more sense to separate the different kinds of
> >> > alteration into separate APIs. It seems to me there is then a choice.
> >> >
> >> > 1. Have separate alterPartitionCount(), alterReplicationFactor() and
> >> > reassignPartitions() methods. This would most closely match the
> >> > facilities
> >> > currently offered by kafka-alter-topics and kafka-reassign-partitions.
> >> > 2. Just provide a reassignPartitions() method and infer from the shape
> >> of
> >> > the data passed to that that a change in replication factor and/or
> >> > partition count is required, as suggested by Dong.
> >> >
> >> > The choice we make here is also relevant to KIP-113 and KIP-178. By
> >> > choosing (1) we can change the replication factor or partition count
> >> > without providing an assignment and therefore are necessarily requiring
> >> > the
> >> > controller to make a decision for us about which broker (and, for 113,
> >> > which log directory and thus which disk) the replica should reside on.
> >> > There is then the matter of what criteria the controller should use to
> >> > make
> >> > that decision (a decision is also required on topic creation of course,
> >> > but
> >> > I'll try not to go there right now).
> >>
> >> Hmm.  One approach would be to have something like
> >>
> >> > TopicAlterationResult alterTopics(TopicAlterationOptions options,
> >> >                                   Map<String, TopicAlteration> alters)
> >> >
> >> > PartitionCountAlteration(int numPartitions) implements TopicAlteration
> >> >
> >> > ReplicationFactorAlteration(int repl) implements TopicAlteration
> >> >
> >> > ReassignPartitionsAlteration(...) implements TopicAlteration
> >>
> >>
> >> >
> >> > Choosing (2), on the other hand, forces us to make an assignment, and
> >> > currently the AdminClient doesn't provide the APIs necessary to make a
> >> > very
> >> > informed decision. To do the job properly we'd need APIs to enumerate
> >> the
> >> > permissible log directories on each broker, know the current disk usage
> >> > etc. These just don't exist today, and I think it would be a lot of work
> >> > to
> >> > bite off to specify them. We've already got three KIPs on the go.
> >> >
> >> > So, for the moment, I think we have to choose 1, for pragmatic reasons.
> >> > There is nothing to stop us deprecating alterPartitionCount() and
> >> > alterReplicationFactor() and moving to (2) at a later date.
> >>
> >> Yeah, the API should be pragmatic.  An API that doesn't let us do what
> >> we want to do (like let Kafka assign us a new replica on the least
> >> loaded node) is not good.
> >>
> >> >
> >> > To answer a specific question of Jun's:
> >> >
> >> > 3. It's not very clear to me what status_time in ReplicaStatusResponse
> >> > > means.
> >> >
> >> >
> >> > It corresponds to the ReplicaStatus.statusTime, and the idea was it was
> >> > the
> >> > epoch offset at which the controller's notion of the lag was current
> >> > (which
> >> > I think is the epoch offset of the last FetchRequest from the follower).
> >> > At
> >> > one point I was considering some other ways of determining progress and
> >> > completion, and TBH I think it was more pertinent to those rejected
> >> > alternatives.
> >> >
> >> > I've already made some changes to KIP-179. As suggested in the thread
> >> for
> >> > KIP-113, I will be changing some more since I think we can use
> >> > DescribeDirsRequest
> >> > instead of ReplicaStatusResponse to implement the --progress option.
> >>
> >> That makes sense.  Whatever response is used, it would be nice to have
> >> an error message in addition to an error code.
> >>
> >> best,
> >> Colin
> >>
> >> >
> >> > Cheers,
> >> >
> >> > Tom
> >> >
> >> >
> >> >
> >> > On 9 August 2017 at 02:28, Jun Rao <ju...@confluent.io> wrote:
> >> >
> >> > > Hi, Tom,
> >> > >
> >> > > Thanks for the KIP. A few minor comments below.
> >> > >
> >> > > 1. Implementation wise, the broker handles adding partitions
> >> differently
> >> > > from changing replica assignment. For the former, we directly update
> >> the
> >> > > topic path in ZK with the new partitions. For the latter, we write
> >> the new
> >> > > partition reassignment in the partition reassignment path. Changing
> >> the
> >> > > replication factor is handled in the same way as changing replica
> >> > > assignment. So, it would be useful to document how the broker handles
> >> these
> >> > > different cases accordingly. I think it's simpler to just allow one
> >> change
> >> > > (partition, replication factor, or replica assignment) in a request.
> >> > >
> >> > > 2. Currently, we only allow adding partitions. We probably want to
> >> document
> >> > > the restriction in the api.
> >> > >
> >> > > 3. It's not very clear to me what status_time in ReplicaStatusResponse
> >> > > means.
> >> > >
> >> > > Jun
> >> > >
> >> > >
> >> > >
> >> > > On Fri, Aug 4, 2017 at 10:04 AM, Dong Lin <li...@gmail.com>
> >> wrote:
> >> > >
> >> > > > Hey Tom,
> >> > > >
> >> > > > Thanks for your reply. Here are my thoughts:
> >> > > >
> >> > > > 1) I think the DescribeDirsResponse can be used by AdminClient to
> >> query
> >> > > the
> >> > > > lag of follower replica as well. Here is how it works:
> >> > > >
> >> > > > - AdminClient sends DescribeDirsRequest to both the leader and the
> >> > > follower
> >> > > > of the partition.
> >> > > > - DescribeDirsResponse from both leader and follower shows the LEO
> >> of the
> >> > > > leader replica and the follower replica.
> >> > > > - Lag of the follower replica is the difference in the LEO between
> >> leader
> >> > > > and follower.
> >> > > >
> >> > > > In comparison to ReplicaStatusRequest, DescribeDirsRequest needs to
> >> be
> >> > > sent
> >> > > > to each replica of the partition whereas ReplicaStatusRequest only
> >> needs
> >> > > to
> >> > > > be sent to the leader of the partition. It doesn't seem to make much
> >> > > > difference though. In practice we probably want to query the
> >> replica lag
> >> > > of
> >> > > > many partitions and AdminClient needs to send exactly one request to
> >> each
> >> > > > broker with either solution. Does this make sense?
> >> > > >
> >> > > >
> >> > > > 2) KIP-179 proposes to add the following AdminClient API:
> >> > > >
> >> > > > alterTopics(Collection<AlteredTopic> alteredTopics,
> >> AlterTopicsOptions
> >> > > > options)
> >> > > >
> >> > > > Where AlteredTopic includes the following fields
> >> > > >
> >> > > > AlteredTopic(String name, int numPartitions, int replicationFactor,
> >> > > > Map<Integer,List<Integer>> replicasAssignment)
> >> > > >
> >> > > > I have two comments on this:
> >> > > >
> >> > > > - The information in AlteredTopic seems a bit redundant. Both
> >> > > numPartitions
> >> > > > and replicationFactor can be derived from replicasAssignment. I
> >> think we
> >> > > > can probably just use the following API in AdminClient instead:
> >> > > >
> >> > > > AlterTopicsResult alterTopics(Map<TopicPartition, List<Integer>>
> >> > > > partitionAssignment, AlterTopicsOptions options)
> >> > > >
> >> > > > - Do you think "reassignPartitions" may be a better name than
> >> > > > "alterTopics"? This is more consistent with the existing name used
> >> in
> >> > > > kafka-reassign-partitions.sh and I also find it to be more
> >> accurate. On
> >> > > the
> >> > > > other hand, alterTopics seems to suggest that we can also alter
> >> topic
> >> > > > config.
> >> > > >
> >> > > >
> >> > > >
> >> > > > Thanks,
> >> > > > Dong
> >> > > >
> >> > > > On Fri, Aug 4, 2017 at 1:03 AM, Tom Bentley <t....@gmail.com>
> >> > > wrote:
> >> > > >
> >> > > > > Hi Dong,
> >> > > > >
> >> > > > > Thanks for your reply.
> >> > > > >
> >> > > > > You're right that your DescribeDirsResponse contains appropriate
> >> data.
> >> > > > The
> >> > > > > comment about the log_end_offset in the KIP says "Enable user to
> >> track
> >> > > > > movement progress by comparing LEO of the *.log and *.move". That
> >> makes
> >> > > > me
> >> > > > > wonder whether this would only work for replica movement on the
> >> same
> >> > > > > broker. In the case of reassigning partitions between brokers
> >> it's only
> >> > > > > really the leader for the partition that knows the max LEO of the
> >> ISR
> >> > > and
> >> > > > > the LEO of the target broker. Maybe the comment is misleading and
> >> it
> >> > > > would
> >> > > > > be the leader doing the same thing in both cases. Can you clarify
> >> this?
> >> > > >
> >> > > >
> >> > > > > At a conceptual level there's a lot of similarity between KIP-113
> >> and
> >> > > > > KIP-179 because they're both about moving replicas around.
> >> Concretely
> >> > > > > though, ChangeReplicaDirRequest assumes the movement is on the
> >> same
> >> > > > broker,
> >> > > > > while the equivalent APIs in KIP-179 (whichever alternative) lack
> >> the
> >> > > > > notion of disks. I wonder if a unified API would be better or not.
> >> > > > > Operationally, the reasons for changing replicas between brokers
> >> are
> >> > > > likely
> >> > > > > to be motivated by balancing load across the cluster, or
> >> > > adding/removing
> >> > > > > brokers. But the JBOD use case has different motivations. So I
> >> suspect
> >> > > > it's
> >> > > > > OK that the APIs are separate, but I'd love to hear what others
> >> think.
> >> > > >
> >> > > >
> >> > > > > I think you're right about TopicPartitionReplica. At the protocol
> >> level
> >> > > > we
> >> > > > > could group topics and partitions together so avoid having the
> >> same
> >> > > topic
> >> > > > > name multiple times when querying for the status of all the
> >> partitions
> >> > > > of a
> >> > > > > topic.
> >> > > > >
> >> > > > > Thanks again for taking the time to respond.
> >> > > > >
> >> > > > > Tom
> >> > > > >
> >> > > > > On 3 August 2017 at 23:07, Dong Lin <li...@gmail.com> wrote:
> >> > > > >
> >> > > > > > Hey Tom,
> >> > > > > >
> >> > > > > > Thanks for the KIP. It seems that the prior discussion in this
> >> thread
> >> > > > has
> >> > > > > > focused on reassigning partitions (or AlterTopics). I haven't
> >> looked
> >> > > > into
> >> > > > > > this yet. I have two comments on the replicaStatus() API and the
> >> > > > > > ReplicaStatusRequest:
> >> > > > > >
> >> > > > > > -  It seems that the use-case for ReplicaStatusRequest is
> >> covered by
> >> > > > > > the DescribeDirsRequest introduced in KIP-113
> >> > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> > > > > > 113%3A+Support+replicas+movement+between+log+directories>.
> >> > > > > > DescribeDirsResponse contains the logEndOffset for the specified
> >> > > > > > partitions. The lag of the follower replica can be derived
> >> as the
> >> > > > > > difference between leader's LEO and follower's LEO. Do you
> >> think we
> >> > > can
> >> > > > > > simply use DescribeDirsRequest without introducing
> >> > > > ReplicaStatusRequest?
> >> > > > > >
> >> > > > > > - According to the current KIP, replicaStatus() takes a list of
> >> > > > > partitions
> >> > > > > > as input. And its output maps the partition to a list of replica
> >> > > > status.
> >> > > > > Do
> >> > > > > > you think it would be better to have a new class called
> >> > > > > > TopicPartitionReplica, which identifies the topic, partition
> >> and the
> >> > > > > > brokerId. replicaStatus can then take a list of
> >> TopicPartitionReplica
> >> > > > as
> >> > > > > > input. And its output maps the replica to replica status. The
> >> latter
> >> > > > API
> >> > > > > > seems simpler and also matches the method name better. What do
> >> you
> >> > > > think?
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Dong
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > On Wed, Aug 2, 2017 at 5:12 AM, Tom Bentley <
> >> t.j.bentley@gmail.com>
> >> > > > > wrote:
> >> > > > > >
> >> > > > > > > Hi again Ismael,
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > 1. It's worth emphasising that reassigning partitions is a
> >> > > different
> >> > > > > > > >> process than what happens when a topic is created, so not
> >> sure
> >> > > > > trying
> >> > > > > > to
> >> > > > > > > >> make it symmetric is beneficial. In addition to what was
> >> already
> >> > > > > > > >> discussed,
> >> > > > > > > >> one should also enable replication throttling before
> >> moving the
> >> > > > > data.
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > Thanks for your responses. The only advantage I can see from
> >> > > having
> >> > > > > > > > separate APIs for partition count change vs. the other
> >> changes is
> >> > > > > that
> >> > > > > > we
> >> > > > > > > > expect the former to return synchronously, and the latter to
> >> > > > > (usually)
> >> > > > > > > > return asynchronously. Lumping them into the same API forces
> >> > > > clients
> >> > > > > of
> >> > > > > > > > that API to code for the two possible cases. Is this the
> >> > > particular
> >> > > > > > > concern
> >> > > > > > > > you had in mind, or do you have others?
> >> > > > > > > >
> >> > > > > > >
> >> > > > > > > Just to help see what this looks like I've created another
> >> version
> >> > > of
> >> > > > > > > KIP-179 (
> >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Copy+
> >> > > > > > > of+KIP-179+-+Change+ReassignPartitionsCommand+to+use+
> >> AdminClient
> >> > > > > > > ).
> >> > > > > > >
> >> > > > > > > The AdminClient APIs are more type safe (harder to misuse).
> >> The
> >> > > > > > > asynchronous nature of the alterReplicationFactors() and
> >> > > > > > > reassignPartitions() methods aren't really apparent (you'd
> >> still
> >> > > have
> >> > > > > to
> >> > > > > > > read the javadocs to know you had to call replicaStatus() to
> >> > > > determine
> >> > > > > > > completion), but it's still better than the lumped-together
> >> API
> >> > > > because
> >> > > > > > the
> >> > > > > > > methods are _consistently_ async. The Protocol APIs are
> >> slightly
> >> > > > nicer
> >> > > > > > too,
> >> > > > > > > in that INVALID_REQUEST is less commonly returned. Another
> >> benefit
> >> > > is
> >> > > > > it
> >> > > > > > > would allow for these different alteration APIs to be modified
> >> > > > > > > independently in the future, should that be necessary.
> >> > > > > > >
> >> > > > > > > What do others think?
> >> > > > > > >
> >> > > > > > > You're right that the proposal doesn't cover setting and
> >> clearing a
> >> > > > > > > > throttle, and it should. I will study the code about how
> >> this
> >> > > works
> >> > > > > > > before
> >> > > > > > > > posting back about that.
> >> > > > > > > >
> >> > > > > > >
> >> > > > > > > AFAICS from the code the throttle is simply a dynamic config
> >> of the
> >> > > > > > broker.
> >> > > > > > > We already have the AdminClient.alterConfigs API for setting
> >> this,
> >> > > so
> >> > > > > the
> >> > > > > > > refactoring of kafka-reassign-partitions.sh could just use
> >> that
> >> > > > > existing
> >> > > > > > > API. We could still achieve your objective of setting the
> >> throttle
> >> > > > > before
> >> > > > > > > reassignment by making the --throttle option mandatory when
> >> > > --execute
> >> > > > > was
> >> > > > > > > present. Or did you have something else in mind?
> >> > > > > > >
> >> > > > > > > Thanks,
> >> > > > > > >
> >> > > > > > > Tom
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >>
> >
> >
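
For reference, the lag calculation Dong describes in the quoted text above
(follower lag is the leader's LEO minus the follower's LEO, each taken from
that broker's DescribeDirsResponse) can be sketched as follows. The class and
method names are hypothetical, the LEO values are passed in as plain numbers
rather than read from a real response, and clamping at zero is an assumption
of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ReplicaLagSketch {
    // leaderLeo: log end offset reported by the partition leader.
    // followerLeos: brokerId -> log end offset reported by that follower.
    static Map<Integer, Long> lagByFollower(long leaderLeo, Map<Integer, Long> followerLeos) {
        Map<Integer, Long> lag = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> e : followerLeos.entrySet()) {
            // The responses are not taken at the same instant, so clamp at zero
            // in case a follower's reading is newer than the leader's.
            lag.put(e.getKey(), Math.max(0L, leaderLeo - e.getValue()));
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<Integer, Long> followers = new LinkedHashMap<>();
        followers.put(2, 100L); // fully caught up
        followers.put(3, 40L);  // still copying data
        System.out.println(lagByFollower(100L, followers));
    }
}
```

A reassignment could be reported as complete for a partition once every
target replica shows zero lag, subject to the timing caveat noted in the
comment.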

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
I've revised this KIP again:

* Change the alterPartitionCounts() API to support passing an optional
assignment for the new partitions (which is already supported by
kafka-topics.sh). At the same time I didn't want the API to suggest it was
possible to change the existing assignments in the same call (which isn't
supported today).

* Removed alterReplicationFactor(), since it's really just a special case
of reassignPartitions(): In both cases it is necessary to give a complete
partition assignment and both can be long running due to data movement.
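
As an illustration of why a complete assignment already carries this
information, here is a small sketch that derives the partition count and
replication factor from an assignment map (partition id -> replica broker
ids). The names are hypothetical, and it assumes every partition is meant to
have the same number of replicas.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AssignmentShapeSketch {
    // One map entry per partition, so the partition count is simply the map size.
    static int partitionCount(Map<Integer, List<Integer>> assignment) {
        return assignment.size();
    }

    // Returns the common replica-list length; rejects empty or uneven assignments.
    static int replicationFactor(Map<Integer, List<Integer>> assignment) {
        int rf = -1;
        for (List<Integer> replicas : assignment.values()) {
            if (rf == -1) {
                rf = replicas.size();
            } else if (rf != replicas.size()) {
                throw new IllegalArgumentException("partitions have differing replica counts");
            }
        }
        if (rf == -1) {
            throw new IllegalArgumentException("empty assignment");
        }
        return rf;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> assignment = new LinkedHashMap<>();
        assignment.put(0, Arrays.asList(1, 2, 3));
        assignment.put(1, Arrays.asList(2, 3, 1));
        System.out.println("partitions: " + partitionCount(assignment));
        System.out.println("replication factor: " + replicationFactor(assignment));
    }
}
```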


Outstanding questions:

1. Is the proposed alterInterReplicaThrottle() API really better than
changing the throttle via alterConfigs()? It wouldn't be necessary to
support alterConfigs() for all broker configs, just DynamicConfigs (which
can't be set via the config file anyway). Would it be a problem that
triggering the reassignment required ClusterAction on the Cluster, but
throttling the assignment required Alter on the Topic? What if a user had
the former permission, but not the latter?

2. Is reassignPartitions() really the best name? I find the distinction
between reassigning and just assigning to be of limited value, and
"reassign" is misleading when the API is now used for changing the
replication factor too. Since the API is asynchronous/long running, it
might be better to indicate that in the name somehow. What about
startPartitionAssignment()?

I am, of course, interested in any other questions or comments people have.




On 30 August 2017 at 16:17, Tom Bentley <t....@gmail.com> wrote:

> Hi all,
>
> I've updated the KIP as follows:
>
> * remove the APIs supporting progress reporting in favour of the APIs
> being implemented in KIP-113.
> * added some APIs to cover the existing functionality around throttling
> inter-broker transfers, which was previously a TODO.
>
> To respond to Colin's suggestion:
>
> > TopicAlterationResult alterTopics(TopicAlterationOptions options,
> >                                   Map<String, TopicAlteration> alters)
> >
> > PartitionCountAlteration(int numPartitions) implements TopicAlteration
> >
> > ReplicationFactorAlteration(int repl) implements TopicAlteration
> >
> > ReassignPartitionsAlteration(...) implements TopicAlteration
>
> That is a workable alternative to providing 3 separate methods on the
> AdminClient, but I don't see why it is objectively better. I don't see
> clients commonly needing to do a mixture of alterations, and it assumes
> that the options make sense for all alterations.
>
> Cheers,
>
> Tom
>
> On 22 August 2017 at 23:49, Colin McCabe <cm...@apache.org> wrote:
>
>> On Wed, Aug 9, 2017, at 07:17, Tom Bentley wrote:
>> > Hi Dong and Jun,
>> >
>> > Thanks again for your input in this discussion and on KIP-113. It's
>> > difficult that discussion is split between this thread and the one for
>> > KIP-113, but I'll try to respond on this thread to questions asked on
>> > this
>> > thread.
>> >
>> > It seems there is some consensus that the alterTopic() API is the wrong
>> > thing, and it would make more sense to separate the different kinds of
>> > alteration into separate APIs. It seems to me there is then a choice.
>> >
>> > 1. Have separate alterPartitionCount(), alterReplicationFactor() and
>> > reassignPartitions() methods. This would most closely match the
>> > facilities
>> > currently offered by kafka-alter-topics and kafka-reassign-partitions.
>> > 2. Just provide a reassignPartitions() method and infer from the shape
>> of
>> > the data passed to that that a change in replication factor and/or
>> > partition count is required, as suggested by Dong.
>> >
>> > The choice we make here is also relevant to KIP-113 and KIP-178. By
>> > choosing (1) we can change the replication factor or partition count
>> > without providing an assignment and therefore are necessarily requiring
>> > the
>> > controller to make a decision for us about which broker (and, for 113,
>> > which log directory and thus which disk) the replica should reside on.
>> > There is then the matter of what criteria the controller should use to
>> > make
>> > that decision (a decision is also required on topic creation of course,
>> > but
>> > I'll try not to go there right now).
>>
>> Hmm.  One approach would be to have something like
>>
>> > TopicAlterationResult alterTopics(TopicAlterationOptions options,
>> >                                   Map<String, TopicAlteration> alters)
>> >
>> > PartitionCountAlteration(int numPartitions) implements TopicAlteration
>> >
>> > ReplicationFactorAlteration(int repl) implements TopicAlteration
>> >
>> > ReassignPartitionsAlteration(...) implements TopicAlteration
>>
>>
>> >
>> > Choosing (2), on the other hand, forces us to make an assignment, and
>> > currently the AdminClient doesn't provide the APIs necessary to make a
>> > very
>> > informed decision. To do the job properly we'd need APIs to enumerate
>> the
>> > permissible log directories on each broker, know the current disk usage
>> > etc. These just don't exist today, and I think it would be a lot of work
>> > to
>> > bite off to specify them. We've already got three KIPs on the go.
>> >
>> > So, for the moment, I think we have to choose 1, for pragmatic reasons.
>> > There is nothing to stop us deprecating alterPartitionCount() and
>> > alterReplicationFactor() and moving to (2) at a later date.
>>
>> Yeah, the API should be pragmatic.  An API that doesn't let us do what
>> we want to do (like let Kafka assign us a new replica on the least
>> loaded node) is not good.
>>
>> >
>> > To answer a specific question of Jun's:
>> >
>> > 3. It's not very clear to me what status_time in ReplicaStatusResponse
>> > > means.
>> >
>> >
>> > It corresponds to the ReplicaStatus.statusTime, and the idea was it was
>> > the
>> > epoch offset at which the controller's notion of the lag was current
>> > (which
>> > I think is the epoch offset of the last FetchRequest from the follower).
>> > At
>> > one point I was considering some other ways of determining progress and
>> > completion, and TBH I think it was more pertinent to those rejected
>> > alternatives.
>> >
>> > I've already made some changes to KIP-179. As suggested in the thread
>> for
>> > KIP-113, I will be changing some more since I think we can use
>> > DescribeDirsRequest
>> > instead of ReplicaStatusResponse to implement the --progress option.
>>
>> That makes sense.  Whatever response is used, it would be nice to have
>> an error message in addition to an error code.
>>
>> best,
>> Colin
>>
>> >
>> > Cheers,
>> >
>> > Tom
>> >
>> >
>> >
>> > On 9 August 2017 at 02:28, Jun Rao <ju...@confluent.io> wrote:
>> >
>> > > Hi, Tom,
>> > >
>> > > Thanks for the KIP. A few minor comments below.
>> > >
>> > > 1. Implementation wise, the broker handles adding partitions
>> differently
>> > > from changing replica assignment. For the former, we directly update
>> the
>> > > topic path in ZK with the new partitions. For the latter, we write
>> the new
>> > > partition reassignment in the partition reassignment path. Changing
>> the
>> > > replication factor is handled in the same way as changing replica
>> > > assignment. So, it would be useful to document how the broker handles
>> these
>> > > different cases accordingly. I think it's simpler to just allow one
>> change
>> > > (partition, replication factor, or replica assignment) in a request.
>> > >
>> > > 2. Currently, we only allow adding partitions. We probably want to
>> document
>> > > the restriction in the api.
>> > >
>> > > 3. It's not very clear to me what status_time in ReplicaStatusResponse
>> > > means.
>> > >
>> > > Jun
>> > >
>> > >
>> > >
>> > > On Fri, Aug 4, 2017 at 10:04 AM, Dong Lin <li...@gmail.com>
>> wrote:
>> > >
>> > > > Hey Tom,
>> > > >
>> > > > Thanks for your reply. Here are my thoughts:
>> > > >
>> > > > 1) I think the DescribeDirsResponse can be used by AdminClient to
>> query
>> > > the
>> > > > lag of follower replica as well. Here is how it works:
>> > > >
>> > > > - AdminClient sends DescribeDirsRequest to both the leader and the
>> > > follower
>> > > > of the partition.
>> > > > - DescribeDirsResponse from both leader and follower shows the LEO
>> of the
>> > > > leader replica and the follower replica.
>> > > > - Lag of the follower replica is the difference in the LEO between
>> leader
>> > > > and follower.
>> > > >
>> > > > In comparison to ReplicaStatusRequest, DescribeDirsRequest needs to
>> be
>> > > sent
>> > > > to each replica of the partition whereas ReplicaStatusRequest only
>> needs
>> > > to
>> > > > be sent to the leader of the partition. It doesn't seem to make much
>> > > > difference though. In practice we probably want to query the
>> replica lag
>> > > of
>> > > > many partitions and AdminClient needs to send exactly one request to
>> each
>> > > > broker with either solution. Does this make sense?
>> > > >
>> > > >
>> > > > 2) KIP-179 proposes to add the following AdminClient API:
>> > > >
>> > > > alterTopics(Collection<AlteredTopic> alteredTopics,
>> AlterTopicsOptions
>> > > > options)
>> > > >
>> > > > Where AlteredTopic includes the following fields
>> > > >
>> > > > AlteredTopic(String name, int numPartitions, int replicationFactor,
>> > > > Map<Integer,List<Integer>> replicasAssignment)
>> > > >
>> > > > I have two comments on this:
>> > > >
>> > > > - The information in AlteredTopic seems a bit redundant. Both
>> > > numPartitions
>> > > > and replicationFactor can be derived from replicasAssignment. I
>> think we
>> > > > can probably just use the following API in AdminClient instead:
>> > > >
>> > > > AlterTopicsResult alterTopics(Map<TopicPartition, List<Integer>>
>> > > > partitionAssignment, AlterTopicsOptions options)
>> > > >
>> > > > - Do you think "reassignPartitions" may be a better name than
>> > > > "alterTopics"? This is more consistent with the existing name used
>> in
>> > > > kafka-reassign-partitions.sh and I also find it to be more
>> accurate. On
>> > > the
>> > > > other hand, alterTopics seems to suggest that we can also alter
>> topic
>> > > > config.
>> > > >
>> > > >
>> > > >
>> > > > Thanks,
>> > > > Dong
>> > > >
>> > > > On Fri, Aug 4, 2017 at 1:03 AM, Tom Bentley <t....@gmail.com>
>> > > wrote:
>> > > >
>> > > > > Hi Dong,
>> > > > >
>> > > > > Thanks for your reply.
>> > > > >
>> > > > > You're right that your DescribeDirsResponse contains appropriate
>> data.
>> > > > The
>> > > > > comment about the log_end_offset in the KIP says "Enable user to
>> track
>> > > > > movement progress by comparing LEO of the *.log and *.move". That
>> makes
>> > > > me
>> > > > > wonder whether this would only work for replica movement on the
>> same
>> > > > > broker. In the case of reassigning partitions between brokers
>> it's only
>> > > > > really the leader for the partition that knows the max LEO of the
>> ISR
>> > > and
>> > > > > the LEO of the target broker. Maybe the comment is misleading and
>> it
>> > > > would
>> > > > > be the leader doing the same thing in both cases. Can you clarify
>> this?
>> > > >
>> > > >
>> > > > > At a conceptual level there's a lot of similarity between KIP-113
>> and
>> > > > > KIP-179 because they're both about moving replicas around.
>> Concretely
>> > > > > though, ChangeReplicaDirRequest assumes the movement is on the
>> same
>> > > > broker,
>> > > > > while the equivalent APIs in KIP-179 (whichever alternative) lack
>> the
>> > > > > notion of disks. I wonder if a unified API would be better or not.
>> > > > > Operationally, the reasons for changing replicas between brokers
>> are
>> > > > likely
>> > > > > to be motivated by balancing load across the cluster, or
>> > > adding/removing
>> > > > > brokers. But the JBOD use case has different motivations. So I
>> suspect
>> > > > it's
>> > > > > OK that the APIs are separate, but I'd love to hear what others
>> think.
>> > > >
>> > > >
>> > > > > I think you're right about TopicPartitionReplica. At the protocol
>> level
>> > > > we
>> > > > > could group topics and partitions together to avoid having the
>> same
>> > > topic
>> > > > > name multiple times when querying for the status of all the
>> partitions
>> > > > of a
>> > > > > topic.
>> > > > >
>> > > > > Thanks again for taking the time to respond.
>> > > > >
>> > > > > Tom
>> > > > >
>> > > > > On 3 August 2017 at 23:07, Dong Lin <li...@gmail.com> wrote:
>> > > > >
>> > > > > > Hey Tom,
>> > > > > >
>> > > > > > Thanks for the KIP. It seems that the prior discussion in this
>> thread
>> > > > has
>> > > > > > focused on reassigning partitions (or AlterTopics). I haven't
>> looked
>> > > > into
>> > > > > > this yet. I have two comments on the replicaStatus() API and the
>> > > > > > ReplicaStatusRequest:
>> > > > > >
>> > > > > > -  It seems that the use-case for ReplicaStatusRequest is
>> covered by
>> > > > > > the DescribeDirsRequest introduced in KIP-113
>> > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>.
>> > > > > > DescribeDirsResponse contains the logEndOffset for the specified
>> > > > > > partitions. The lag of the follower replica can be derived
>> as the
>> > > > > > difference between leader's LEO and follower's LEO. Do you
>> think we
>> > > can
>> > > > > > simply use DescribeDirsRequest without introducing
>> > > > ReplicaStatusRequest?
>> > > > > >
>> > > > > > - According to the current KIP, replicaStatus() takes a list of
>> > > > > partitions
>> > > > > > as input. And its output maps the partition to a list of replica
>> > > > status.
>> > > > > Do
>> > > > > > you think it would be better to have a new class called
>> > > > > > TopicPartitionReplica, which identifies the topic, partition
>> and the
>> > > > > > brokerId. replicaStatus can then take a list of
>> TopicPartitionReplica
>> > > > as
>> > > > > > input. And its output maps the replica to replica status. The
>> latter
>> > > > API
>> > > > > > seems simpler and also matches the method name better. What do
>> you
>> > > > think?
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Dong
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > On Wed, Aug 2, 2017 at 5:12 AM, Tom Bentley <
>> t.j.bentley@gmail.com>
>> > > > > wrote:
>> > > > > >
>> > > > > > > Hi again Ismael,
>> > > > > > >
>> > > > > > >
>> > > > > > > 1. It's worth emphasising that reassigning partitions is a
>> > > different
>> > > > > > > >> process than what happens when a topic is created, so not
>> sure
>> > > > > trying
>> > > > > > to
>> > > > > > > >> make it symmetric is beneficial. In addition to what was
>> already
>> > > > > > > >> discussed,
>> > > > > > > >> one should also enable replication throttling before
>> moving the
>> > > > > data.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Thanks for your responses. The only advantage I can see from
>> > > having
>> > > > > > > > separate APIs for partition count change vs. the other
>> changes is
>> > > > > that
>> > > > > > we
>> > > > > > > > expect the former to return synchronously, and the latter to
>> > > > > (usually)
>> > > > > > > > return asynchronously. Lumping them into the same API forces
>> > > > clients
>> > > > > of
>> > > > > > > > that API to code for the two possible cases. Is this the
>> > > particular
>> > > > > > > concern
>> > > > > > > > you had in mind, or do you have others?
>> > > > > > > >
>> > > > > > >
>> > > > > > > Just to help see what this looks like I've created another
>> version
>> > > of
>> > > > > > > KIP-179 (
>> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Copy+of+KIP-179+-+Change+ReassignPartitionsCommand+to+use+AdminClient
>> > > > > > > ).
>> > > > > > >
>> > > > > > > The AdminClient APIs are more type safe (harder to misuse).
>> The
>> > > > > > > asynchronous nature of the alterReplicationFactors() and
>> > > > > > > reassignPartitions() methods isn't really apparent (you'd
>> still
>> > > have
>> > > > > to
>> > > > > > > read the javadocs to know you had to call replicaStatus() to
>> > > > determine
>> > > > > > > completion), but it's still better than the lumped-together
>> API
>> > > > because
>> > > > > > the
>> > > > > > > methods are _consistently_ async. The Protocol APIs are
>> slightly
>> > > > nicer
>> > > > > > too,
>> > > > > > > in that INVALID_REQUEST is less commonly returned. Another
>> benefit
>> > > is
>> > > > > it
>> > > > > > > would allow for these different alteration APIs to be modified
>> > > > > > > independently in the future, should that be necessary.
>> > > > > > >
>> > > > > > > What do others think?
>> > > > > > >
>> > > > > > > You're right that the proposal doesn't cover setting and
>> clearing a
>> > > > > > > > throttle, and it should. I will study the code about how
>> this
>> > > works
>> > > > > > > before
>> > > > > > > > posting back about that.
>> > > > > > > >
>> > > > > > >
>> > > > > > > AFAICS from the code the throttle is simply a dynamic config
>> of the
>> > > > > > broker.
>> > > > > > > We already have the AdminClient.alterConfigs API for setting
>> this,
>> > > so
>> > > > > the
>> > > > > > > refactoring of kafka-reassign-partitions.sh could just use
>> that
>> > > > > existing
>> > > > > > > API. We could still achieve your objective of setting the
>> throttle
>> > > > > before
>> > > > > > > reassignment by making the --throttle option mandatory when
>> > > --execute
>> > > > > was
>> > > > > > > present. Or did you have something else in mind?
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > >
>> > > > > > > Tom
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>>
>
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
Hi all,

I've updated the KIP as follows:

* removed the APIs supporting progress reporting in favour of the APIs being
implemented in KIP-113.
* added some APIs to cover the existing functionality around throttling
inter-broker transfers, which was previously a TODO.
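
For context, the KIP-113-based progress check discussed earlier in this thread comes down to comparing log end offsets (LEOs): a follower's lag is the leader's LEO minus the follower's LEO. A minimal sketch in Java, assuming the LEOs have already been collected from each broker. The class and method names here are hypothetical and are not part of AdminClient or of either KIP.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Kafka, KIP-113 or KIP-179.
final class ReplicaLagSketch {
    private ReplicaLagSketch() {}

    /**
     * Computes each follower's lag for one partition.
     *
     * @param leoByBroker log end offset reported by each broker hosting a replica
     * @param leaderId    broker id of the partition leader
     * @return follower broker id mapped to (leader LEO - follower LEO)
     */
    static Map<Integer, Long> followerLag(Map<Integer, Long> leoByBroker, int leaderId) {
        Long leaderLeo = leoByBroker.get(leaderId);
        if (leaderLeo == null) {
            throw new IllegalArgumentException("No LEO reported for leader " + leaderId);
        }
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> e : leoByBroker.entrySet()) {
            if (e.getKey() != leaderId) {
                // Clamp at zero: the per-broker responses are not taken at the same
                // instant, so a follower can briefly appear ahead of the leader.
                lag.put(e.getKey(), Math.max(0L, leaderLeo - e.getValue()));
            }
        }
        return lag;
    }
}
```

A reassignment could be treated as complete once every target replica's lag reaches zero (or the replica joins the ISR); which criterion to use is left open here.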

To respond to Colin's suggestion:

> TopicAlterationResult alterTopics(TopicAlterationOptions options,
>                                   Map<String, TopicAlteration> alters)
>
> PartitionCountAlteration(int numPartitions) implements TopicAlteration
>
> ReplicationFactorAlteration(int repl) implements TopicAlteration
>
> ReassignPartitionsAlteration(...) implements TopicAlteration

That is a workable alternative to providing 3 separate methods on the
AdminClient, but I don't see why it is objectively better. I don't see
clients commonly needing to do a mixture of alterations, and it assumes
that the options make sense for all alterations.

Cheers,

Tom

On 22 August 2017 at 23:49, Colin McCabe <cm...@apache.org> wrote:

> On Wed, Aug 9, 2017, at 07:17, Tom Bentley wrote:
> > Hi Dong and Jun,
> >
> > Thanks again for your input in this discussion and on KIP-113. It's
> > difficult that discussion is split between this thread and the one for
> > KIP-113, but I'll try to respond on this thread to questions asked on
> > this
> > thread.
> >
> > It seems there is some consensus that the alterTopic() API is the wrong
> > thing, and it would make more sense to separate the different kinds of
> > alteration into separate APIs. It seems to me there is then a choice.
> >
> > 1. Have separate alterPartitionCount(), alterReplicationFactor() and
> > reassignPartitions() methods. This would most closely match the
> > facilities
> > currently offered by kafka-alter-topics and kafka-reassign-partitions.
> > 2. Just provide a reassignPartitions() method and infer from the shape of
> > the data passed to it that a change in replication factor and/or
> > partition count is required, as suggested by Dong.
> >
> > The choice we make here is also relevant to KIP-113 and KIP-178. By
> > choosing (1) we can change the replication factor or partition count
> > without providing an assignment and therefore are necessarily requiring
> > the
> > controller to make a decision for us about which broker (and, for 113,
> > which log directory and thus which disk) the replica should reside.
> > There is then the matter of what criteria the controller should use to
> > make
> > that decision (a decision is also required on topic creation of course,
> > but
> > I'll try not to go there right now).
>
> Hmm.  One approach would be to have something like
>
> > TopicAlterationResult alterTopics(TopicAlterationOptions options,
> >                                   Map<String, TopicAlteration> alters)
> >
> > PartitionCountAlteration(int numPartitions) implements TopicAlteration
> >
> > ReplicationFactorAlteration(int repl) implements TopicAlteration
> >
> > ReassignPartitionsAlteration(...) implements TopicAlteration
>
>
> >
> > Choosing (2), on the other hand, forces us to make an assignment, and
> > currently the AdminClient doesn't provide the APIs necessary to make a
> > very
> > informed decision. To do the job properly we'd need APIs to enumerate the
> > permissible log directories on each broker, know the current disk usage
> > etc. These just don't exist today, and I think it would be a lot of work
> > to
> > bite off to specify them. We've already got three KIPs on the go.
> >
> > So, for the moment, I think we have to choose 1, for pragmatic reasons.
> > There is nothing to stop us deprecating alterPartitionCount() and
> > alterReplicationFactor() and moving to (2) at a later date.
>
> Yeah, the API should be pragmatic.  An API that doesn't let us do what
> we want to do (like let Kafka assign us a new replica on the least
> loaded node) is not good.
>
> >
> > To answer a specific question of Jun's:
> >
> > 3. It's not very clear to me what status_time in ReplicaStatusResponse
> > > means.
> >
> >
> > It corresponds to the ReplicaStatus.statusTime, and the idea was it was
> > the
> > epoch offset at which the controller's notion of the lag was current
> > (which
> > I think is the epoch offset of the last FetchRequest from the follower).
> > At
> > one point I was considering some other ways of determining progress and
> > completion, and TBH I think it was more pertinent to those rejected
> > alternatives.
> >
> > I've already made some changes to KIP-179. As suggested in the thread for
> > KIP-113, I will be changing some more since I think we can use
> > DescribeDirsRequest
> > instead of ReplicaStatusResponse to implement the --progress option.
>
> That makes sense.  Whatever response is used, it would be nice to have
> an error message in addition to an error code.
>
> best,
> Colin
>
> >
> > Cheers,
> >
> > Tom
> >
> >
> >
> > On 9 August 2017 at 02:28, Jun Rao <ju...@confluent.io> wrote:
> >
> > > Hi, Tom,
> > >
> > > Thanks for the KIP. A few minor comments below.
> > >
> > > 1. Implementation wise, the broker handles adding partitions
> differently
> > > from changing replica assignment. For the former, we directly update
> the
> > > topic path in ZK with the new partitions. For the latter, we write the
> new
> > > partition reassignment in the partition reassignment path. Changing the
> > > replication factor is handled in the same way as changing replica
> > > assignment. So, it would be useful to document how the broker handles
> these
> > > different cases accordingly. I think it's simpler to just allow one
> change
> > > (partition, replication factor, or replica assignment) in a request.
> > >
> > > 2. Currently, we only allow adding partitions. We probably want to
> document
> > > the restriction in the api.
> > >
> > > 3. It's not very clear to me what status_time in ReplicaStatusResponse
> > > means.
> > >
> > > Jun
> > >
> > >
> > >
> > > On Fri, Aug 4, 2017 at 10:04 AM, Dong Lin <li...@gmail.com> wrote:
> > >
> > > > Hey Tom,
> > > >
> > > > Thanks for your reply. Here are my thoughts:
> > > >
> > > > 1) I think the DescribeDirsResponse can be used by AdminClient to
> query
> > > the
> > > > lag of follower replica as well. Here is how it works:
> > > >
> > > > - AdminClient sends DescribeDirsRequest to both the leader and the
> > > follower
> > > > of the partition.
> > > > - DescribeDirsResponse from both leader and follower shows the LEO
> of the
> > > > leader replica and the follower replica.
> > > > - Lag of the follower replica is the difference in the LEO between
> leader
> > > > and follower.
> > > >
> > > > In comparison to ReplicaStatusRequest, DescribeDirsRequest needs to
> be
> > > > sent
> > > > to each replica of the partition whereas ReplicaStatusRequest only
> needs
> > > to
> > > > be sent to the leader of the partition. It doesn't seem to make much
> > > > difference though. In practice we probably want to query the replica
> lag
> > > of
> > > > > many partitions and AdminClient needs to send exactly one request to
> each
> > > > broker with either solution. Does this make sense?
> > > >
> > > >
> > > > 2) KIP-179 proposes to add the following AdminClient API:
> > > >
> > > > alterTopics(Collection<AlteredTopic> alteredTopics,
> AlterTopicsOptions
> > > > options)
> > > >
> > > > Where AlteredTopic includes the following fields
> > > >
> > > > AlteredTopic(String name, int numPartitions, int replicationFactor,
> > > > Map<Integer,List<Integer>> replicasAssignment)
> > > >
> > > > I have two comments on this:
> > > >
> > > > - The information in AlteredTopic seems a bit redundant. Both
> > > numPartitions
> > > > and replicationFactor can be derived from replicasAssignment. I
> think we
> > > > can probably just use the following API in AdminClient instead:
> > > >
> > > > AlterTopicsResult alterTopics(Map<TopicPartition, List<Integer>>
> > > > partitionAssignment, AlterTopicsOptions options)
> > > >
> > > > - Do you think "reassignPartitions" may be a better name than
> > > > "alterTopics"? This is more consistent with the existing name used in
> > > > kafka-reassign-partitions.sh and I also find it to be more accurate.
> On
> > > the
> > > > other hand, alterTopics seems to suggest that we can also alter topic
> > > > config.
> > > >
> > > >
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > > On Fri, Aug 4, 2017 at 1:03 AM, Tom Bentley <t....@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Dong,
> > > > >
> > > > > Thanks for your reply.
> > > > >
> > > > > You're right that your DescribeDirsResponse contains appropriate
> data.
> > > > The
> > > > > comment about the log_end_offset in the KIP says "Enable user to
> track
> > > > > movement progress by comparing LEO of the *.log and *.move". That
> makes
> > > > me
> > > > > wonder whether this would only work for replica movement on the
> same
> > > > > broker. In the case of reassigning partitions between brokers it's
> only
> > > > > really the leader for the partition that knows the max LEO of the
> ISR
> > > and
> > > > > the LEO of the target broker. Maybe the comment is misleading and
> it
> > > > would
> > > > > be the leader doing the same thing in both cases. Can you clarify
> this?
> > > >
> > > >
> > > > > At a conceptual level there's a lot of similarity between KIP-113
> and
> > > > > KIP-179 because they're both about moving replicas around.
> Concretely
> > > > > though, ChangeReplicaDirRequest assumes the movement is on the same
> > > > broker,
> > > > > while the equivalent APIs in KIP-179 (whichever alternative) lack
> the
> > > > > notion of disks. I wonder if a unified API would be better or not.
> > > > > Operationally, the reasons for changing replicas between brokers
> are
> > > > likely
> > > > > to be motivated by balancing load across the cluster, or
> > > adding/removing
> > > > > brokers. But the JBOD use case has different motivations. So I
> suspect
> > > > it's
> > > > > OK that the APIs are separate, but I'd love to hear what others
> think.
> > > >
> > > >
> > > > > I think you're right about TopicPartitionReplica. At the protocol
> level
> > > > we
> > > > > > could group topics and partitions together to avoid having the same
> > > topic
> > > > > name multiple times when querying for the status of all the
> partitions
> > > > of a
> > > > > topic.
> > > > >
> > > > > Thanks again for taking the time to respond.
> > > > >
> > > > > Tom
> > > > >
> > > > > On 3 August 2017 at 23:07, Dong Lin <li...@gmail.com> wrote:
> > > > >
> > > > > > Hey Tom,
> > > > > >
> > > > > > Thanks for the KIP. It seems that the prior discussion in this
> thread
> > > > has
> > > > > > focused on reassigning partitions (or AlterTopics). I haven't
> looked
> > > > into
> > > > > > this yet. I have two comments on the replicaStatus() API and the
> > > > > > ReplicaStatusRequest:
> > > > > >
> > > > > > -  It seems that the use-case for ReplicaStatusRequest is
> covered by
> > > > > > the DescribeDirsRequest introduced in KIP-113
> > > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>.
> > > > > > DescribeDirsResponse contains the logEndOffset for the specified
> > > > > > > partitions. The lag of the follower replica can be derived
> as the
> > > > > > difference between leader's LEO and follower's LEO. Do you think
> we
> > > can
> > > > > > simply use DescribeDirsRequest without introducing
> > > > ReplicaStatusRequest?
> > > > > >
> > > > > > - According to the current KIP, replicaStatus() takes a list of
> > > > > partitions
> > > > > > as input. And its output maps the partition to a list of replica
> > > > status.
> > > > > Do
> > > > > > you think it would be better to have a new class called
> > > > > > TopicPartitionReplica, which identifies the topic, partition and
> the
> > > > > > brokerId. replicaStatus can then take a list of
> TopicPartitionReplica
> > > > as
> > > > > > input. And its output maps the replica to replica status. The
> latter
> > > > API
> > > > > > seems simpler and also matches the method name better. What do
> you
> > > > think?
> > > > > >
> > > > > > Thanks,
> > > > > > Dong
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Aug 2, 2017 at 5:12 AM, Tom Bentley <
> t.j.bentley@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi again Ismael,
> > > > > > >
> > > > > > >
> > > > > > > 1. It's worth emphasising that reassigning partitions is a
> > > different
> > > > > > > >> process than what happens when a topic is created, so not
> sure
> > > > > trying
> > > > > > to
> > > > > > > >> make it symmetric is beneficial. In addition to what was
> already
> > > > > > > >> discussed,
> > > > > > > >> one should also enable replication throttling before moving
> the
> > > > > data.
> > > > > > > >
> > > > > > > >
> > > > > > > > > Thanks for your responses. The only advantage I can see from
> > > having
> > > > > > > > separate APIs for partition count change vs. the other
> changes is
> > > > > that
> > > > > > we
> > > > > > > > expect the former to return synchronously, and the latter to
> > > > > (usually)
> > > > > > > > return asynchronously. Lumping them into the same API forces
> > > > clients
> > > > > of
> > > > > > > > that API to code for the two possible cases. Is this the
> > > particular
> > > > > > > concern
> > > > > > > > you had in mind, or do you have others?
> > > > > > > >
> > > > > > >
> > > > > > > Just to help see what this looks like I've created another
> version
> > > of
> > > > > > > KIP-179 (
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Copy+of+KIP-179+-+Change+ReassignPartitionsCommand+to+use+AdminClient
> > > > > > > ).
> > > > > > >
> > > > > > > The AdminClient APIs are more type safe (harder to misuse). The
> > > > > > > asynchronous nature of the alterReplicationFactors() and
> > > > > > > > reassignPartitions() methods isn't really apparent (you'd
> still
> > > have
> > > > > to
> > > > > > > read the javadocs to know you had to call replicaStatus() to
> > > > determine
> > > > > > > > completion), but it's still better than the lumped-together API
> > > > because
> > > > > > the
> > > > > > > methods are _consistently_ async. The Protocol APIs are
> slightly
> > > > nicer
> > > > > > too,
> > > > > > > in that INVALID_REQUEST is less commonly returned. Another
> benefit
> > > is
> > > > > it
> > > > > > > would allow for these different alteration APIs to be modified
> > > > > > > > independently in the future, should that be necessary.
> > > > > > >
> > > > > > > What do others think?
> > > > > > >
> > > > > > > You're right that the proposal doesn't cover setting and
> clearing a
> > > > > > > > throttle, and it should. I will study the code about how this
> > > works
> > > > > > > before
> > > > > > > > posting back about that.
> > > > > > > >
> > > > > > >
> > > > > > > AFAICS from the code the throttle is simply a dynamic config
> of the
> > > > > > broker.
> > > > > > > We already have the AdminClient.alterConfigs API for setting
> this,
> > > so
> > > > > the
> > > > > > > refactoring of kafka-reassign-partitions.sh could just use that
> > > > > existing
> > > > > > > API. We could still achieve your objective of setting the
> throttle
> > > > > before
> > > > > > > reassignment by making the --throttle option mandatory when
> > > --execute
> > > > > was
> > > > > > > present. Or did you have something else in mind?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Tom
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Colin McCabe <cm...@apache.org>.
On Wed, Aug 9, 2017, at 07:17, Tom Bentley wrote:
> Hi Dong and Jun,
> 
> Thanks again for your input in this discussion and on KIP-113. It's
> difficult that discussion is split between this thread and the one for
> KIP-113, but I'll try to respond on this thread to questions asked on
> this
> thread.
> 
> It seems there is some consensus that the alterTopic() API is the wrong
> thing, and it would make more sense to separate the different kinds of
> alteration into separate APIs. It seems to me there is then a choice.
> 
> 1. Have separate alterPartitionCount(), alterReplicationFactor() and
> reassignPartitions() methods. This would most closely match the
> facilities
> currently offered by kafka-alter-topics and kafka-reassign-partitions.
> 2. Just provide a reassignPartitions() method and infer from the shape of
> the data passed to it that a change in replication factor and/or
> partition count is required, as suggested by Dong.
> 
> The choice we make here is also relevant to KIP-113 and KIP-178. By
> choosing (1) we can change the replication factor or partition count
> without providing an assignment and therefore are necessarily requiring
> the
> controller to make a decision for us about which broker (and, for 113,
> which log directory and thus which disk) the replica should reside.
> There is then the matter of what criteria the controller should use to
> make
> that decision (a decision is also required on topic creation of course,
> but
> I'll try not to go there right now).

Hmm.  One approach would be to have something like

> TopicAlterationResult alterTopics(TopicAlterationOptions options,
>                                   Map<String, TopicAlteration> alters)
>
> PartitionCountAlteration(int numPartitions) implements TopicAlteration
>
> ReplicationFactorAlteration(int repl) implements TopicAlteration
>
> ReassignPartitionsAlteration(...) implements TopicAlteration
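
To make the shape of that suggestion concrete, here is a minimal, self-contained sketch in Java. Everything in it is hypothetical: none of these types exist in Kafka, the argument to ReassignPartitionsAlteration (elided above) is given an assumed shape of partition id to broker ids, and alterTopics() here only summarises each request rather than contacting a cluster.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch only: names follow the suggestion above; none of these are real Kafka types.
interface TopicAlteration {
    String describe();
}

final class PartitionCountAlteration implements TopicAlteration {
    final int numPartitions;
    PartitionCountAlteration(int numPartitions) { this.numPartitions = numPartitions; }
    public String describe() { return "partitions=" + numPartitions; }
}

final class ReplicationFactorAlteration implements TopicAlteration {
    final int repl;
    ReplicationFactorAlteration(int repl) { this.repl = repl; }
    public String describe() { return "replicationFactor=" + repl; }
}

final class ReassignPartitionsAlteration implements TopicAlteration {
    // Assumed shape: partition id -> ordered list of broker ids.
    final Map<Integer, List<Integer>> assignment;
    ReassignPartitionsAlteration(Map<Integer, List<Integer>> assignment) {
        this.assignment = assignment;
    }
    public String describe() { return "assignment=" + assignment; }
}

final class AlterTopicsSketch {
    private AlterTopicsSketch() {}

    // Stand-in for alterTopics(): returns a per-topic summary of what was requested.
    static Map<String, String> alterTopics(Map<String, TopicAlteration> alters) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, TopicAlteration> e : alters.entrySet()) {
            result.put(e.getKey(), e.getValue().describe());
        }
        return result;
    }
}
```

Because the map is keyed by topic name, each call carries at most one alteration per topic, which lines up with Jun's preference for allowing a single kind of change per request.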


> 
> Choosing (2), on the other hand, forces us to make an assignment, and
> currently the AdminClient doesn't provide the APIs necessary to make a
> very
> informed decision. To do the job properly we'd need APIs to enumerate the
> permissible log directories on each broker, know the current disk usage
> etc. These just don't exist today, and I think it would be a lot of work
> to
> bite off to specify them. We've already got three KIPs on the go.
> 
> So, for the moment, I think we have to choose 1, for pragmatic reasons.
> There is nothing to stop us deprecating alterPartitionCount() and
> alterReplicationFactor() and moving to (2) at a later date.

Yeah, the API should be pragmatic.  An API that doesn't let us do what
we want to do (like let Kafka assign us a new replica on the least
loaded node) is not good.
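
As an illustration of the kind of choice meant here, a minimal sketch in Java, assuming "load" is simply the number of replicas a broker already hosts. That metric, and the class and method names, are assumptions for this example only; the thread does not settle the criteria, and this is not Kafka's actual assignment logic.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical illustration; not how Kafka's controller actually assigns replicas.
final class LeastLoadedSketch {
    private LeastLoadedSketch() {}

    /**
     * Picks the broker hosting the fewest replicas, skipping brokers that
     * already hold a replica of the partition. Ties go to the lowest broker id.
     */
    static int pickBroker(Map<Integer, Integer> replicaCountByBroker,
                          Set<Integer> currentReplicas) {
        Integer best = null;
        for (Map.Entry<Integer, Integer> e : replicaCountByBroker.entrySet()) {
            int id = e.getKey();
            if (currentReplicas.contains(id)) {
                continue; // a partition cannot have two replicas on one broker
            }
            if (best == null) {
                best = id;
                continue;
            }
            int bestCount = replicaCountByBroker.get(best);
            int count = e.getValue();
            if (count < bestCount || (count == bestCount && id < best)) {
                best = id;
            }
        }
        if (best == null) {
            throw new IllegalStateException("No eligible broker");
        }
        return best;
    }
}
```

A client-side caller cannot make this choice well today, since it lacks the load information; that is the argument for letting the broker side decide.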

> 
> To answer a specific question of Jun's:
> 
> 3. It's not very clear to me what status_time in ReplicaStatusResponse
> > means.
> 
> 
> It corresponds to the ReplicaStatus.statusTime, and the idea was it was
> the
> epoch offset at which the controller's notion of the lag was current
> (which
> I think is the epoch offset of the last FetchRequest from the follower).
> At
> one point I was considering some other ways of determining progress and
> completion, and TBH I think it was more pertinent to those rejected
> alternatives.
> 
> I've already made some changes to KIP-179. As suggested in the thread for
> KIP-113, I will be changing some more since I think we can use
> DescribeDirsRequest
> instead of ReplicaStatusResponse to implement the --progress option.

That makes sense.  Whatever response is used, it would be nice to have
an error message in addition to an error code.

best,
Colin

> 
> Cheers,
> 
> Tom
> 
> 
> 
> On 9 August 2017 at 02:28, Jun Rao <ju...@confluent.io> wrote:
> 
> > Hi, Tom,
> >
> > Thanks for the KIP. A few minor comments below.
> >
> > 1. Implementation wise, the broker handles adding partitions differently
> > from changing replica assignment. For the former, we directly update the
> > topic path in ZK with the new partitions. For the latter, we write the new
> > partition reassignment in the partition reassignment path. Changing the
> > replication factor is handled in the same way as changing replica
> > assignment. So, it would be useful to document how the broker handles these
> > different cases accordingly. I think it's simpler to just allow one change
> > (partition, replication factor, or replica assignment) in a request.
> >
> > 2. Currently, we only allow adding partitions. We probably want to document
> > the restriction in the api.
> >
> > 3. It's not very clear to me what status_time in ReplicaStatusResponse
> > means.
> >
> > Jun
> >
> >
> >
> > On Fri, Aug 4, 2017 at 10:04 AM, Dong Lin <li...@gmail.com> wrote:
> >
> > > Hey Tom,
> > >
> > > Thanks for your reply. Here are my thoughts:
> > >
> > > 1) I think the DescribeDirsResponse can be used by AdminClient to query
> > the
> > > lag of follower replica as well. Here is how it works:
> > >
> > > - AdminClient sends DescribeDirsRequest to both the leader and the
> > follower
> > > of the partition.
> > > - DescribeDirsResponse from both leader and follower shows the LEO of the
> > > leader replica and the follower replica.
> > > - Lag of the follower replica is the difference in the LEO between leader
> > > and follower.
> > >
> > > In comparison to ReplicaStatusRequest, DescribeDirsRequest needs to be
> > > sent
> > > to each replica of the partition whereas ReplicaStatusRequest only needs
> > to
> > > be sent to the leader of the partition. It doesn't seem to make much
> > > difference though. In practice we probably want to query the replica lag
> > of
> > > > many partitions and AdminClient needs to send exactly one request to each
> > > broker with either solution. Does this make sense?
> > >
> > >
> > > 2) KIP-179 proposes to add the following AdminClient API:
> > >
> > > alterTopics(Collection<AlteredTopic> alteredTopics, AlterTopicsOptions
> > > options)
> > >
> > > Where AlteredTopic includes the following fields
> > >
> > > AlteredTopic(String name, int numPartitions, int replicationFactor,
> > > Map<Integer,List<Integer>> replicasAssignment)
> > >
> > > I have two comments on this:
> > >
> > > - The information in AlteredTopic seems a bit redundant. Both
> > numPartitions
> > > and replicationFactor can be derived from replicasAssignment. I think we
> > > can probably just use the following API in AdminClient instead:
> > >
> > > AlterTopicsResult alterTopics(Map<TopicPartition, List<Integer>>
> > > partitionAssignment, AlterTopicsOptions options)
> > >
> > > - Do you think "reassignPartitions" may be a better name than
> > > "alterTopics"? This is more consistent with the existing name used in
> > > kafka-reassign-partitions.sh and I also find it to be more accurate. On
> > the
> > > other hand, alterTopics seems to suggest that we can also alter topic
> > > config.
> > >
> > >
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Fri, Aug 4, 2017 at 1:03 AM, Tom Bentley <t....@gmail.com>
> > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Thanks for your reply.
> > > >
> > > > You're right that your DescribeDirsResponse contains appropriate data.
> > > The
> > > > comment about the log_end_offset in the KIP says "Enable user to track
> > > > movement progress by comparing LEO of the *.log and *.move". That makes
> > > me
> > > > wonder whether this would only work for replica movement on the same
> > > > broker. In the case of reassigning partitions between brokers it's only
> > > > really the leader for the partition that knows the max LEO of the ISR
> > and
> > > > the LEO of the target broker. Maybe the comment is misleading and it
> > > would
> > > > be the leader doing the same thing in both cases. Can you clarify this?
> > >
> > >
> > > > At a conceptual level there's a lot of similarity between KIP-113 and
> > > > KIP-179 because they're both about moving replicas around. Concretely
> > > > though, ChangeReplicaDirRequest assumes the movement is on the same
> > > broker,
> > > > while the equivalent APIs in KIP-179 (whichever alternative) lack the
> > > > notion of disks. I wonder if a unified API would be better or not.
> > > > Operationally, the reasons for changing replicas between brokers are
> > > likely
> > > > to be motivated by balancing load across the cluster, or
> > adding/removing
> > > > brokers. But the JBOD use case has different motivations. So I suspect
> > > it's
> > > > OK that the APIs are separate, but I'd love to hear what others think.
> > >
> > >
> > > > I think you're right about TopicPartitionReplica. At the protocol level
> > > we
> > > > > could group topics and partitions together to avoid having the same
> > topic
> > > > name multiple times when querying for the status of all the partitions
> > > of a
> > > > topic.
> > > >
> > > > Thanks again for taking the time to respond.
> > > >
> > > > Tom
> > > >
> > > > On 3 August 2017 at 23:07, Dong Lin <li...@gmail.com> wrote:
> > > >
> > > > > Hey Tom,
> > > > >
> > > > > Thanks for the KIP. It seems that the prior discussion in this thread
> > > has
> > > > > focused on reassigning partitions (or AlterTopics). I haven't looked
> > > into
> > > > > this yet. I have two comments on the replicaStatus() API and the
> > > > > ReplicaStatusRequest:
> > > > >
> > > > > -  It seems that the use-case for ReplicaStatusRequest is covered by
> > > > > the DescribeDirsRequest introduced in KIP-113
> > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 113%3A+Support+replicas+movement+between+log+directories>.
> > > > > DescribeDirsResponse contains the logEndOffset for the specified
> > > > > > partitions. The lag of the follower replica can be derived as the
> > > > > difference between leader's LEO and follower's LEO. Do you think we
> > can
> > > > > simply use DescribeDirsRequest without introducing
> > > ReplicaStatusRequest?
> > > > >
> > > > > - According to the current KIP, replicaStatus() takes a list of
> > > > partitions
> > > > > as input. And its output maps the partition to a list of replica
> > > status.
> > > > Do
> > > > > you think it would be better to have a new class called
> > > > > TopicPartitionReplica, which identifies the topic, partition and the
> > > > > brokerId. replicaStatus can then take a list of TopicPartitionReplica
> > > as
> > > > > input. And its output maps the replica to replica status. The latter
> > > API
> > > > > seems simpler and also matches the method name better. What do you
> > > think?
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Aug 2, 2017 at 5:12 AM, Tom Bentley <t....@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hi again Ismael,
> > > > > >
> > > > > >
> > > > > > 1. It's worth emphasising that reassigning partitions is a
> > different
> > > > > > >> process than what happens when a topic is created, so not sure
> > > > trying
> > > > > to
> > > > > > >> make it symmetric is beneficial. In addition to what was already
> > > > > > >> discussed,
> > > > > > >> one should also enable replication throttling before moving the
> > > > data.
> > > > > > >
> > > > > > >
> > > > > > > > Thanks for your responses. The only advantage I can see from
> > having
> > > > > > > separate APIs for partition count change vs. the other changes is
> > > > that
> > > > > we
> > > > > > > expect the former to return synchronously, and the latter to
> > > > (usually)
> > > > > > > return asynchronously. Lumping them into the same API forces
> > > clients
> > > > of
> > > > > > > that API to code for the two possible cases. Is this the
> > particular
> > > > > > concern
> > > > > > > you had in mind, or do you have others?
> > > > > > >
> > > > > >
> > > > > > Just to help see what this looks like I've created another version
> > of
> > > > > > KIP-179 (
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Copy+
> > > > > > of+KIP-179+-+Change+ReassignPartitionsCommand+to+use+AdminClient
> > > > > > ).
> > > > > >
> > > > > > The AdminClient APIs are more type safe (harder to misuse). The
> > > > > > asynchronous nature of the alterReplicationFactors() and
> > > > > > reassignPartitions() methods aren't really apparent (you'd still
> > have
> > > > to
> > > > > > read the javadocs to know you had to call replicaStatus() to
> > > determine
> > > > > > > completion), but it's still better than the lumped-together API
> > > because
> > > > > the
> > > > > > methods are _consistently_ async. The Protocol APIs are slightly
> > > nicer
> > > > > too,
> > > > > > in that INVALID_REQUEST is less commonly returned. Another benefit
> > is
> > > > it
> > > > > > would allow for these different alteration APIs to be modified
> > > > > > > independently in the future, should that be necessary.
> > > > > >
> > > > > > What do others think?
> > > > > >
> > > > > > You're right that the proposal doesn't cover setting and clearing a
> > > > > > > throttle, and it should. I will study the code about how this
> > works
> > > > > > before
> > > > > > > posting back about that.
> > > > > > >
> > > > > >
> > > > > > AFAICS from the code the throttle is simply a dynamic config of the
> > > > > broker.
> > > > > > We already have the AdminClient.alterConfigs API for setting this,
> > so
> > > > the
> > > > > > refactoring of kafka-reassign-partitions.sh could just use that
> > > > existing
> > > > > > API. We could still achieve your objective of setting the throttle
> > > > before
> > > > > > reassignment by making the --throttle option mandatory when
> > --execute
> > > > was
> > > > > > present. Or did you have something else in mind?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Tom
> > > > > >
> > > > >
> > > >
> > >
> >

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
Hi Dong and Jun,

Thanks again for your input in this discussion and on KIP-113. It's
awkward that the discussion is split between this thread and the one for
KIP-113, but I'll try to respond on this thread to questions asked on this
thread.

It seems there is some consensus that the alterTopics() API is the wrong
thing, and it would make more sense to separate the different kinds of
alteration into separate APIs. It seems to me there is then a choice.

1. Have separate alterPartitionCount(), alterReplicationFactor() and
reassignPartitions() methods. This would most closely match the facilities
currently offered by kafka-topics.sh --alter and
kafka-reassign-partitions.sh.
2. Just provide a reassignPartitions() method and infer from the shape of
the data passed to it that a change in replication factor and/or
partition count is required, as suggested by Dong.
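
To make option (2) a little more concrete, here is a minimal sketch of how
the kind of change might be inferred from the shape of a requested
assignment. It assumes a per-topic map from partition id to an ordered
replica list. The names AlterationKindSketch, Change and infer() are
hypothetical, chosen only for illustration; none of them is part of the KIP
or of AdminClient.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical illustration only; not part of KIP-179 or AdminClient. */
final class AlterationKindSketch {
    enum Change { PARTITION_COUNT, REPLICATION_FACTOR, REPLICA_ASSIGNMENT }

    private AlterationKindSketch() {}

    /**
     * Compares the current and requested assignments of one topic
     * (partition id -> ordered list of broker ids) and reports which kinds
     * of change the request implies. Partitions absent from the request
     * are treated as unchanged (an assumption of this sketch).
     */
    static Set<Change> infer(Map<Integer, List<Integer>> current,
                             Map<Integer, List<Integer>> requested) {
        Set<Change> changes = EnumSet.noneOf(Change.class);
        for (Map.Entry<Integer, List<Integer>> e : requested.entrySet()) {
            List<Integer> before = current.get(e.getKey());
            List<Integer> after = e.getValue();
            if (before == null) {
                // A partition id we have not seen before: the count grows.
                changes.add(Change.PARTITION_COUNT);
            } else if (before.size() != after.size()) {
                // Same partition, different number of replicas.
                changes.add(Change.REPLICATION_FACTOR);
            } else if (!before.equals(after)) {
                // Same size but different brokers or a different order.
                changes.add(Change.REPLICA_ASSIGNMENT);
            }
        }
        return changes;
    }
}
```

A request for which infer() returns more than one element mixes several
kinds of change in one call, which is the situation that separate methods
(option 1) would rule out by construction.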

The choice we make here is also relevant to KIP-113 and KIP-178. By
choosing (1) we can change the replication factor or partition count
without providing an assignment, and we are therefore necessarily requiring
the controller to make a decision for us about which broker (and, for
KIP-113, which log directory and thus which disk) the replica should reside
on. There is then the matter of what criteria the controller should use to
make that decision (a decision is also required on topic creation of course,
but I'll try not to go there right now).

Choosing (2), on the other hand, forces us to make an assignment, and
currently the AdminClient doesn't provide the APIs necessary to make a very
informed decision. To do the job properly we'd need APIs to enumerate the
permissible log directories on each broker, know the current disk usage
etc. These just don't exist today, and I think it would be a lot of work to
bite off to specify them. We've already got three KIPs on the go.

So, for the moment, I think we have to choose (1), for pragmatic reasons.
There is nothing to stop us deprecating alterPartitionCount() and
alterReplicationFactor() and moving to (2) at a later date.

To answer a specific question of Jun's:

> 3. It's not very clear to me what status_time in ReplicaStatusResponse
> means.


It corresponds to ReplicaStatus.statusTime, and the idea was that it was the
epoch offset at which the controller's notion of the lag was current (which
I think is the epoch offset of the last FetchRequest from the follower). At
one point I was considering some other ways of determining progress and
completion, and TBH I think it was more pertinent to those rejected
alternatives.

I've already made some changes to KIP-179. As suggested in the thread for
KIP-113, I will be changing some more, since I think we can use
DescribeDirsRequest instead of ReplicaStatusRequest to implement the
--progress option.

Cheers,

Tom




Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Jun Rao <ju...@confluent.io>.
Hi, Tom,

Thanks for the KIP. A few minor comments below.

1. Implementation-wise, the broker handles adding partitions differently
from changing replica assignment. For the former, we directly update the
topic path in ZK with the new partitions. For the latter, we write the new
partition reassignment in the partition reassignment path. Changing the
replication factor is handled in the same way as changing replica
assignment. So, it would be useful to document how the broker handles these
different cases accordingly. I think it's simpler to just allow one change
(partition count, replication factor, or replica assignment) in a request.

2. Currently, we only allow adding partitions. We probably want to document
the restriction in the API.

3. It's not very clear to me what status_time in ReplicaStatusResponse
means.

Jun




Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Dong Lin <li...@gmail.com>.
Hey Tom,

Thanks for your reply. Here are my thoughts:

1) I think the DescribeDirsResponse can be used by AdminClient to query the
lag of a follower replica as well. Here is how it works:

- AdminClient sends DescribeDirsRequest to both the leader and the follower
of the partition.
- DescribeDirsResponse from both leader and follower shows the LEO of the
leader replica and the follower replica.
- Lag of the follower replica is the difference in the LEO between leader
and follower.

In comparison to ReplicaStatusRequest, DescribeDirsRequest needs to be sent
to each replica of the partition whereas ReplicaStatusRequest only needs to
be sent to the leader of the partition. It doesn't seem to make much
difference though. In practice we probably want to query the replica lag of
many partitions and AdminClient needs to send exactly one request to each
broker with either solution. Does this make sense?
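
The lag computation described above can be sketched as follows. This is
only an illustration under stated assumptions: ReplicaLagSketch and its
methods are hypothetical names, and the input is assumed to be the LEO that
each broker reported for a single partition (for example, gathered from one
DescribeDirsResponse per broker). Because the responses are sampled at
slightly different times, the sketch floors the lag at zero; that is a
choice made here, not something the KIP specifies.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of Kafka's AdminClient. */
final class ReplicaLagSketch {
    private ReplicaLagSketch() {}

    /** Lag of one follower: leader LEO minus follower LEO, floored at zero. */
    static long lag(long leaderLeo, long followerLeo) {
        return Math.max(0L, leaderLeo - followerLeo);
    }

    /**
     * Given the LEO reported by each broker for one partition, returns the
     * lag of every follower relative to the leader, keyed by broker id.
     */
    static Map<Integer, Long> lagByBroker(int leaderId, Map<Integer, Long> leoByBroker) {
        Long leaderLeo = leoByBroker.get(leaderId);
        if (leaderLeo == null) {
            throw new IllegalArgumentException("No LEO reported by leader " + leaderId);
        }
        Map<Integer, Long> result = new HashMap<>();
        for (Map.Entry<Integer, Long> e : leoByBroker.entrySet()) {
            if (e.getKey() == leaderId) {
                continue; // the leader has no lag relative to itself
            }
            result.put(e.getKey(), lag(leaderLeo, e.getValue()));
        }
        return result;
    }
}
```

A reassignment could be treated as caught up for a partition once every
value in the returned map is zero (or below some tolerance), although a real
tool would still need to confirm the replica has joined the ISR.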


2) KIP-179 proposes to add the following AdminClient API:

alterTopics(Collection<AlteredTopic> alteredTopics, AlterTopicsOptions
options)

Where AlteredTopic includes the following fields:

AlteredTopic(String name, int numPartitions, int replicationFactor,
Map<Integer,List<Integer>> replicasAssignment)

I have two comments on this:

- The information in AlteredTopic seems a bit redundant. Both numPartitions
and replicationFactor can be derived from replicasAssignment. I think we
can probably just use the following API in AdminClient instead:

AlterTopicsResult alterTopics(Map<TopicPartition, List<Integer>>
partitionAssignment, AlterTopicsOptions options)

- Do you think "reassignPartitions" may be a better name than
"alterTopics"? This is more consistent with the existing name used in
kafka-reassign-partitions.sh and I also find it to be more accurate. On the
other hand, alterTopics seems to suggest that we can also alter topic
config.
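
To illustrate the redundancy point, here is a rough sketch of deriving
numPartitions and replicationFactor from a replicasAssignment map alone.
AssignmentShapeSketch is a hypothetical class written for this example. It
assumes the map describes every partition of one topic and that all
partitions must have the same number of replicas.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical illustration; not a class from KIP-179 or Kafka. */
final class AssignmentShapeSketch {
    final int numPartitions;
    final int replicationFactor;

    private AssignmentShapeSketch(int numPartitions, int replicationFactor) {
        this.numPartitions = numPartitions;
        this.replicationFactor = replicationFactor;
    }

    /**
     * Derives the partition count and replication factor from a full
     * assignment (partition id -> ordered list of broker ids).
     */
    static AssignmentShapeSketch of(Map<Integer, List<Integer>> replicasAssignment) {
        if (replicasAssignment.isEmpty()) {
            throw new IllegalArgumentException("Assignment must not be empty");
        }
        int n = replicasAssignment.size();
        int rf = -1;
        for (Map.Entry<Integer, List<Integer>> e : replicasAssignment.entrySet()) {
            int partition = e.getKey();
            // n distinct keys, all within [0, n), means ids are 0..n-1.
            if (partition < 0 || partition >= n) {
                throw new IllegalArgumentException("Partition ids must be 0.." + (n - 1));
            }
            int size = e.getValue().size();
            if (size == 0) {
                throw new IllegalArgumentException("Partition " + partition + " has no replicas");
            }
            if (rf == -1) {
                rf = size;
            } else if (rf != size) {
                throw new IllegalArgumentException("Partitions differ in replica count");
            }
        }
        return new AssignmentShapeSketch(n, rf);
    }
}
```

Note that this only works when the assignment covers the whole topic; a map
that mentions a subset of partitions cannot tell us the partition count.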



Thanks,
Dong

On Fri, Aug 4, 2017 at 1:03 AM, Tom Bentley <t....@gmail.com> wrote:

> Hi Dong,
>
> Thanks for your reply.
>
> You're right that your DescribeDirsResponse contains appropriate data. The
> comment about the log_end_offset in the KIP says "Enable user to track
> movement progress by comparing LEO of the *.log and *.move". That makes me
> wonder whether this would only work for replica movement on the same
> broker. In the case of reassigning partitions between brokers it's only
> really the leader for the partition that knows the max LEO of the ISR and
> the LEO of the target broker. Maybe the comment is misleading and it would
> be the leader doing the same thing in both cases. Can you clarify this?


> At a conceptual level there's a lot of similarity between KIP-113 and
> KIP-179 because they're both about moving replicas around. Concretely
> though, ChangeReplicaDirRequest assumes the movement is on the same broker,
> while the equivalent APIs in KIP-179 (whichever alternative) lack the
> notion of disks. I wonder if a unified API would be better or not.
> Operationally, the reasons for changing replicas between brokers are likely
> to be motivated by balancing load across the cluster, or adding/removing
> brokers. But the JBOD use case has different motivations. So I suspect it's
> OK that the APIs are separate, but I'd love to hear what others think.


> I think you're right about TopicPartitionReplica. At the protocol level we
> could group topics and partitions together to avoid having the same topic
> name multiple times when querying for the status of all the partitions of a
> topic.
>
> Thanks again for taking the time to respond.
>
> Tom
>
> On 3 August 2017 at 23:07, Dong Lin <li...@gmail.com> wrote:
>
> > Hey Tom,
> >
> > Thanks for the KIP. It seems that the prior discussion in this thread has
> > focused on reassigning partitions (or AlterTopics). I haven't looked into
> > this yet. I have two comments on the replicaStatus() API and the
> > ReplicaStatusRequest:
> >
> > -  It seems that the use-case for ReplicaStatusRequest is covered by
> > the DescribeDirsRequest introduced in KIP-113
> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 113%3A+Support+replicas+movement+between+log+directories>.
> > DescribeDirsResponse contains the logEndOffset for the specified
> > partitions. The lag of the follower replica can be derived as the
> > difference between leader's LEO and follower's LEO. Do you think we can
> > simply use DescribeDirsRequest without introducing ReplicaStatusRequest?
> >
> > - According to the current KIP, replicaStatus() takes a list of
> partitions
> > as input. And its output maps the partition to a list of replica status.
> Do
> > you think it would be better to have a new class called
> > TopicPartitionReplica, which identifies the topic, partition and the
> > brokerId. replicaStatus can then take a list of TopicPartitionReplica as
> > input. And its output maps the replica to replica status. The latter API
> > seems simpler and also matches the method name better. What do you think?
> >
> > Thanks,
> > Dong
> >
> >
> >
> > On Wed, Aug 2, 2017 at 5:12 AM, Tom Bentley <t....@gmail.com>
> wrote:
> >
> > > Hi again Ismael,
> > >
> > >
> > > 1. It's worth emphasising that reassigning partitions is a different
> > > >> process than what happens when a topic is created, so not sure
> trying
> > to
> > > >> make it symmetric is beneficial. In addition to what was already
> > > >> discussed,
> > > >> one should also enable replication throttling before moving the
> data.
> > > >
> > > >
> > > > Thanks for your responses. The only advantage I can see from having
> > > > separate APIs for partition count change vs. the other changes is
> that
> > we
> > > > expect the former to return synchronously, and the latter to
> (usually)
> > > > return asynchronously. Lumping them into the same API forces clients
> of
> > > > that API to code for the two possible cases. Is this the particular
> > > concern
> > > > you had in mind, or do you have others?
> > > >
> > >
> > > Just to help see what this looks like I've created another version of
> > > KIP-179 (
> > > https://cwiki.apache.org/confluence/display/KAFKA/Copy+
> > > of+KIP-179+-+Change+ReassignPartitionsCommand+to+use+AdminClient
> > > ).
> > >
> > > The AdminClient APIs are more type safe (harder to misuse). The
> > > asynchronous nature of the alterReplicationFactors() and
> > > reassignPartitions() methods isn't really apparent (you'd still have
> to
> > > read the javadocs to know you had to call replicaStatus() to determine
> > > completion), but it's still better than the lumped-together API because
> > the
> > > methods are _consistently_ async. The Protocol APIs are slightly nicer
> > too,
> > > in that INVALID_REQUEST is less commonly returned. Another benefit is
> it
> > > would allow for these different alteration APIs to be modified
> > > independently in the future, should that be necessary.
> > >
> > > What do others think?
> > >
> > > You're right that the proposal doesn't cover setting and clearing a
> > > > throttle, and it should. I will study the code about how this works
> > > before
> > > > posting back about that.
> > > >
> > >
> > > AFAICS from the code the throttle is simply a dynamic config of the
> > broker.
> > > We already have the AdminClient.alterConfigs API for setting this, so
> the
> > > refactoring of kafka-reassign-partitions.sh could just use that
> existing
> > > API. We could still achieve your objective of setting the throttle
> before
> > > reassignment by making the --throttle option mandatory when --execute
> was
> > > present. Or did you have something else in mind?
> > >
> > > Thanks,
> > >
> > > Tom
> > >
> >
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
Hi Dong,

Thanks for your reply.

You're right that your DescribeDirsResponse contains appropriate data. The
comment about the log_end_offset in the KIP says "Enable user to track
movement progress by comparing LEO of the *.log and *.move". That makes me
wonder whether this would only work for replica movement on the same
broker. In the case of reassigning partitions between brokers it's only
really the leader for the partition that knows the max LEO of the ISR and
the LEO of the target broker. Maybe the comment is misleading and it would
be the leader doing the same thing in both cases. Can you clarify this?

At a conceptual level there's a lot of similarity between KIP-113 and
KIP-179 because they're both about moving replicas around. Concretely
though, ChangeReplicaDirRequest assumes the movement is on the same broker,
while the equivalent APIs in KIP-179 (whichever alternative) lack the
notion of disks. I wonder if a unified API would be better or not.
Operationally, the reasons for changing replicas between brokers are likely
to be motivated by balancing load across the cluster, or adding/removing
brokers. But the JBOD use case has different motivations. So I suspect it's
OK that the APIs are separate, but I'd love to hear what others think.

I think you're right about TopicPartitionReplica. At the protocol level we
could group topics and partitions together so as to avoid having the same topic
name multiple times when querying for the status of all the partitions of a
topic.
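
As a rough illustration of the grouping described above, the following Java
sketch collapses a flat list of (topic, partition, broker) tuples so that
each topic name appears once. The class and method names (ReplicaId,
groupByTopic) are hypothetical and are not part of the KIP or of Kafka; only
the idea of factoring out the topic comes from the discussion.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupReplicasByTopic {

    /** Hypothetical stand-in for a (topic, partition, broker) tuple. */
    static final class ReplicaId {
        final String topic;
        final int partition;
        final int broker;

        ReplicaId(String topic, int partition, int broker) {
            this.topic = topic;
            this.partition = partition;
            this.broker = broker;
        }
    }

    /**
     * Groups a flat list of replicas by topic, so that a request built from
     * the result would carry each topic name once.
     * Each int[] in the result holds {partition, broker}.
     */
    static Map<String, List<int[]>> groupByTopic(List<ReplicaId> replicas) {
        Map<String, List<int[]>> grouped = new LinkedHashMap<>();
        for (ReplicaId r : replicas) {
            grouped.computeIfAbsent(r.topic, t -> new ArrayList<>())
                   .add(new int[] {r.partition, r.broker});
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<ReplicaId> flat = new ArrayList<>();
        flat.add(new ReplicaId("orders", 0, 1));
        flat.add(new ReplicaId("orders", 1, 2));
        flat.add(new ReplicaId("payments", 0, 1));

        Map<String, List<int[]>> grouped = groupByTopic(flat);
        for (Map.Entry<String, List<int[]>> e : grouped.entrySet()) {
            System.out.println(e.getKey() + ": " + e.getValue().size() + " replica(s)");
        }
    }
}
```

The saving only matters when many partitions of the same topic are queried in
one request; for a single replica the grouped form is no smaller.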

Thanks again for taking the time to respond.

Tom

On 3 August 2017 at 23:07, Dong Lin <li...@gmail.com> wrote:

> Hey Tom,
>
> Thanks for the KIP. It seems that the prior discussion in this thread has
> focused on reassigning partitions (or AlterTopics). I haven't looked into
> this yet. I have two comments on the replicaStatus() API and the
> ReplicaStatusRequest:
>
> -  It seems that the use-case for ReplicaStatusRequest is covered by
> the DescribeDirsRequest introduced in KIP-113
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 113%3A+Support+replicas+movement+between+log+directories>.
> DescribeDirsResponse contains the logEndOffset for the specified
> partitions. The lag of the follower replica can be derived as the
> difference between leader's LEO and follower's LEO. Do you think we can
> simply use DescribeDirsRequest without introducing ReplicaStatusRequest?
>
> - According to the current KIP, replicaStatus() takes a list of partitions
> as input. And its output maps the partition to a list of replica status. Do
> you think it would be better to have a new class called
> TopicPartitionReplica, which identifies the topic, partition and the
> brokerId. replicaStatus can then take a list of TopicPartitionReplica as
> input. And its output maps the replica to replica status. The latter API
> seems simpler and also matches the method name better. What do you think?
>
> Thanks,
> Dong
>
>
>
> On Wed, Aug 2, 2017 at 5:12 AM, Tom Bentley <t....@gmail.com> wrote:
>
> > Hi again Ismael,
> >
> >
> > 1. It's worth emphasising that reassigning partitions is a different
> > >> process than what happens when a topic is created, so not sure trying
> to
> > >> make it symmetric is beneficial. In addition to what was already
> > >> discussed,
> > >> one should also enable replication throttling before moving the data.
> > >
> > >
> > > Thanks for your responses. The only advantage I can see from having
> > > separate APIs for partition count change vs. the other changes is that
> we
> > > expect the former to return synchronously, and the latter to (usually)
> > > return asynchronously. Lumping them into the same API forces clients of
> > > that API to code for the two possible cases. Is this the particular
> > concern
> > > you had in mind, or do you have others?
> > >
> >
> > Just to help see what this looks like I've created another version of
> > KIP-179 (
> > https://cwiki.apache.org/confluence/display/KAFKA/Copy+
> > of+KIP-179+-+Change+ReassignPartitionsCommand+to+use+AdminClient
> > ).
> >
> > The AdminClient APIs are more type safe (harder to misuse). The
> > asynchronous nature of the alterReplicationFactors() and
> > reassignPartitions() methods isn't really apparent (you'd still have to
> > read the javadocs to know you had to call replicaStatus() to determine
> > completion), but it's still better than the lumped-together API because
> the
> > methods are _consistently_ async. The Protocol APIs are slightly nicer
> too,
> > in that INVALID_REQUEST is less commonly returned. Another benefit is it
> > would allow for these different alteration APIs to be modified
> > independently in the future, should that be necessary.
> >
> > What do others think?
> >
> > You're right that the proposal doesn't cover setting and clearing a
> > > throttle, and it should. I will study the code about how this works
> > before
> > > posting back about that.
> > >
> >
> > AFAICS from the code the throttle is simply a dynamic config of the
> broker.
> > We already have the AdminClient.alterConfigs API for setting this, so the
> > refactoring of kafka-reassign-partitions.sh could just use that existing
> > API. We could still achieve your objective of setting the throttle before
> > reassignment by making the --throttle option mandatory when --execute was
> > present. Or did you have something else in mind?
> >
> > Thanks,
> >
> > Tom
> >
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Dong Lin <li...@gmail.com>.
Hey Tom,

Thanks for the KIP. It seems that the prior discussion in this thread has
focused on reassigning partitions (or AlterTopics). I haven't looked into
this yet. I have two comments on the replicaStatus() API and the
ReplicaStatusRequest:

-  It seems that the use-case for ReplicaStatusRequest is covered by
the DescribeDirsRequest introduced in KIP-113
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>.
DescribeDirsResponse contains the logEndOffset for the specified
partitions. The lag of the follower replica can be derived as the
difference between leader's LEO and follower's LEO. Do you think we can
simply use DescribeDirsRequest without introducing ReplicaStatusRequest?

- According to the current KIP, replicaStatus() takes a list of partitions
as input. And its output maps the partition to a list of replica status. Do
you think it would be better to have a new class called
TopicPartitionReplica, which identifies the topic, partition and the
brokerId. replicaStatus can then take a list of TopicPartitionReplica as
input. And its output maps the replica to replica status. The latter API
seems simpler and also matches the method name better. What do you think?
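
To make the two suggestions above concrete, here is a minimal Java sketch of
a replica key holding topic, partition and brokerId, together with the lag
derived as leader LEO minus follower LEO. Everything here is illustrative:
the nested class is a local stand-in written for this example, the method
names are hypothetical, and flooring the lag at zero is an assumption made to
cope with samples taken at slightly different times.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

public class ReplicaLagSketch {

    /** Illustrative key identifying one replica: topic, partition and broker id. */
    static final class TopicPartitionReplica {
        final String topic;
        final int partition;
        final int brokerId;

        TopicPartitionReplica(String topic, int partition, int brokerId) {
            this.topic = topic;
            this.partition = partition;
            this.brokerId = brokerId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartitionReplica)) {
                return false;
            }
            TopicPartitionReplica other = (TopicPartitionReplica) o;
            return topic.equals(other.topic)
                    && partition == other.partition
                    && brokerId == other.brokerId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition, brokerId);
        }
    }

    /** Lag is the leader's LEO minus the follower's LEO, floored at zero (assumption). */
    static long lag(long leaderLeo, long followerLeo) {
        return Math.max(0L, leaderLeo - followerLeo);
    }

    /** Maps each follower replica of one partition to its lag behind the leader. */
    static Map<TopicPartitionReplica, Long> lagByReplica(
            long leaderLeo, Map<TopicPartitionReplica, Long> followerLeos) {
        Map<TopicPartitionReplica, Long> result = new LinkedHashMap<>();
        for (Map.Entry<TopicPartitionReplica, Long> e : followerLeos.entrySet()) {
            result.put(e.getKey(), lag(leaderLeo, e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<TopicPartitionReplica, Long> followerLeos = new LinkedHashMap<>();
        followerLeos.put(new TopicPartitionReplica("orders", 0, 2), 950L);
        followerLeos.put(new TopicPartitionReplica("orders", 0, 3), 1000L);

        lagByReplica(1000L, followerLeos).forEach((replica, lag) ->
                System.out.println("broker " + replica.brokerId + " lag=" + lag));
    }
}
```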

Thanks,
Dong



On Wed, Aug 2, 2017 at 5:12 AM, Tom Bentley <t....@gmail.com> wrote:

> Hi again Ismael,
>
>
> 1. It's worth emphasising that reassigning partitions is a different
> >> process than what happens when a topic is created, so not sure trying to
> >> make it symmetric is beneficial. In addition to what was already
> >> discussed,
> >> one should also enable replication throttling before moving the data.
> >
> >
> > Thanks for your responses. The only advantage I can see from having
> > separate APIs for partition count change vs. the other changes is that we
> > expect the former to return synchronously, and the latter to (usually)
> > return asynchronously. Lumping them into the same API forces clients of
> > that API to code for the two possible cases. Is this the particular
> concern
> > you had in mind, or do you have others?
> >
>
> Just to help see what this looks like I've created another version of
> KIP-179 (
> https://cwiki.apache.org/confluence/display/KAFKA/Copy+
> of+KIP-179+-+Change+ReassignPartitionsCommand+to+use+AdminClient
> ).
>
> The AdminClient APIs are more type safe (harder to misuse). The
> asynchronous nature of the alterReplicationFactors() and
> reassignPartitions() methods isn't really apparent (you'd still have to
> read the javadocs to know you had to call replicaStatus() to determine
> completion), but it's still better than the lumped-together API because the
> methods are _consistently_ async. The Protocol APIs are slightly nicer too,
> in that INVALID_REQUEST is less commonly returned. Another benefit is it
> would allow for these different alteration APIs to be modified
> independently in the future, should that be necessary.
>
> What do others think?
>
> You're right that the proposal doesn't cover setting and clearing a
> > throttle, and it should. I will study the code about how this works
> before
> > posting back about that.
> >
>
> AFAICS from the code the throttle is simply a dynamic config of the broker.
> We already have the AdminClient.alterConfigs API for setting this, so the
> refactoring of kafka-reassign-partitions.sh could just use that existing
> API. We could still achieve your objective of setting the throttle before
> reassignment by making the --throttle option mandatory when --execute was
> present. Or did you have something else in mind?
>
> Thanks,
>
> Tom
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
Hi again Ismael,


1. It's worth emphasising that reassigning partitions is a different
>> process than what happens when a topic is created, so not sure trying to
>> make it symmetric is beneficial. In addition to what was already
>> discussed,
>> one should also enable replication throttling before moving the data.
>
>
> Thanks for your responses. The only advantage I can see from having
> separate APIs for partition count change vs. the other changes is that we
> expect the former to return synchronously, and the latter to (usually)
> return asynchronously. Lumping them into the same API forces clients of
> that API to code for the two possible cases. Is this the particular concern
> you had in mind, or do you have others?
>

Just to help see what this looks like I've created another version of
KIP-179 (
https://cwiki.apache.org/confluence/display/KAFKA/Copy+of+KIP-179+-+Change+ReassignPartitionsCommand+to+use+AdminClient
).

The AdminClient APIs are more type safe (harder to misuse). The
asynchronous nature of the alterReplicationFactors() and
reassignPartitions() methods isn't really apparent (you'd still have to
read the javadocs to know you had to call replicaStatus() to determine
completion), but it's still better than the lumped-together API because the
methods are _consistently_ async. The Protocol APIs are slightly nicer too,
in that INVALID_REQUEST is less commonly returned. Another benefit is it
would allow for these different alteration APIs to be modified
independently in the future, should that be necessary.

What do others think?

You're right that the proposal doesn't cover setting and clearing a
> throttle, and it should. I will study the code about how this works before
> posting back about that.
>

AFAICS from the code the throttle is simply a dynamic config of the broker.
We already have the AdminClient.alterConfigs API for setting this, so the
refactoring of kafka-reassign-partitions.sh could just use that existing
API. We could still achieve your objective of setting the throttle before
reassignment by making the --throttle option mandatory when --execute was
present. Or did you have something else in mind?
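
For illustration only, the "--throttle mandatory when --execute is present"
rule could be checked along these lines. This is a simplified, self-contained
Java sketch: the class and method names are hypothetical, and it does not
reflect how the real tool parses its options.

```java
import java.util.Arrays;
import java.util.List;

public class ThrottleOptionCheck {

    /**
     * Returns an error message if --execute is given without --throttle,
     * or null if the combination of options is acceptable.
     */
    static String validate(List<String> args) {
        boolean execute = args.contains("--execute");
        boolean throttle = args.contains("--throttle");
        if (execute && !throttle) {
            return "--throttle is required when --execute is given";
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> withoutThrottle = Arrays.asList("--execute");
        List<String> withThrottle = Arrays.asList("--execute", "--throttle", "5000000");

        System.out.println("without throttle: " + validate(withoutThrottle));
        System.out.println("with throttle: " + validate(withThrottle));
    }
}
```

With a check like this the tool would refuse to start a reassignment until a
throttle value has been supplied, which it could then apply through the
existing alterConfigs API before moving any data.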

Thanks,

Tom

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
Hi Ismael,


1. It's worth emphasising that reassigning partitions is a different
> process than what happens when a topic is created, so not sure trying to
> make it symmetric is beneficial. In addition to what was already discussed,
> one should also enable replication throttling before moving the data.


Thanks for your responses. The only advantage I can see from having separate
APIs for partition count change vs. the other changes is that we expect the
former to return synchronously, and the latter to (usually) return
asynchronously. Lumping them into the same API forces clients of that API
to code for the two possible cases. Is this the particular concern you had
in mind, or do you have others?

You're right that the proposal doesn't cover setting and clearing a
throttle, and it should. I will study the code about how this works before
posting back about that.

3. In my opinion, the warning when someone updates configs via
> `kafka-topics.sh` was a mistake. Doing the same thing via the configs tool
> is quite clunky in comparison.
>

Is that a request to remove the deprecation warning (and thus to
undeprecate using kafka-topics.sh to change configs)?

Thanks again for taking the time to respond,

Tom



On 1 August 2017 at 17:52, Ismael Juma <is...@juma.me.uk> wrote:

> Hi both,
>
> Thanks for the replies. A few points:
>
> 1. It's worth emphasising that reassigning partitions is a different
> process than what happens when a topic is created, so not sure trying to
> make it symmetric is beneficial. In addition to what was already discussed,
> one should also enable replication throttling before moving the data.
>
> 2. HTTP codes are not a great comparison as they are per response while
> Kafka responses often include multiple error codes (due to batching).
>
> 3. In my opinion, the warning when someone updates configs via
> `kafka-topics.sh` was a mistake. Doing the same thing via the configs tool
> is quite clunky in comparison.
>
> Ismael
>
> On Tue, Aug 1, 2017 at 4:52 PM, Tom Bentley <t....@gmail.com> wrote:
>
> > > Regarding adding the possibility to alter the topic config through the
> > > AlterTopic API, the current TopicCommand implementation provides a
> > warning
> > > on doing this suggesting to use the ConfigCommand tool. So it would be
> a
> > > step back allowing to do the configs change with the alter topic as
> well.
> > >
> >
> > To be clear, I'm not proposing that this be supported via the
> > kafka-reassign-partitions.sh tool, nor via kafka-topics.sh. It's purely a
> > question of whether the API for the AlterTopicsRequest/Response and the
> > AdminClient API would support it. As I said, I really don't like the
> > duplication of functionality and would prefer to leave it out, even
> though
> > it makes for a less "symmetric" API wrt CreateTopicsRequest/Response, but
> > maybe the community feels otherwise.
> >
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Ismael Juma <is...@juma.me.uk>.
Hi both,

Thanks for the replies. A few points:

1. It's worth emphasising that reassigning partitions is a different
process than what happens when a topic is created, so not sure trying to
make it symmetric is beneficial. In addition to what was already discussed,
one should also enable replication throttling before moving the data.

2. HTTP codes are not a great comparison as they are per response while
Kafka responses often include multiple error codes (due to batching).

3. In my opinion, the warning when someone updates configs via
`kafka-topics.sh` was a mistake. Doing the same thing via the configs tool
is quite clunky in comparison.

Ismael

On Tue, Aug 1, 2017 at 4:52 PM, Tom Bentley <t....@gmail.com> wrote:

> > Regarding adding the possibility to alter the topic config through the
> > AlterTopic API, the current TopicCommand implementation provides a
> warning
> > on doing this suggesting to use the ConfigCommand tool. So it would be a
> > step back allowing to do the configs change with the alter topic as well.
> >
>
> To be clear, I'm not proposing that this be supported via the
> kafka-reassign-partitions.sh tool, nor via kafka-topics.sh. It's purely a
> question of whether the API for the AlterTopicsRequest/Response and the
> AdminClient API would support it. As I said, I really don't like the
> duplication of functionality and would prefer to leave it out, even though
> it makes for a less "symmetric" API wrt CreateTopicsRequest/Response, but
> maybe the community feels otherwise.
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
> Regarding adding the possibility to alter the topic config through the
> AlterTopic API, the current TopicCommand implementation provides a warning
> on doing this suggesting to use the ConfigCommand tool. So it would be a
> step back allowing to do the configs change with the alter topic as well.
>

To be clear, I'm not proposing that this be supported via the
kafka-reassign-partitions.sh tool, nor via kafka-topics.sh. It's purely a
question of whether the API for the AlterTopicsRequest/Response and the
AdminClient API would support it. As I said, I really don't like the
duplication of functionality and would prefer to leave it out, even though
it makes for a less "symmetric" API wrt CreateTopicsRequest/Response, but
maybe the community feels otherwise.

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Paolo Patierno <pp...@live.com>.
Hi,


Regarding the double API, I agree on having only one. By analogy with an HTTP REST API, an operation that has already completed (a fast one) could return 200 OK, while a long-running operation could return 202 ACCEPTED (the request was accepted but processing has not completed).


Regarding adding the possibility to alter the topic config through the AlterTopic API, the current TopicCommand implementation prints a warning when you do this, suggesting the ConfigCommand tool instead. So allowing config changes through alter topic as well would be a step back.


One last thing on the KIP: the "timeout" field in the AlterTopicRequest is missing from the table of field descriptions.


Thanks


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>


________________________________
From: Tom Bentley <t....@gmail.com>
Sent: Tuesday, August 1, 2017 2:22 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Hi Ismael,

Thanks for taking the time to look at this.

Currently the proposal around the ReplicaStatusRequest/Response just allows
you to see the lag for the given replicas. It's not something that's tied
to a prior request to alter the topic at all, though obviously you can use
it to monitor how a reassignment is progressing.

I think having the API independent of AlterTopicsRequest/Response is a good
thing, because people might want to know that independently of a
reassignment, for example. But the fact that you just see the instantaneous
lag makes it less than perfect for monitoring a reassignment. You would
have to make multiple calls to figure out how quickly the lag is
diminishing or growing. But I don't want each partition leader having to
maintain some kind of stateful calculation just in case it is queried for
the state of that partition.

So I think that problem might better be solved in the
kafka-reassign-partitions.sh tool. Let _it_ make multiple requests for
status and figure out how the lag is changing.

From the PoV of the API presented by the protocol, the proposal has moved
in the direction of making AlterTopicsRequest/Response more and more like
CreateTopicsRequest/Response, and I think it has improved as a result of
this.

Personally I don't think we should split the AlterTopicsRequest/Response
API in two, one which we expect to be short running and another which we
expect to be long running. I think that would make the API less symmetric,
and thus less easily understood. What might make more sense is to use the
return code to indicate the distinction between, "OK, it's done completely"
and "OK, it's been started, but will finish asynchronously" -- In a way
that distinction already exists via the timeout in AlterTopicsOptions. In
other words a configs change might return NONE, but a reassignment (with a
short timeout) might return REQUEST_TIMED_OUT, though I guess the problem
with that is that TimeoutException is retriable, and REQUEST_TIMED_OUT
doesn't suggest that you can use another API to query for completion, so
perhaps a dedicated status code is warranted (ASYNCHRONOUS_COMPLETION?)

To be honest, if we added the ability to update the topic configs, it would
be even more symmetric, and I wouldn't be opposed to that, except that I
dislike having two code paths for achieving the same thing, and we already
have AlterConfigsRequest and Response.

What do other people think?

Cheers,

Tom


On 1 August 2017 at 14:25, Ismael Juma <is...@juma.me.uk> wrote:

> Hi Tom,
>
> A high-level point for discussion before going into the details. The
> proposed protocol API `alterTopics` has 2 types of operations:
>
> 1. Operations that cause data movement (or deletion): increase/decrease of
> replication factor and partition reassignment. These are currently done by
> `kafka-reassign-partitions.sh` and having a way to check the progress is
> useful.
>
> 2. Operations that don't involve data movement: increase the number of
> partitions and update topic configs (this is not in the proposal, but could
> be). These are currently done by `kafka-topics.sh` and they don't usually
> take very long so progress reporting is less important.
>
> It would be worth thinking whether having 2 separate protocol APIs would be
> better. I can see pros and cons, so I'd be interested in what you and
> others think.
>
> Ismael
>
> On Tue, Aug 1, 2017 at 10:19 AM, Tom Bentley <t....@gmail.com>
> wrote:
>
> > I have added the timeout I mentioned before, because it makes the
> > implementation of topic alteration more symmetric with the topic creation
> > APIs.
> >
> > I have also added a section ("Policy") on retrofitting the
> > CreateTopicPolicy's rules to topic alteration requests, and made a few
> > other minor fixes.
> >
> > I've still not factored out the topic name from the ReplicaStatusRequest
> > (Ismael, do you have an opinion about that?)
> >
> > If anyone else has some feedback on this KIP that would be great.
> > Otherwise, I would like to start a vote on this KIP before the end of the
> > week.
> >
> > Thanks,
> >
> > Tom
> >
> > On 26 July 2017 at 11:45, Tom Bentley <t....@gmail.com> wrote:
> >
> > > I've updated the KIP to fix those niggles, but I've not factored out
> the
> > > topic name from the ReplicaStatusRequest, yet.
> > >
> > > Looking at the topic creation APIs in more detail, the
> > CreateTopicsOptions
> > > has
> > >
> > > * `shouldValidateOnly()`, which would make a lot of sense for the alter
> > > topic APIs
> > > * `timeoutMs()`, which I'm not sure about...
> > >
> > > Topic creation doesn't require shifting replicas between brokers so
> it's
> > > reasonable to support timeout, because we don't expect it to take very
> > long.
> > >
> > > Topic alteration usually takes a while because we are going to have to
> > > move replicas. Since we're adding a whole API to track the progress of
> > that
> > > replication, I'm inclined to think that having a timeout is a bit
> > pointless.
> > >
> > > But should the replicaStatus() API have a timeout? I suppose it
> probably
> > > should.
> > >
> > >
> > > On 26 July 2017 at 10:58, Tom Bentley <t....@gmail.com> wrote:
> > >
> > >> Thanks Paolo,
> > >>
> > >>   *   in the "Public Interfaces" section you wrote
> > >>> alterTopics(Set<AlterTopics>) but then a collection is used (instead
> > of a
> > >>> set) in the Proposed Changes section. I'm ok with collection.
> > >>>
> > >>
> > >> Agree it should be Collection.
> > >>
> > >>
> > >>>   *   in the summary of the alterTopics method you say "The request
> can
> > >>> change the number of partitions, replication factor and/or the
> > partition
> > >>> assignments." I think that the "and/or" is misleading (at least for
> > me).
> > >>> For the TopicCommand tool you can specify partitions AND replication
> > factor
> > >>> ... OR partition assignments (but not for example partitions AND
> > >>> replication factor AND partition assignments). Maybe it could be "The
> > >>> request can change the number of partitions and the related
> replication
> > >>> factor or specifying a partition assignments."
> > >>>
> > >>
> > >> Is there a reason why we can't support changing all three at once? It
> > >> certainly makes conceptual sense to, say, increase the number of
> > partitions
> > >> and replication factor, and specify how you want the partitions
> > assigned.
> > >> And doing two separate calls would be less efficient as we sync new
> > >> replicas on brokers only to then move them somewhere else.
> > >>
> > >> If there is a reason we don't want to support changing all three, then
> > we
> > >> can return the error INVALID_REQUEST (42). That would allow us to
> > support
> > >> changing all three at some time in the future, without having to
> change
> > the
> > >> API.
> > >>
> > >>
> > >>>   *   I know that it would be a breaking change in the Admin Client
> API
> > >>> but why having an AlteredTopic class which is quite similar to the
> > already
> > >>> existing NewTopic class ? I know that using NewTopic for the alter
> > method
> > >>> could be misleading. What about renaming NewTopic in something like
> > >>> AdminTopic ? At same time it means that the TopicDetails class (which
> > you
> > >>> can get from the current NewTopic) should be outside the
> > >>> CreateTopicsRequest because it could be used in the
> AlterTopicsRequest
> > as
> > >>> well.
> > >>>
> > >>
> > >> One problem with this is it tends to inhibit future API changes for
> > >> either newTopics() or alterTopics(), because any common class needs to
> > make
> > >> sense in both contexts.
> > >>
> > >> For createTopics() we get to specify some configs (the
> > >> Map<String,String>), but since the AdminClient already has
> > alterConfigs()
> > >> for changing topic configs after topic creation I don't think it's
> > right to
> > >> also support changing those configs via alterTopics() as well. But
> > having
> > >> them in a common AdminTopic class would suggest that that was
> supported.
> > >> Yes, alterTopics could return INVALID_REQUEST if it was given topic
> > >> configs, but this is just making the API harder to use since it is
> > >> weakening of the type safety of the API.
> > >>
> > >> I suppose we *could* write a common TopicDetails class and make the
> > >> existing nested one extend the common one, with deprecations, to
> > eventually
> > >> remove the nested one.
> > >>
> > >>
> > >>
> > >>>   *   A typo in the ReplicaStatus : gpartition() instead of
> partition()
> > >>>   *   In the AlterTopicRequest
> > >>>      *   the replication factor should be INT16
> > >>>
> > >>
> > >> Ah, thanks!
> > >>
> > >>
> > >>>      *   I would use same fields name as CreateTopicsRequest (they
> are
> > >>> quite similar)
> > >>>   *   What's the broker id in the ReplicaStatusRequest ?
> > >>>
> > >>
> > >> It's the broker, which is expected to have a replica of the given
> > >> partition, that we're querying the status of. It is necessary because
> > we're
> > >> asking the _leader_ for the partition about (a subset of) the status
> of
> > the
> > >> followers. Put another way, to identify the replica of a particular
> > >> partition on a particular broker we need the tuple (topic, partition,
> > >> broker).
> > >>
> > >> If we were always interested in the status of the partition across all
> > >> brokers we could omit the broker part. But for reassignment we
> actually
> > >> only care about a subset of the brokers.
> > >>
> > >>
> > >>>   *   Thinking aloud, could make sense having "Partition" in the
> > >>> ReplicaStatusRequest as an array so that I can specify in only one
> > request
> > >>> the status for partitions I'm interested in, in order to avoid a
> > request
> > >>> for each partition for the same topic. Maybe empty array could mean
> ..
> > >>> "give me status for all partitions of this topic". Of course it means
> > that
> > >>> the ReplicaStatusResponse should change because we should have an
> array
> > >>> with partition, broker, lag and so on
> > >>>
> > >>
> > >> You already can specify in one request the status for all the
> partitions
> > >> you're interested in (ReplicaStatus can be repeated/is an array
> field).
> > >>
> > >> We could factor out the topic to avoid repeating it, which would be
> more
> > >> efficient when we're querying the status of many partitions of a topic
> > >> and/or there are many brokers holding replicas. In other words, we
> could
> > >> factor it to look like this:
> > >>
> > >> ReplicaStatusRequest => [TopicReplicas]
> > >>   TopicReplicas => Topic [PartitionReplica]
> > >>     Topic => string
> > >>     PartitionReplica => Partition Broker
> > >>       Partition => int32
> > >>       Broker => int32
> > >>
> > >> Does this make sense to you?
> > >>
> > >>
> > >
> >
>

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Tom Bentley <t....@gmail.com>.
Hi Ismael,

Thanks for taking the time to look at this.

Currently the proposal around the ReplicaStatusRequest/Response just allows
you to see the lag for the given replicas. It's not something that's tied
to a prior request to alter the topic at all, though obviously you can use
it to monitor how a reassignment is progressing.

I think having the API independent of AlterTopicsRequest/Response is a good
thing, because people might want to know the lag of a replica even when no
reassignment is in progress. But the fact that you just see the instantaneous
lag makes it less than perfect for monitoring a reassignment. You would
have to make multiple calls to figure out how quickly the lag is
diminishing or growing. But I don't want each partition leader having to
maintain some kind of stateful calculation just in case it is queried for
the state of that partition.

So I think that problem might better be solved in the
kafka-reassign-partitions.sh tool. Let _it_ make multiple requests for
status and figure out how the lag is changing.
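
To illustrate the kind of client-side calculation meant here, a minimal
sketch follows. It assumes the tool has already collected a few
(timestamp, lag) observations for one replica from successive status
requests. The names LagSample and LagTrend are hypothetical and not part of
the KIP; nothing here calls a real Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

/** One observation of a replica's lag, as a client-side tool might record it (hypothetical type). */
final class LagSample {
    final long timestampMs; // when the status response was received
    final long lag;         // how far the replica was behind the leader at that time

    LagSample(long timestampMs, long lag) {
        this.timestampMs = timestampMs;
        this.lag = lag;
    }
}

/** Derives a trend from successive samples so the broker does not have to keep any state. */
final class LagTrend {
    private LagTrend() {}

    /**
     * Average change in lag per second between the first and last sample.
     * Negative means the replica is catching up; positive means it is falling behind.
     */
    static double lagChangePerSecond(List<LagSample> samples) {
        if (samples.size() < 2) {
            throw new IllegalArgumentException("need at least two samples");
        }
        LagSample first = samples.get(0);
        LagSample last = samples.get(samples.size() - 1);
        long elapsedMs = last.timestampMs - first.timestampMs;
        if (elapsedMs <= 0) {
            throw new IllegalArgumentException("samples must span a positive interval");
        }
        return (last.lag - first.lag) * 1000.0 / elapsedMs;
    }

    /** Estimated seconds until the lag reaches zero, or -1 if the lag is not shrinking. */
    static double estimatedSecondsToCatchUp(List<LagSample> samples) {
        double rate = lagChangePerSecond(samples);
        if (rate >= 0) {
            return -1;
        }
        return samples.get(samples.size() - 1).lag / -rate;
    }

    public static void main(String[] args) {
        List<LagSample> samples = new ArrayList<>();
        samples.add(new LagSample(0L, 10_000L));
        samples.add(new LagSample(5_000L, 8_000L));
        samples.add(new LagSample(10_000L, 6_000L));
        System.out.println("lag change per second: " + lagChangePerSecond(samples));
        System.out.println("estimated seconds to catch up: " + estimatedSecondsToCatchUp(samples));
    }
}
```

Keeping the samples in the tool means each partition leader only ever
answers "what is the lag right now", which is the stateless behaviour
argued for above.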

From the PoV of the API presented by the protocol, the proposal has moved
in the direction of making AlterTopicsRequest/Response more and more like
CreateTopicsRequest/Response, and I think it has improved as a result of
this.

Personally I don't think we should split the AlterTopicsRequest/Response
API in two, one which we expect to be short running and another which we
expect to be long running. I think that would make the API less symmetric,
and thus less easily understood. What might make more sense is to use the
return code to indicate the distinction between "OK, it's done completely"
and "OK, it's been started, but will finish asynchronously". In a way
that distinction already exists via the timeout in AlterTopicsOptions: a
configs change might return NONE, but a reassignment (with a short timeout)
might return REQUEST_TIMED_OUT. The problem with that is that
TimeoutException is retriable, and REQUEST_TIMED_OUT doesn't suggest that
you can use another API to query for completion, so perhaps a dedicated
status code is warranted (ASYNCHRONOUS_COMPLETION?).
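
As a rough sketch of how a client might act on such a distinction (this is
an illustration only, not the KIP's design): the enum below is hypothetical.
NONE and REQUEST_TIMED_OUT borrow the names of existing Kafka error codes,
while ASYNCHRONOUS_COMPLETION is only the name floated above and does not
exist. AlterOutcome, NextStep and AlterOutcomeHandler are made-up names.

```java
/** Hypothetical outcome codes for an alter-topics response. */
enum AlterOutcome {
    NONE,                    // the change was fully applied before the response was sent
    REQUEST_TIMED_OUT,       // retriable, so a client would normally just resend
    ASYNCHRONOUS_COMPLETION  // name floated in this thread: accepted, finishes in the background
}

/** What the caller should do after seeing a given outcome. */
enum NextStep { DONE, RETRY, POLL_STATUS }

final class AlterOutcomeHandler {
    private AlterOutcomeHandler() {}

    /** Maps each outcome to the caller's next action. */
    static NextStep nextStep(AlterOutcome outcome) {
        switch (outcome) {
            case NONE:
                return NextStep.DONE;
            case REQUEST_TIMED_OUT:
                // Nothing in this code tells the caller that a status API exists.
                return NextStep.RETRY;
            case ASYNCHRONOUS_COMPLETION:
                // A dedicated code makes "go and poll for progress" explicit.
                return NextStep.POLL_STATUS;
            default:
                throw new IllegalArgumentException("unknown outcome: " + outcome);
        }
    }

    public static void main(String[] args) {
        for (AlterOutcome outcome : AlterOutcome.values()) {
            System.out.println(outcome + " -> " + nextStep(outcome));
        }
    }
}
```

The point of the separate code is visible in the switch: with only
REQUEST_TIMED_OUT, a generic client has no reason to do anything other than
retry.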

To be honest, if we added the ability to update the topic configs, it would
be even more symmetric, and I wouldn't be opposed to that, except that I
dislike having two code paths for achieving the same thing, and we already
have AlterConfigsRequest and Response.

What do other people think?

Cheers,

Tom


On 1 August 2017 at 14:25, Ismael Juma <is...@juma.me.uk> wrote:

> Hi Tom,
>
> A high-level point for discussion before going into the details. The
> proposed protocol API `alterTopics` has 2 types of operations:
>
> 1. Operations that cause data movement (or deletion): increase/decrease of
> replication factor and partition reassignment. These are currently done by
> `kafka-reassign-partitions.sh` and having a way to check the progress is
> useful.
>
> 2. Operations that don't involve data movement: increase the number of
> partitions and update topic configs (this is not in the proposal, but could
> be). These are currently done by `kafka-topics.sh` and they don't usually
> take very long so progress reporting is less important.
>
> It would be worth thinking whether having 2 separate protocol APIs would be
> better. I can see pros and cons, so I'd be interested in what you and
> others think.
>
> Ismael

Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

Posted by Ismael Juma <is...@juma.me.uk>.
Hi Tom,

A high-level point for discussion before going into the details. The
proposed protocol API `alterTopics` has 2 types of operations:

1. Operations that cause data movement (or deletion): increase/decrease of
replication factor and partition reassignment. These are currently done by
`kafka-reassign-partitions.sh` and having a way to check the progress is
useful.

2. Operations that don't involve data movement: increase the number of
partitions and update topic configs (this is not in the proposal, but could
be). These are currently done by `kafka-topics.sh` and they don't usually
take very long so progress reporting is less important.

It would be worth thinking about whether having 2 separate protocol APIs
would be better. I can see pros and cons, so I'd be interested in what you
and others think.

Ismael

On Tue, Aug 1, 2017 at 10:19 AM, Tom Bentley <t....@gmail.com> wrote:

> I have added the timeout I mentioned before, because it makes the
> implementation of topic alteration more symmetric with the topic creation
> APIs.
>
> I have also added a section ("Policy") on retrofitting the
> CreateTopicPolicy's rules to topic alteration requests, and made a few
> other minor fixes.
>
> I've still not factored out the topic name from the ReplicaStatusRequest
> (Ismael, do you have an opinion about that?)
>
> If any one else has some feedback on this KIP that would be great.
> Otherwise, I would like to start a vote on this KIP before the end of the
> week.
>
> Thanks,
>
> Tom