Posted to dev@kafka.apache.org by George Li <sq...@yahoo.com.INVALID> on 2019/05/01 06:22:09 UTC

Re: [DISCUSS] KIP-236 Interruptible Partition Reassignment

 Hi Jason,

Sorry for the late response; I've been busy.  I have taken a brief look at Colin's KIP-455. It's a good direction for reassignments. I think KIP-236, KIP-435, and KIP-455 need some consensus / coordination on where the new reassignment data AND the original replicas are stored.

The current way of storing the new reassignment data in the ZK node /admin/reassign_partitions vs. the KIP-455 way of storing it at the /brokers/topics/<topic>/<partitions> level has pros and cons.

Storing it in the ZK node /admin/reassign_partitions makes it quick to find the current pending/active reassignments.  Storing it at the topic/partition level is generally fine for the controller, since the active reassignments are also kept in the in-memory ControllerContext.  One issue I see is when controller failover occurs: the new controller will need to read the active reassignments by scanning /brokers/topics/<topic>/<partitions>.  For clusters with a hundred-plus brokers and tens of thousands of partitions per broker, this scan might not be scalable, and the failover controller needs to come up as quickly as possible.

Another issue: the controller has all the active reassignments in memory (as well as in ZK at the partition level), and the non-controller brokers will maintain their reassignment context in memory as well through the controller's LeaderAndIsrRequest with the additional reassignment data.  If a broker misses the LeaderAndIsrRequest, will that cause discrepancies on that broker?  For performance reasons, we do not want the brokers to scan all topics/partitions to find the current active reassignments.
 
I agree it is more flexible in the long term to have the capability of submitting new reassignments while the current batch is still active.  You brought up a good point about allowing cancellation of individual reassignments: I will have the reassignment cancel RPC specify individual partitions for cancellation; if the list is null, then cancel all pending reassignments.  In our current work environment, reassignment operations are very sensitive to the cluster's latency, especially for clusters with min.insync.replicas >= 2.  So when a reassignment is kicked off and cluster performance is affected, the most desirable feature is to cancel all pending reassignments in a timely/clean fashion so that the cluster's performance can be restored.  That's the main purpose of KIP-236.  For a data storage product/system, it's good to have the cancel/rollback feature.
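
(For illustration only, a rough Scala sketch of what such a cancel RPC could look like; the names and shapes below are mine, not the final KIP-236 interface:)

  import org.apache.kafka.common.TopicPartition

  // Hypothetical request/response for the reassignment cancel RPC.
  // A None/empty partition set means "cancel ALL pending reassignments";
  // otherwise only the listed partitions are cancelled.
  case class CancelReassignmentsRequest(partitions: Option[Set[TopicPartition]])
  case class CancelReassignmentsResponse(
    cancelled: Set[TopicPartition],  // rolled back to their original replicas
    skipped: Set[TopicPartition])    // e.g. "original_replicas" info missing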

Thanks,
George

    On Wednesday, April 24, 2019, 10:57:45 AM PDT, Jason Gustafson <ja...@confluent.io> wrote:  
 
 Hi George,

I think focusing on the admin API for cancellation in this KIP is
reasonable. Colin wrote up KIP-455 which adds APIs to submit a reassignment
and to list reassignments in progress. Probably as long as we make these
APIs all consistent with each other, it is fine to do them separately.

The major point we need to reach agreement on is how an active reassignment
will be stored. In the proposal here, we continue to
use /admin/reassign_partitions and we further extend it to differentiate
between the original replicas and the target replicas of a reassignment. In
KIP-455, Colin proposes to store the same information, but do it at the
partition level. Obviously we would only do one of these. I think the main
difference is how we think about reassignment in general. Currently we see
it as basically a batch job which is submitted atomically and not completed
until all the partitions in the reassignment have been moved. The other
perspective is to treat reassignments more fluidly. At any point, we just
have some partitions which are undergoing reassignment. Then there is no
longer a question about supporting multiple reassignments: we can reassign
new partitions at any time and we can cancel individual partition
reassignments also at any time. This is a more ambitious approach, but it
seems much more flexible in the long term.

Thanks,
Jason



On Fri, Apr 5, 2019 at 7:25 PM George Li <sq...@yahoo.com.invalid>
wrote:

>  Hi Jun,
>
> Thanks for the feedback!
>
> For 40,  I agree.  It makes sense to do it via an RPC request to the
> controller.  Maybe for KIP-236 I will just implement the RPC for
> reassignment cancellation only?  Otherwise, if KIP-236 includes the other
> previous Reassignment related operations such as Submitting Reassignments,
> --generate,  --verify, Change/Remove Throttle, I think KIP-236 will incur a
> lot of changes.
>
> For 41, the current reassignment logic, after /admin/reassign_partitions
> has the "new_replicas" for the topic/partition,  the controller will set
> the AR (Assigned Replicas) to be the Set(original_replicas) +
> Set(new_replicas),  thus when reassignment begins, the "original_replicas"
> info is lost. It's not possible to use the AR and new_replicas in
> /admin/reassign_partitions to derive back the "original_replicas".
>
> To cancel the current pending reassignments, the "original_replicas" info
> is required to restore/rollback.
>
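> For illustration, a pending reassignment entry carrying the proposed
> "original_replicas" could look like this (a sketch only; the values are
> taken from the ducktape test output later in this thread, and the exact
> JSON layout is not final):
>
>   {"version":1,"partitions":[
>     {"topic":"test_topic","partition":7,
>      "replicas":[3,2,5],"original_replicas":[3,2,4]}]}
>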
> As a matter of fact, if we have "original_replicas" info when doing
> reassignment, some features can be accomplished.  e.g. To name a few:
> separating the Reassignment Replication Traffic with the normal Follower's
> ReplicaFetcher thread pool, the dedicated Reassignment ReplicaFetcher
> thread pool can have throttle enforced, while existing Follower
> ReplicaFetcher not throttled. some networking parameters can be tuned for
> Reassignment ReplicaFetcher thread pool.  The Reassignment can have its own
> MaxLag, TotalLag metrics.  and URP (Under Replicated Partition) can be
> reported differently for existing followers Vs. the URP caused by
> Reassignment.  Anyway, these might be some future KIPs that we might
> submit.
>
> Thanks,
> George
>
>
>    On Thursday, April 4, 2019, 4:32:26 PM PDT, Jun Rao <ju...@confluent.io>
> wrote:
>
>  Hi, George,
>
> Thanks for the KIP. Sorry for the late reply. A couple of comments below.
>
> 40. I agree that it's better to issue an RPC request to the controller for
> reassignment cancellation. If we do that, it would be useful to decide
> whether that call blocks on cancellation completion or not.
>
> 41. Is it necessary to add the new "original replicas" field in
> /admin/reassign_partitions?
> The original replicas are already in the topic path in ZK.
>
> Jun
>
> On Tue, Mar 26, 2019 at 5:24 PM George Li <sql_consulting@yahoo.com
> .invalid>
> wrote:
>
> >  Hi Ismael,
> >
> > Thanks,  I understand your points. I will add the RPC mechanism for
> > reassignments to KIP-236.  I can think of a few Requests/Responses
> > corresponding to the old Scala AdminClient using ZK:
> >
> > SubmitReassignments (--execute)
> > StatusReassignments (--verify)
> > ChangeOrRemoveReassignmentsThrottle (--verify)
> > GenerateReassignments (--generate)
> > CancelPendingReassignments (new in KIP-236)
> >
> > To clarify the "structure" change of KIP-236, the ZK public interface
> > remains the same:  {topic, partition, new_replicas},  the user client
> > generate/submit the reassignment plan the same way as before.  the new
> > "original_replicas" in the ZK node is added by admin client before
> writing
> > to the ZK node.  A bit similar to the "log_dirs".  User's direct
> > modification of ZK /admin/reassign_partitions is strongly discouraged.
> >
> > The original_replicas info is essential for cancellation/rollback of the
> > reassignments still pending.
> >
> > Thanks,
> > George
> >
> >    On Monday, March 25, 2019, 4:43:30 PM PDT, Ismael Juma <
> > ismael@juma.me.uk> wrote:
> >
> >  Hi George,
> >
> > The goal is not to prevent people from updating ZK directly. The goal is
> to
> > offer a solution where people don't have to. If people then decide to
> avoid
> > the recommended path, they can deal with the consequences. However, if we
> > add another structure in ZK and no RPC mechanism, then there is no
> > recommended path apart from updating ZK (implicitly making it an API for
> > users).
> >
> > Ismael
> >
> > On Mon, Mar 25, 2019 at 3:57 PM George Li <sql_consulting@yahoo.com
> > .invalid>
> > wrote:
> >
> > >  Thanks Ismael.  One question: even if we switch to submitting
> > > reassignments via RPC instead of Zookeeper, will the reassignment data
> > > still persist in the ZooKeeper node /admin/reassign_partitions (e.g. so
> > > that when the Controller fails over it can resume reassignments)?  If
> > > yes, how can this keep someone from modifying ZK
> > > (/admin/reassign_partitions) directly?
> > >
> > >
> > > Thanks,
> > > George
> > >
> > >    On Saturday, March 23, 2019, 1:07:11 PM PDT, Ismael Juma <
> > > ismaelj@gmail.com> wrote:
> > >
> > >  Thanks for the KIP, making reassignment more flexible is definitely
> > > welcome. As others have mentioned, I think we need to do it via the
> Kafka
> > > protocol and not via ZK. The latter introduces an implicit API that
> other
> > > tools will depend on causing migration challenges. This has already
> > > happened with the existing ZK based interface and we should avoid
> > > introducing more tech debt here.
> > >
> > > Ismael
> > >
> > > On Sat, Mar 23, 2019, 12:09 PM Colin McCabe <cm...@apache.org>
> wrote:
> > >
> > > > On Thu, Mar 21, 2019, at 20:51, George Li wrote:
> > > > >  Hi Colin,
> > > > >
> > > > > I agree with your proposal of having administrative APIs through
> RPC
> > > > > instead of ZooKeeper. But seems like it will incur significant
> > changes
> > > > > to both submitting reassignments and this KIP's cancelling pending
> > > > > reassignments.
> > > > >
> > > > > To make this KIP simple and moving along, I will be happy to do
> > another
> > > > > follow-up KIP to change all reassignment related operations via RPC.
> > > >
> > > > Thanks, George.  I think doing it as a two-step process is fine, but
> I
> > > > suspect it would be much easier and quicker to do the RPC conversion
> > > first,
> > > > and the interruptible part later.  The reason is because a lot of the
> > > > things that people have brought up as concerns with this KIP are
> really
> > > > issues with the API (how will people interact with ZK, how does
> access
> > > > control work, what does the format look like in ZK) that will just go
> > > away
> > > > once we have an RPC.
> > > >
> > > > > Just curious:  KIP-4 includes Topics/ACL related operations. In
> > > > > addition to Reassignments,  are there any other operations that should
> > > > > be done via RPC?
> > > >
> > > > I think all of the administrative shell scripts have been converted
> > > except
> > > > kafka-configs.sh.  I believe there is a KIP for that conversion.
> > > > Reassigning partitions is probably the biggest KIP-4 gap we have
> right
> > > now.
> > > >
> > > > best,
> > > > Colin
> > > >
> > > > >
> > > > > Thanks,
> > > > > George
> > > > >
> > > > >
> > > > >    On Wednesday, March 20, 2019, 5:28:59 PM PDT, Colin McCabe
> > > > > <co...@cmccabe.xyz> wrote:
> > > > >
> > > > >  Hi George,
> > > > >
> > > > > One big problem here is that administrative APIs should be done
> > through
> > > > > RPCs, not through ZooKeeper.  KIP-4 (Command line and centralized
> > > > > administrative operations) describes the rationale for this.  We
> want
> > > > > public and stable APIs that don't depend on the internal
> > representation
> > > > > of stuff in ZK, which will change over time.  Tools shouldn't have
> to
> > > > > integrate with ZK or understand the internal data structures of
> Kafka
> > > > > to make administrative changes.  ZK doesn't have a good security,
> > > > > access control, or compatibility story.
> > > > >
> > > > > We should create an official reassignment RPC for Kafka.  This will
> > > > > solve many of the problems discussed in this thread, I think.  For
> > > > > example, with an RPC, users won't be able to make changes unless
> they
> > > > > have ALTER on KafkaCluster.  That avoids the problem of random
> users
> > > > > making changes without the administrator knowing.  Also, if
> multiple
> > > > > users are making changes, there is no risk that they will overwrite
> > > > > each other's changes, since they won't be modifying the internal ZK
> > > > > structures directly.
> > > > >
> > > > > I think a good reassignment API would be something like:
> > > > >
> > > > > > ReassignPartitionsResults reassignPartitions(Map<TopicPartition,
> > > > PartitionAssignment> reassignments);
> > > > > >
> > > > > > class PartitionAssignment {
> > > > > >  List<Integer> nodes;
> > > > > > }
> > > > > >
> > > > > > class ReassignPartitionsResults {
> > > > > >  Map<TopicPartition, KafkaFuture<Void>> pending;
> > > > > >  Map<TopicPartition, KafkaFuture<Void>> completed;
> > > > > >  Map<TopicPartition, KafkaFuture<Void>> rejected;
> > > > > > }
> > > > > >
> > > > > > PendingReassignmentResults pendingReassignments();
> > > > > >
> > > > > > class PendingReassignmentResults {
> > > > > >  KafkaFuture<Map<TopicPartition, PartitionAssignment>> pending;
> > > > > >  KafkaFuture<Map<TopicPartition, PartitionAssignment>> previous;
> > > > > > }
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > >
> > > > > On Tue, Mar 19, 2019, at 15:04, George Li wrote:
> > > > > >  Hi Viktor,
> > > > > >
> > > > > > Thanks for the review.
> > > > > >
> > > > > > If there is reassignment in-progress while the cluster is
> upgraded
> > > > with
> > > > > > this KIP (upgrade the binary and then do a cluster rolling
> restart
> > of
> > > > > > the brokers), the reassignment JSON in Zookeeper
> > > > > > /admin/reassign_partitions will only have  {topic, partition,
> > > > > > replicas(new)} info when the batch of reassignment was kicked off
> > > > > > before the upgrade,  not with the "original_replicas" info per
> > > > > > topic/partition.  So when the user is trying to cancel/rollback
> the
> > > > > > reassignments, it's going to fail and the cancellation will be
> > > skipped
> > > > > > (the code in this KIP will check if the "original_replicas" is in
> > > > > > /admin/reassign_partitions).
> > > > > >
> > > > > > The user either has to wait till current reassignments to finish
> or
> > > > > > does quite some manual work to cancel them (delete ZK node,
> bounce
> > > > > > controller, re-submit reassignments with original replicas to
> > > > rollback,
> > > > > > if the original replicas are kept before the last batch of
> > > > > > reassignments were submitted).
> > > > > >
> > > > > > I think this scenario of reassignments being kicked off by
> > end-user,
> > > > > > not by the team(s) that managed Kafka infrastructure might be
> rare
> > > > > > (maybe in some very small companies?),  since only one batch of
> > > > > > reassignments can be running at a given time in
> > > > > > /admin/reassign_partitions.  The end-users need some
> co-ordination
> > > for
> > > > > > submitting reassignments.
> > > > > >
> > > > > > Thanks,
> > > > > > George
> > > > > >
> > > > > >
> > > > > >    On Tuesday, March 19, 2019, 3:34:20 AM PDT, Viktor
> Somogyi-Vass
> > > > > > <vi...@gmail.com> wrote:
> > > > > >
> > > > > >  Hey George,
> > > > > > Thanks for the answers. I'll try to block out time this week to
> > > review
> > > > > > your PR.
> > > > > >
> > > > > > I have one more point to clarify: I've seen some customers who are
> > > > > > managing Kafka as an internal company-wide service and they may
> or
> > > may
> > > > > > not know how certain topics are used within the company.
> That
> > > > > > might mean that some clients can start reassignment at random
> > > > > > times. Let's suppose that such a reassignment starts just when
> the
> > > > > > Kafka operation team starts upgrading the cluster that contains
> > this
> > > > > > KIP. The question is: do you think that we need to handle upgrade
> > > > > > scenarios where there is an in-progress reassignment?
> > > > > > Thanks,
> > > > > > Viktor
> > > > > >
> > > > > > On Tue, Mar 19, 2019 at 6:16 AM George Li <
> > sql_consulting@yahoo.com>
> > > > wrote:
> > > > > >
> > > > > >  Hi Viktor,
> > > > > > FYI, I have added a new ducktape
> > > > > > test:  tests/kafkatest/tests/core/reassign_cancel_test.py
> > > > > > to https://github.com/apache/kafka/pull/6296
> > > > > > After review, do you have any more questions?  Thanks
> > > > > >
> > > > > > Hi Jun,
> > > > > > Could you help review this when you have time?  Thanks
> > > > > >
> > > > > > Can we start a vote on this KIP in one or two weeks?
> > > > > >
> > > > > > Thanks,
> > > > > > George
> > > > > >
> > > > > >
> > > > > >    On Tuesday, March 5, 2019, 10:58:45 PM PST, George Li
> > > > > > <sq...@yahoo.com> wrote:
> > > > > >
> > > > > >  Hi Viktor,
> > > > > >
> > > > > >
> > > > > > >  2.: One follow-up question: if the reassignment cancellation
> > gets
> > > > interrupted and a failover happens after step #2 but before step #3,
> > how
> > > > will the new controller continue? At this stage Zookeeper would
> contain
> > > > OAR + RAR, however the brokers will have the updated LeaderAndIsr
> about
> > > > OAR, so they won't know about RAR. I would suppose the new controller
> > > would
> > > > start from the beginning as it only knows what's in Zookeeper. Is
> that
> > > true?
> > > > > >
> > > > > > The OAR (Original Assigned Replicas) for the rollback is stored
> in
> > > the
> > > > > > /admin/reassign_partitions for each topic/partition
> reassignments.
> > > > > > During the controller failover,  the new controller will read
> > > > > > /admin/reassign_partitions for both new_replicas AND
> > > original_replicas
> > > > > > into controllerContext.partitionsBeingReassigned
> > > > > >
> > > > > > then perform pending reassignments cancellation/rollback.
> > > > > > The originalReplicas  is added below:
> > > > > > case class ReassignedPartitionsContext(
> > > > > >     var newReplicas: Seq[Int] = Seq.empty,
> > > > > >     var originalReplicas: Seq[Int] = Seq.empty,
> > > > > >     val reassignIsrChangeHandler: PartitionReassignmentIsrChangeHandler) {
> > > > > >
> > > > > >
> > > > > > > 2.1: Another interesting question that are what are those
> > replicas
> > > > are doing which are online but not part of the leader and ISR? Are
> they
> > > > still replicating? Are they safe to lie around for the time being?
> > > > > >
> > > > > > I think they are just dangling without being replicated.  Upon the
> > > > > > LeaderAndIsr request, makeFollowers() will call
> > > > > > replicaFetcherManager.removeFetcherForPartitions(), and they will not
> > > > > > be added back to the ReplicaFetcher since they are not in the current
> > > > > > AR (Assigned Replicas).  Step 4 (StopReplica) will delete them.
> > > > > >
> > > > > >
> > > > > > I am adding a new ducktape test:
> > > > > > tests/kafkatest/tests/core/reassign_cancel_test.py for cancelling
> > > > > > pending reassignments.  But it's not easy to simulate controller
> > > > > > failover during the cancellation since the cancellation/rollback
> > > > > > completes so fast. See below.  I do have a unit/integration test to
> > > > > > simulate controller failover:
> > > > > > shouldTriggerReassignmentOnControllerStartup()
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > > George
> > > > > >
> > > > > >
> > > > > >
> > > > > > Warning: You must run Verify periodically, until the reassignment
> > > > > > completes, to ensure the throttle is removed. You can also alter the
> > > > > > throttle by rerunning the Execute command passing a new value.
> > > > > > The inter-broker throttle limit was set to 1024 B/s
> > > > > > Successfully started reassignment of partitions.
> > > > > >
> > > > > > [INFO  - 2019-03-05 04:20:48,321 - kafka - execute_reassign_cancel -
> > > > > > lineno:514]: Executing cancel / rollback pending partition
> > > > > > reassignment...
> > > > > > [DEBUG - 2019-03-05 04:20:48,321 - kafka - execute_reassign_cancel -
> > > > > > lineno:515]: /opt/kafka-dev/bin/kafka-reassign-partitions.sh
> > > > > > --zookeeper worker1:2181 --cancel
> > > > > > [DEBUG - 2019-03-05 04:20:48,321 - remoteaccount - _log -
> > > > > > lineno:160]: vagrant@worker2: Running ssh command:
> > > > > > /opt/kafka-dev/bin/kafka-reassign-partitions.sh --zookeeper
> > > > > > worker1:2181 --cancel
> > > > > > [DEBUG - 2019-03-05 04:20:57,452 - kafka - execute_reassign_cancel -
> > > > > > lineno:520]: Verify cancel / rollback pending partition reassignment:
> > > > > > Rolling back the current pending reassignments
> > > > > > Map(test_topic-7 -> Map(replicas -> Buffer(3, 2, 5),
> > > > > >     original_replicas -> Buffer(3, 2, 4)),
> > > > > >   test_topic-18 -> Map(replicas -> Buffer(5, 1, 2),
> > > > > >     original_replicas -> Buffer(4, 1, 2)),
> > > > > >   test_topic-3 -> Map(replicas -> Buffer(5, 2, 3),
> > > > > >     original_replicas -> Buffer(4, 2, 3)),
> > > > > >   test_topic-15 -> Map(replicas -> Buffer(1, 3, 5),
> > > > > >     original_replicas -> Buffer(1, 3, 4)),
> > > > > >   test_topic-11 -> Map(replicas -> Buffer(2, 3, 5),
> > > > > >     original_replicas -> Buffer(2, 3, 4)))
> > > > > > Successfully submitted cancellation of reassignments.
> > > > > > The cancelled pending reassignments throttle was removed.
> > > > > > Please run --verify to have the previous reassignments (not just the
> > > > > > cancelled reassignments in progress) throttle removed.
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >    On Tuesday, March 5, 2019, 8:20:25 AM PST, Viktor Somogyi-Vass
> > > > > > <vi...@gmail.com> wrote:
> > > > > >
> > > > > >  Hey George,
> > > > > >
> > > > > > Sorry for the delay. I'll answer point-by-point:
> > > > > >
> > > > > > 1.2: I think it's fine. As you say we presume that the client knows
> > > > > > the state of the cluster before doing the reassignment, so we can
> > > > > > presume the same for cancellation.
> > > > > >
> > > > > > 2.: One follow-up question: if the reassignment cancellation gets
> > > > > > interrupted and a failover happens after step #2 but before step #3,
> > > > > > how will the new controller continue? At this stage Zookeeper would
> > > > > > contain OAR + RAR, however the brokers will have the updated
> > > > > > LeaderAndIsr about OAR, so they won't know about RAR. I would suppose
> > > > > > the new controller would start from the beginning as it only knows
> > > > > > what's in Zookeeper. Is that true?
> > > > > >
> > > > > > 2.1: Another interesting question: what are those replicas doing
> > > > > > which are online but not part of the leader and ISR? Are they still
> > > > > > replicating? Are they safe to lie around for the time being?
> > > > > > Best,
> > > > > > Viktor
> > > > > >
> > > > > > On Fri, Mar 1, 2019 at 8:40 PM George Li <
> sql_consulting@yahoo.com
> > >
> > > > wrote:
> > > > > >
> > > > > >  Hi Jun,
> > > > > > Could you help review KIP-236 when you have time?  Thanks.
> > > > > >
> > > > > >
> > > > > > Hi Becket,
> > > > > > Since you filed https://issues.apache.org/jira/browse/KAFKA-6304 to
> > > > > > request this feature, could you also help review and comment on
> > > > > > KIP-236?  Thanks.
> > > > > >
> > > > > >
> > > > > >
> > > > > > Hi Viktor,
> > > > > > I have updated https://github.com/apache/kafka/pull/6296 and also
> > > > > > KIP-236:
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-236%3A+Interruptible+Partition+Reassignment
> > > > > > Please see below the new section "Skip Reassignment Cancellation
> > > > > > Scenarios":
> > > > > >
> > > > > > Skip Reassignment Cancellation Scenarios
> > > > > >
> > > > > > There are a couple of scenarios where the pending reassignments
> > > > > > in /admin/reassign_partitions can not be cancelled / rolled back.
> > > > > >
> > > > > >    - If the "original_replicas" is missing for the topic/partition
> > > > > > in /admin/reassign_partitions.  In this case, the pending
> > > > > > reassignment cancellation will be skipped, because there is no way to
> > > > > > reset to the original replicas.  The reasons this can happen could be:
> > > > > >      - the user/client is tampering with /admin/reassign_partitions
> > > > > > directly, and does not have the "original_replicas" for the topic
> > > > > >      - the user/client is using an incorrect version of the admin
> > > > > > client to submit reassignments.  The Kafka software should be
> > > > > > upgraded not just on all the brokers in the cluster, but also on the
> > > > > > host that is used to submit reassignments.
> > > > > >
> > > > > >    - If all the "original_replicas" brokers are out of ISR, and some
> > > > > > brokers in the "new_replicas" are still online for the
> > > > > > topic/partition in the pending reassignments.  In this case, it's
> > > > > > better to skip this topic's pending reassignment
> > > > > > cancellation/rollback, otherwise the partition will become offline.
> > > > > > However, if all the brokers in "original_replicas" are offline AND
> > > > > > all the brokers in "new_replicas" are also offline for this
> > > > > > topic/partition, then the cluster is in such a bad state that the
> > > > > > topic/partition is currently offline anyway, and it will
> > > > > > cancel/rollback this topic's pending reassignment back to the
> > > > > > "original_replicas".
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > What other scenarios can others think of where reassignment
> > > > > > cancellation should be skipped?  Thanks
> > > > > >
> > > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > Another issue I would like to raise is the removal of the throttle
> > > > > > for the cancelled reassignments.  Currently the remove-throttle code
> > > > > > is in the Admin Client.  Since the pending reassignments are
> > > > > > cancelled/rolled back, the throttle would not be removed by running
> > > > > > the admin client with the --verify option.  My approach is to remove
> > > > > > the throttle in the admin client after the reassignment cancellation.
> > > > > > But I feel it's better to move this into the Controller (for the
> > > > > > controller failover scenario).
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > > George
> > > > > >
> > > > > >
> > > > > >
> > > > > >    On Monday, February 25, 2019, 11:40:08 AM PST, George Li
> > > > > > <sq...@yahoo.com> wrote:
> > > > > >
> > > > > >  Hi Viktor,
> > > > > > Thanks for the response.  Good questions!  Answers below:
> > > > > >
> > > > > > > A few questions regarding the rollback algorithm:
> > > > > > > 1. At step 2 how do you elect the leader?
> > > > > >
> > > > > > The step 2 code is in Pull Request #6296, "Enable pending
> > > > > > reassignments cancellation/rollback" (reassignments still in-flight
> > > > > > in /admin/reassign_partitions):
> > > > > > https://github.com/apache/kafka/pull/6296
> > > > > > core/src/main/scala/kafka/controller/KafkaController.scala line#622
> > > > > >
> > > > > > rollbackReassignedPartitionLeaderIfRequired(topicPartition,
> > > > > >     reassignedPartitionContext)
> > > > > >
> > > > > > During "pending" reassignment, e.g.
> > > > (1,2,3)
> > > > > > => (4,2,5)  normally, the leader (in this case broker_id 1) will
> > > > remain
> > > > > > as the leader until all replicas (1,2,3,4,5) in ISR, then the
> > leader
> > > > > > will be switched to 4.  However, in one scenario, if let's say
> new
> > > > > > replica 4 is already caught up in ISR, and somehow original
> leader
> > 1
> > > > is
> > > > > > down or bounced.  4 could become the new
> > > > > > leader. rollbackReassignedPartitionLeaderIfRequired() will do a
> > > > > > leadership election using
> > > > > > PreferredReplicaPartitionLeaderElectionStrategy
> > > > > >  among brokers in OAR (Original Assigned Replicas set in
> memory). >
> > > > > > 1.1. Would it be always the original leader?
> > > > > > Not necessarily,  if the original preferred leader is down, it
> can
> > be
> > > > > > other brokers in OAR which are in ISR.
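> > > > > >
> > > > > > (A minimal Scala sketch of that selection rule, with illustrative
> > > > > > names only, not the actual controller code: pick the first OAR
> > > > > > replica still in ISR, so the original preferred leader wins when it
> > > > > > is alive:)
> > > > > >
> > > > > >   def electLeaderForRollback(oar: Seq[Int], isr: Set[Int]): Option[Int] =
> > > > > >     oar.find(isr.contains)  // head of OAR is the preferred replica
> > > > > >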
> > > > > > > 1.2. What if some brokers that are in OAR are down?
> > > > > > If some brokers in OAR are down, the topic/partition will have
> URP
> > > > > > (Under Replicated Partition). The client deciding to do
> > reassignment
> > > > > > should be clear what the current state of the cluster is, what
> > > brokers
> > > > > > are down, what are up, what reassignment is trying to accomplish.
> > > e.g.
> > > > > > reassignment from down brokers to new brokers(?)
> > > > > >
> > > > > > > 2. I still have doubts that we need to do the reassignment
> > > > > > > backwards during rollback. For instance if we decide to cancel the
> > > > > > > reassignment at step #8 where replicas in OAR - RAR are offline and
> > > > > > > start the rollback, then how do we make a replica from OAR online
> > > > > > > again before electing a leader as described in step #2 of the
> > > > > > > rollback algorithm?
> > > > > > > 3. Does the algorithm defend against crashes? Is it able to
> > > > > > > continue after a controller failover?
> > > > > > > 4. I think it would be a good addition if you could add a few
> > > > > > > example scenarios for rollback.
> > > > > >
> > > > > > yes. shouldTriggerReassignCancelOnControllerStartup() is the
> > > > > > integration test to simulate controller failover while cancelling
> > > > > > pending reassignments.  I will try to add controller failover
> > > scenario
> > > > > > in a ducktape system test.
> > > > > > You do raise a good point here. If the cluster is in a very BAD
> > > > > > shape, e.g. none of the OAR brokers are online, but some new broker
> > > > > > in RAR is in ISR and is the current leader, it makes sense not to
> > > > > > rollback, to keep that topic/partition online. However, if none of
> > > > > > the brokers in RAR is online either, it may make sense to rollback to
> > > > > > OAR and remove it from /admin/reassign_partitions, since the cluster
> > > > > > state is already so bad that the topic/partition is offline anyway,
> > > > > > no matter whether we rollback or not.
> > > > > > I will add a check before cancel/rollback a topic/partition's
> > pending
> > > > > > reassignment by checking whether at least one broker of OAR is in
> > > ISR,
> > > > > > so that it can be elected as leader,  if not, skip that
> > > > topic/partition
> > > > > > reassignment cancellation and raise an exception.
> > > > > > I will list a few more scenarios for rollback.
> > > > > > What additional scenarios for rollback you and others can think
> of?
> > > > > >
> > > > > > Thanks,
> > > > > > George
> > > > > >
> > > > > >    On Monday, February 25, 2019, 3:53:33 AM PST, Viktor
> > Somogyi-Vass
> > > > > > <vi...@gmail.com> wrote:
> > > > > >
> > > > > >  Hey George,
> > > > > > Thanks for the prompt response, it makes sense. I'll try to keep
> > > > > > your code changes on top of my list and help reviewing that. :)
> > > > > >
> > > > > > Regarding the incremental reassignment: I don't mind discussing it
> > > > > > either as part of this or in a separate conversation, but I think a
> > > > > > separate one could be better because both discussions can be long,
> > > > > > and keeping them separated would limit the scope and make them more
> > > > > > digestible and focused. If the community decides to discuss it here
> > > > > > then I think I'll put KIP-435 on hold or rejected and add my ideas
> > > > > > here. If the community decides to discuss it in a different KIP I
> > > > > > think it's a good idea to move the planned future work part into
> > > > > > KIP-435 and rework that. Maybe we can co-author it as I think both
> > > > > > works could be complementary to each other. In any case I'd be
> > > > > > absolutely interested in what others think.
> > > > > >
> > > > > > A few questions regarding the rollback algorithm:
> > > > > > 1. At step 2 how do you elect the leader?
> > > > > >  1.1. Would it always be the original leader?
> > > > > >  1.2. What if some brokers that are in OAR are down?
> > > > > > 2. I still have doubts that we need to do the reassignment backwards
> > > > > > during rollback. For instance if we decide to cancel the reassignment
> > > > > > at step #8 where replicas in OAR - RAR are offline and start the
> > > > > > rollback, then how do we make a replica from OAR online again before
> > > > > > electing a leader as described in step #2 of the rollback algorithm?
> > > > > > 3. Does the algorithm defend against crashes? Is it able to continue
> > > > > > after a controller failover?
> > > > > > 4. I think it would be a good addition if you could add a few example
> > > > > > scenarios for rollback.
> > > > > > Best,
> > > > > > Viktor
> > > > > >
> > > > > > On Fri, Feb 22, 2019 at 7:04 PM George Li <
> > sql_consulting@yahoo.com>
> > > > wrote:
> > > > > >
> > > > > >  Hi Viktor,
> > > > > >
> > > > > > Thanks for reading and providing feedback on KIP-236.
> > > > > >
> > > > > > For reassignments, one can generate a json for the new assignments
> > > > > > and another json with the "original" assignments for rollback
> > > > > > purposes.  In a production cluster, from our experience, we need to
> > > > > > submit the reassignments in batches with throttling/staggering to
> > > > > > minimize the impact on the cluster.  Some large topics/partitions,
> > > > > > coupled with the throttle, can take a pretty long time for the new
> > > > > > replicas to get into ISR and complete the reassignment in that batch.
> > > > > > Currently, during this, Kafka does not allow cancelling the pending
> > > > > > reassignments cleanly.  Even if you have the json with the "original"
> > > > > > assignments for rollback, you have to wait for the current
> > > > > > reassignment to complete, then submit it as a new reassignment to
> > > > > > roll back. If the current reassignment is causing impact to
> > > > > > production, we would like the reassignments to be cancelled/rolled
> > > > > > back cleanly/safely/quickly.  This is the main goal of KIP-236.
> > > > > >
> > > > > > The original KIP-236 by Tom Bentley also proposed the incremental
> > > > > > reassignments: submitting new reassignments while the current
> > > > > > reassignment is still going on. This was scaled back and put under
> > > > > > the "Planned Future Changes" section of KIP-236, so we can expedite
> > this
> > > > > > Reassignment Cancellation/Rollback feature out to the community.
> > > > > >
> > > > > > The main idea of incremental reassignment is to allow submitting new
> > > > > > reassignments in another ZK node /admin/reassign_partitions_queue and
> > > > > > merging it with the current pending reassignments in
> > > > > > /admin/reassign_partitions.  In case the same topic/partition is in
> > > > > > both ZK nodes, the conflict resolution is to cancel the current
> > > > > > reassignment in /admin/reassign_partitions, and move that
> > > > > > topic/partition from /admin/reassign_partitions_queue in as the new
> > > > > > reassignment.
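> > > > > >
> > > > > > (A rough sketch of that merge rule, assuming both znodes decode to a
> > > > > > map of partition -> new replicas; the names here are illustrative
> > > > > > only:)
> > > > > >
> > > > > >   def mergeReassignments(
> > > > > >       pending: Map[TopicPartition, Seq[Int]], // /admin/reassign_partitions
> > > > > >       queued: Map[TopicPartition, Seq[Int]]   // /admin/reassign_partitions_queue
> > > > > >   ): Map[TopicPartition, Seq[Int]] =
> > > > > >     pending ++ queued  // on conflict the queued entry wins; the
> > > > > >                        // in-flight reassignment is cancelled first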
> > > > > >
> > > > > > If there is enough interest from the community, this "Planned
> > Future
> > > > > > Changes" for incremental reassignments can also be delivered in
> > > > > > KIP-236, otherwise, another KIP.  The current
> > > > > > PR:  https://github.com/apache/kafka/pull/6296  only
> > > > focuses/addresses
> > > > > > the pending Reassignment Cancellation/Rollback.
> > > > > >
> > > > > > Hope this answers your questions.
> > > > > >
> > > > > > Thanks,
> > > > > > George
> > > > > >
> > > > > >    On Friday, February 22, 2019, 6:51:14 AM PST, Viktor
> > Somogyi-Vass
> > > > > > <vi...@gmail.com> wrote:
> > > > > >
> > > > > >  Read through the KIP and I have one comment:
> > > > > >
> > > > > > It seems like it is not looking strictly for cancellation but
> also
> > > > > > implements rolling back to the original. I think it'd be much
> > simpler
> > > > to
> > > > > > generate a reassignment json on cancellation that contains the
> > > original
> > > > > > assignment and start a new partition reassignment completely.
> This
> > > way
> > > > the
> > > > > > reassignment algorithm (whatever it is) could be reused as a
> whole.
> > > > Did you
> > > > > > consider this or are there any obstacles that prevents doing
> this?
> > > > > >
> > > > > > Regards,
> > > > > > Viktor
> > > > > >
> > > > > > On Fri, Feb 22, 2019 at 2:24 PM Viktor Somogyi-Vass <
> > > > viktorsomogyi@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > I've published the above mentioned KIP here:
> > > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-435%3A+Incremental+Partition+Reassignment
> > > > > > > Will start a discussion about it soon.
> > > > > > >
> > > > > > > On Fri, Feb 22, 2019 at 12:45 PM Viktor Somogyi-Vass <
> > > > > > > viktorsomogyi@gmail.com> wrote:
> > > > > > >
> > > > > > >> Hi Folks,
> > > > > > >>
> > > > > > >> I also have a pending active work on the incremental partition
> > > > > > >> reassignment stuff here:
> > > > https://issues.apache.org/jira/browse/KAFKA-6794
> > > > > > >> I think it would be good to cooperate on this to make both
> work
> > > > > > >> compatible with each other.
> > > > > > >>
> > > > > > >> I'll write up a KIP about this today so it'll be easier to see
> > how
> > > > to fit
> > > > > > >> the two together. Basically in my work I operate on the
> > > > > > >> /admin/reassign_partitions node in a fully compatible way, meaning
> > > > > > >> I won't change it, just calculate each increment based on it and
> > > > > > >> the current state of the ISR set for the partition in reassignment.
> > > > > > >> I hope we could collaborate on this.
> > > > > > >>
> > > > > > >> Viktor
> > > > > > >>
> > > > > > >> On Thu, Feb 21, 2019 at 9:04 PM Harsha <ka...@harsha.io>
> wrote:
> > > > > > >>
> > > > > > >>> Thanks George. LGTM.
> > > > > > >>> Jun & Tom, Can you please take a look at the updated KIP.
> > > > > > >>> Thanks,
> > > > > > >>> Harsha
> > > > > > >>>
> > > > > > >>> On Wed, Feb 20, 2019, at 12:18 PM, George Li wrote:
> > > > > > >>> > Hi,
> > > > > > >>> >
> > > > > > >>> > After discussing with Tom, Harsha and I are picking up
> > KIP-236
> > > <
> > > > > > >>>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-236%3A+Interruptible+Partition+Reassignment
> > > > >.
> > > > > > >>> The work focused on safely/cleanly cancel / rollback pending
> > > > reassignments
> > > > > > >>> in a timely fashion. Pull Request #6296 <
> > > > > > >>> https://github.com/apache/kafka/pull/6296> Still working on
> > more
> > > > > > >>> integration/system tests.
> > > > > > >>> >
> > > > > > >>> > Please review and provide feedbacks/suggestions.
> > > > > > >>> >
> > > > > > >>> > Thanks,
> > > > > > >>> > George
> > > > > > >>> >
> > > > > > >>> >
> > > > > > >>> > On Saturday, December 23, 2017, 0:51:13 GMT, Jun Rao <
> > > > jun@confluent.io>
> > > > > > >>> wrote:
> > > > > > >>> >
> > > > > > >>> > Hi, Tom,
> > > > > > >>>
> > > > > > >>> Thanks for the reply.
> > > > > > >>>
> > > > > > >>> 10. That's a good thought. Perhaps it's better to get rid of
> > > > > > >>> /admin/reassignment_requests
> > > > > > >>> too. The window when a controller is not available is small.
> > So,
> > > > we can
> > > > > > >>> just fail the admin client if the controller is not
> reachable
> > > > after the
> > > > > > >>> timeout.
> > > > > > >>>
> > > > > > >>> 13. With the changes in 10, the old approach is handled
> through
> > > ZK
> > > > > > >>> callback
> > > > > > >>> and the new approach is through Kafka RPC. The ordering between
> > the
> > > > two
> > > > > > >>> is
> > > > > > >>> kind of arbitrary. Perhaps the ordering can just be based on
> > the
> > > > order
> > > > > > >>> that
> > > > > > >>> the reassignment is added to the controller request queue.
> From
> > > > there, we
> > > > > > >>> can either do the overriding or the prevention.
> > > > > > >>>
> > > > > > >>> Jun
> > > > > > >>>
> > > > > > >>>
> > > > > > >>> On Fri, Dec 22, 2017 at 7:31 AM, Tom Bentley <
> > > > t.j.bentley@gmail.com>
> > > > > > >>> wrote:
> > > > > > >>>
> > > > > > >>> > Hi Jun,
> > > > > > >>> >
> > > > > > >>> > Thanks for responding, my replies are inline:
> > > > > > >>> >
> > > > > > >>> > > 10. Your explanation makes sense. My remaining concern is
> the
> > > > > > >>> additional ZK
> > > > > > >>> > > writes in the proposal. With the proposal, we will need
> to
> > do
> > > > > > >>> following
> > > > > > >>> > > writes in ZK.
> > > > > > >>> > >
> > > > > > >>> > > a. write new assignment in /admin/reassignment_requests
> > > > > > >>> > >
> > > > > > >>> > > b. write new assignment and additional metadata in
> > > > > > >>> > > /admin/reassignments/$topic/$partition
> > > > > > >>> > >
> > > > > > >>> > > c. write old + new assignment  in /brokers/topics/[topic]
> > > > > > >>> > >
> > > > > > >>> > > d. write new assignment in /brokers/topics/[topic]
> > > > > > >>> > >
> > > > > > >>> > > e. delete /admin/reassignments/$topic/$partition
> > > > > > >>> > >
> > > > > > >>> > > So, there are quite a few ZK writes. I am wondering if
> it's
> > > > better to
> > > > > > >>> > > consolidate the info in
> > > /admin/reassignments/$topic/$partition
> > > > into
> > > > > > >>> > > /brokers/topics/[topic].
> > > > > > >>> > > For example, we can just add some new JSON fields in
> > > > > > >>> > > /brokers/topics/[topic]
> > > > > > >>> > > to remember the new assignment and potentially the
> original
> > > > replica
> > > > > > >>> count
> > > > > > >>> > > when doing step c. Those fields will then be removed in
> > step
> > > > d. That
> > > > > > >>> way,
> > > > > > >>> > > we can get rid of step b and e, saving 2 ZK writes per
> > > > partition.
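> > > > > > >>> > >
> > > > > > >>> > > (Hypothetically, the consolidated znode might look something
> > > > > > >>> > > like this during step c; the field names are only for
> > > > > > >>> > > illustration, nothing here is decided:)
> > > > > > >>> > >
> > > > > > >>> > >   /brokers/topics/my-topic:
> > > > > > >>> > >   {"version":2,
> > > > > > >>> > >    "partitions":{"42":[1,2,3,4,5,6]},
> > > > > > >>> > >    "reassignment":{"42":{"target":[4,5,6],"original_size":3}}}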
> > > > > > >>> > >
> > > > > > >>> >
> > > > > > >>> > This seems like a great idea to me.
> > > > > > >>> >
> > > > > > >>> > It might also be possible to get rid of the
> > > > > > >>> /admin/reassignment_requests
> > > > > > >>> > subtree too. I've not yet published the ideas I have for
> the
> > > > > > >>> AdminClient
> > > > > > >>> > API for reassigning partitions, but given the existence of
> > such
> > > > an
> > > > > > >>> API, the
> > > > > > >>> > route to starting a reassignment would be the AdminClient,
> > and
> > > > not
> > > > > > >>> > zookeeper. In that case there is no need for
> > > > > > >>> /admin/reassignment_requests
> > > > > > >>> > at all. The only drawback that I can see is that while it's
> > > > currently
> > > > > > >>> > possible to trigger a reassignment even during a controller
> > > > > > >>> > election/failover that would no longer be the case if all
> > > > requests had
> > > > > > >>> to
> > > > > > >>> > go via the controller.
> > > > > > >>> >
> > > > > > >>> >
> > > > > > >>> > > 11. What you described sounds good. We could potentially
> > > > optimize the
> > > > > > >>> > > dropped replicas a bit more. Suppose that assignment
> > [0,1,2]
> > > > is first
> > > > > > >>> > > changed to [1,2,3] and then to [2,3,4]. When initiating
> the
> > > > second
> > > > > > >>> > > assignment, we may end up dropping replica 3 and only to
> > > > restart it
> > > > > > >>> > again.
> > > > > > >>> > > In this case, we could only drop a replica if it's not
> > going
> > > > to be
> > > > > > >>> added
> > > > > > >>> > > back again.
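> > > > > > >>> > >
> > > > > > >>> > > (A toy Scala sketch of that optimization; the names are
> > > > > > >>> > > illustrative only: a currently-assigned replica is dropped
> > > > > > >>> > > only if it is in neither the original assignment nor the
> > > > > > >>> > > newest reassignment:)
> > > > > > >>> > >
> > > > > > >>> > >   def replicasToDrop(current: Set[Int], original: Set[Int],
> > > > > > >>> > >       newest: Set[Int]): Set[Int] =
> > > > > > >>> > >     current -- (original ++ newest)
> > > > > > >>> > >
> > > > > > >>> > >   // replicasToDrop(Set(0,1,2,3), Set(0,1,2), Set(2,3,4)) == Set()
> > > > > > >>> > >   //   replica 3 is kept because the newest reassignment needs it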
> > > > > > >>> > >
> > > > > > >>> >
> > > > > > >>> > I had missed that, thank you! I will update the proposed
> > > > algorithm to
> > > > > > >>> > prevent this.
> > > > > > >>> >
> > > > > > >>> >
> > > > > > >>> > > 13. Since this is a corner case, we can either prevent or
> > > allow
> > > > > > >>> > overriding
> > > > > > >>> > > with old/new mechanisms. To me, it seems that allowing is
> > > > simpler to
> > > > > > >>> > > implement, the order in /admin/reassignment_requests
> > > > determines the
> > > > > > >>> > > ordering the of override, whether that's initiated by the
> > new
> > > > way or
> > > > > > >>> the
> > > > > > >>> > > old way.
> > > > > > >>> > >
> > > > > > >>> >
> > > > > > >>> > That makes sense except for the corner case where:
> > > > > > >>> >
> > > > > > >>> > * There is no current controller and
> > > > > > >>> > * Writes to both the new and old znodes happen
> > > > > > >>> >
> > > > > > >>> > On election of the new controller, for those partitions
> with
> > > > both a
> > > > > > >>> > reassignment_request and in /admin/reassign_partitions, we
> > have
> > > > to
> > > > > > >>> decide
> > > > > > >>> > which should win. You could use the modification time,
> though
> > > > there are
> > > > > > >>> > some very unlikely scenarios where that doesn't work
> > properly,
> > > > for
> > > > > > >>> example
> > > > > > >>> > if both znodes have the same mtime, or the
> > > > /admin/reassign_partitions
> > > > > > >>> was
> > > > > > >>> > updated, but the assignment of the partition wasn't
> changed,
> > > > like this:
> > > > > > >>> >
> > > > > > >>> > 0. /admin/reassign_partitions has my-topic/42 = [1,2,3]
> > > > > > >>> > 1. Controller stops watching.
> > > > > > >>> > 2. Create /admin/reassignment_requests/request_1234 to
> change
> > > the
> > > > > > >>> > reassignment of partition my-topic/42 = [4,5,6]
> > > > > > >>> > 3. Update /admin/reassign_partitions to add
> > > your-topic/12=[7,8,9]
> > > > > > >>> > 4. New controller resumes
> > > > > > >>> >
> > > > > > >>> >
> > > > > > >>> >
> > > > > > >>> > > Thanks,
> > > > > > >>> > >
> > > > > > >>> > > Jun
> > > > > > >>> > >
> > > > > > >>> > > On Tue, Dec 19, 2017 at 2:43 AM, Tom Bentley <
> > > > t.j.bentley@gmail.com>
> > > > > > >>> > > wrote:
> > > > > > >>> > >
> > > > > > >>> > > > Hi Jun,
> > > > > > >>> > > >
> > > > > > >>> > > > 10. Another concern of mine is on consistency with the
> > > > current
> > > > > > >>> pattern.
> > > > > > >>> > > The
> > > > > > >>> > > > > current pattern for change notification based on ZK
> is
> > > (1)
> > > > we
> > > > > > >>> first
> > > > > > >>> > > write
> > > > > > >>> > > > > the actual value in the entity path and then write
> the
> > > > change
> > > > > > >>> > > > notification
> > > > > > >>> > > > > path, and (2)  the change notification path only
> > includes
> > > > what
> > > > > > >>> entity
> > > > > > >>> > > has
> > > > > > >>> > > > > changed but not the actual changes. If we want to
> > follow
> > > > this
> > > > > > >>> pattern
> > > > > > >>> > > for
> > > > > > >>> > > > > consistency, /admin/reassignment_requests/request_xxx
> > > will
> > > > only
> > > > > > >>> have
> > > > > > >>> > > the
> > > > > > >>> > > > > partitions whose reassignment have changed, but not
> the
> > > > actual
> > > > > > >>> > > > > reassignment.
> > > > > > >>> > > > >
> > > > > > >>> > > >
> > > > > > >>> > > > Ah, I hadn't understood part (2). That means my concern
> > > about
> > > > > > >>> > efficiency
> > > > > > >>> > > > with the current pattern is misplaced. There are still
> > some
> > > > > > >>> interesting
> > > > > > >>> > > > differences in semantics, however:
> > > > > > >>> > > >
> > > > > > >>> > > > a) The mechanism currently proposed in KIP-236 means
> that
> > > the
> > > > > > >>> > controller
> > > > > > >>> > > is
> > > > > > >>> > > > the only writer to /admin/reassignments. This means it
> > can
> > > > include
> > > > > > >>> > > > information in these znodes that requesters might not
> > know,
> > > > or
> > > > > > >>> > > information
> > > > > > >>> > > > that's necessary to perform the reassignment but not
> > > > necessary to
> > > > > > >>> > > describe
> > > > > > >>> > > > the request. While this could be handled using the
> > current
> > > > pattern
> > > > > > >>> it
> > > > > > >>> > > would
> > > > > > >>> > > > rely on all  writers to preserve any information added
> by
> > > the
> > > > > > >>> > controller,
> > > > > > >>> > > > which seems complicated and hence fragile.
> > > > > > >>> > > >
> > > > > > >>> > > > b) The current pattern for change notification doesn't
> > cope
> > > > with
> > > > > > >>> > > competing
> > > > > > >>> > > > writers to the entity path: If two processes write to
> the
> > > > entity
> > > > > > >>> path
> > > > > > >>> > > > before the controller can read it (due to notification)
> > > then
> > > > one
> > > > > > >>> set of
> > > > > > >>> > > > updates will be lost.
> > > > > > >>> > > >
> > > > > > >>> > > > c) If a single writing process crashes after writing to
> > the
> > > > entity
> > > > > > >>> > path,
> > > > > > >>> > > > but before writing to the notification path then the
> > write
> > > > will be
> > > > > > >>> > lost.
> > > > > > >>> > > >
> > > > > > >>> > > > I'm actually using point a) in my WIP (see below).
> Points
> > > b)
> > > > and
> > > > > > >>> c) are
> > > > > > >>> > > > obviously edge cases.
> > > > > > >>> > > >
> > > > > > >>> > > >
> > > > > > >>> > > > > 11. Ok. I am not sure that I fully understand the
> > > > description of
> > > > > > >>> that
> > > > > > >>> > > > part.
> > > > > > >>> > > > > Does "assigned" refer to the current assignment?
> Could
> > > you
> > > > also
> > > > > > >>> > > describe
> > > > > > >>> > > > > where the length of the original assignment is stored
> > in
> > > > ZK?
> > > > > > >>> > > > >
> > > > > > >>> > > >
> > > > > > >>> > > > Sorry if the description is not clear. Yes, "assigned"
> > > > refers to
> > > > > > >>> the
> > > > > > >>> > > > currently assigned replicas (taken from the
> > > > > > >>> > > > ControllerContext.partitionReplicaAssignment). I would
> > > store
> > > > the
> > > > > > >>> > length
> > > > > > >>> > > of
> > > > > > >>> > > > the original assignment in the
> > > > > > >>> /admin/reassignments/$topic/$partition
> > > > > > >>> > > > znode
> > > > > > >>> > > > (this is where the point (a) above is useful -- the
> > > requester
> > > > > > >>> shouldn't
> > > > > > >>> > > > know that this information is used by the controller).
> > > > > > >>> > > >
> > > > > > >>> > > > I've updated the KIP to make these points clearer.
> > > > > > >>> > > >
> > > > > > >>> > > >
> > > > > > >>> > > > > 13. Hmm, I am not sure that the cancellation needs to
> > be
> > > > done
> > > > > > >>> for the
> > > > > > >>> > > > whole
> > > > > > >>> > > > > batch. The reason that I brought this up is for
> > > > consistency. The
> > > > > > >>> KIP
> > > > > > >>> > > > allows
> > > > > > >>> > > > > override when using the new approach. It just seems
> > that
> > > > it's
> > > > > > >>> simpler
> > > > > > >>> > > to
> > > > > > >>> > > > > extend this model when resolving multiple changes
> > between
> > > > the
> > > > > > >>> old and
> > > > > > >>> > > the
> > > > > > >>> > > > > new approach.
> > > > > > >>> > > >
> > > > > > >>> > > >
> > > > > > >>> > > > Ah, I think I've been unclear on this point too.
> > Currently
> > > > the
> > > > > > >>> > > > ReassignPartitionsCommand enforces that you can't
> change
> > > > > > >>> reassignments,
> > > > > > >>> > > but
> > > > > > >>> > > > this doesn't stop other ZK clients making changes to
> > > > > > >>> > > > /admin/reassign_partitions directly and I believe some
> > > Kafka
> > > > users
> > > > > > >>> do
> > > > > > >>> > > > indeed change reassignments in-flight by writing to
> > > > > > >>> > > > /admin/reassign_partitions. What I'm proposing doesn't
> > > break
> > > > that
> > > > > > >>> at
> > > > > > >>> > all.
> > > > > > >>> > > > The semantic I've implemented is only that the
> controller
> > > > only
> > > > > > >>> refuses
> > > > > > >>> > a
> > > > > > >>> > > > reassignment change if there is already one in-flight
> > (i.e.
> > > > in
> > > > > > >>> > > > /admin/reassignments/$topic/$partition) **via the other
> > > > > > >>> mechanism**.
> > > > > > >>> > So
> > > > > > >>> > > if
> > > > > > >>> > > > you're using /admin/reassign_partitions and you change
> or
> > > > cancel
> > > > > > >>> part
> > > > > > >>> > of
> > > > > > >>> > > it
> > > > > > >>> > > > via /admin/reassign_partitions, that's OK. Likewise if
> > > > you're using
> > > > > > >>> > > > /admin/reassignment_request/request_xxx and you change
> or
> > > > cancel
> > > > > > >>> part
> > > > > > >>> > of
> > > > > > >>> > > > it
> > > > > > >>> > > > via another /admin/reassignment_request/request_xxx,
> > that's
> > > > OK. What
> > > > > > >>> > you
> > > > > > >>> > > > can't do is change a request that was started via
> > > > > > >>> > > > /admin/reassign_partitions via
> > /admin/reassignment_request/
> > > > > > >>> > request_xxx,
> > > > > > >>> > > or
> > > > > > >>> > > > vice versa.
> > > > > > >>> > > >
> > > > > > >>> > > > What I was thinking of when I replied is the case
> where,
> > on
> > > > > > >>> controller
> > > > > > >>> > > > failover, /admin/reassign_partitions has been changed
> and
> > > > > > >>> > > > /admin/reassignment_request/request_xxx created (in the
> > > > period when
> > > > > > >>> > the
> > > > > > >>> > > > new
> > > > > > >>> > > > controller was being elected, for example) with a
> common
> > > > > > >>> partition. In
> > > > > > >>> > > this
> > > > > > >>> > > > case we should apply a consistent rule to that used
> when
> > > the
> > > > > > >>> > notification
> > > > > > >>> > > > happen in real time. Your suggestion to use the
> > > modification
> > > > time
> > > > > > >>> of
> > > > > > >>> > the
> > > > > > >>> > > > znode would work here too (except in the edge case
> where
> > ZK
> > > > writes
> > > > > > >>> to
> > > > > > >>> > > both
> > > > > > >>> > > > znodes happen within the same clock tick on the ZK
> > server,
> > > > so the
> > > > > > >>> > mtimes
> > > > > > >>> > > > are the same).
> > > > > > >>> > > >
> > > > > > >>> > > > Let me know if you think this is the right semantic and
> > > I'll
> > > > try to
> > > > > > >>> > > clarify
> > > > > > >>> > > > the KIP.
> > > > > > >>> > > >
> > > > > > >>> > > > Many thanks,
> > > > > > >>> > > >
> > > > > > >>> > > > Tom
> > > > > > >>> > > >
> > > > > > >>> > > > On 18 December 2017 at 18:12, Jun Rao <
> jun@confluent.io>
> > > > wrote:
> > > > > > >>> > > >
> > > > > > >>> > > > > Hi, Tom,
> > > > > > >>> > > > >
> > > > > > >>> > > > > Thanks for the reply. A few more followup comments
> > below.
> > > > > > >>> > > > >
> > > > > > >>> > > > > 10. Another concern of mine is on consistency with
> the
> > > > current
> > > > > > >>> > pattern.
> > > > > > >>> > > > The
> > > > > > >>> > > > > current pattern for change notification based on ZK
> is
> > > (1)
> > > > we
> > > > > > >>> first
> > > > > > >>> > > write
> > > > > > >>> > > > > the actual value in the entity path and then write
> the
> > > > change
> > > > > > >>> > > > notification
> > > > > > >>> > > > > path, and (2)  the change notification path only
> > includes
> > > > what
> > > > > > >>> entity
> > > > > > >>> > > has
> > > > > > >>> > > > > changed but not the actual changes. If we want to
> > follow
> > > > this
> > > > > > >>> pattern
> > > > > > >>> > > for
> > > > > > >>> > > > > consistency, /admin/reassignment_requests/request_xxx
> > > will
> > > > only
> > > > > > >>> have
> > > > > > >>> > > the
> > > > > > >>> > > > > partitions whose reassignment have changed, but not
> the
> > > > actual
> > > > > > >>> > > > > reassignment.
> > > > > > >>> > > > >
> > > > > > >>> > > > > 11. Ok. I am not sure that I fully understand the
> > > > description of
> > > > > > >>> that
> > > > > > >>> > > > part.
> > > > > > >>> > > > > Does "assigned" refer to the current assignment?
> Could
> > > you
> > > > also
> > > > > > >>> > > describe
> > > > > > >>> > > > > where the length of the original assignment is stored
> > in
> > > > ZK?
> > > > > > >>> > > > >
> > > > > > >>> > > > > 13. Hmm, I am not sure that the cancellation needs to
> > be
> > > > done
> > > > > > >>> for the
> > > > > > >>> > > > whole
> > > > > > >>> > > > > batch. The reason that I brought this up is for
> > > > consistency. The
> > > > > > >>> KIP
> > > > > > >>> > > > allows
> > > > > > >>> > > > > override when using the new approach. It just seems
> > that
> > > > it's
> > > > > > >>> simpler
> > > > > > >>> > > to
> > > > > > >>> > > > > extend this model when resolving multiple changes
> > between
> > > > the
> > > > > > >>> old and
> > > > > > >>> > > the
> > > > > > >>> > > > > new approach.
> > > > > > >>> > > > >
> > > > > > >>> > > > > Jun
> > > > > > >>> > > > >
> > > > > > >>> > > > >
> > > > > > >>> > > > >
> > > > > > >>> > > > > On Mon, Dec 18, 2017 at 2:45 AM, Tom Bentley <
> > > > > > >>> t.j.bentley@gmail.com>
> > > > > > >>> > > > > wrote:
> > > > > > >>> > > > >
> > > > > > >>> > > > > > Hi Jun,
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > Thanks for replying, some answers below:
> > > > > > >>> > > > > >
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > > 10. The proposal now stores the reassignment for
> > all
> > > > > > >>> partitions
> > > > > > >>> > in
> > > > > > >>> > > > > > > /admin/reassignment_requests/request_xxx. If the
> > > > number of
> > > > > > >>> > > > reassigned
> > > > > > >>> > > > > > > partitions is large, the ZK write may hit the
> > > default
> > > > 1MB
> > > > > > >>> limit
> > > > > > >>> > > and
> > > > > > >>> > > > > > fail.
> > > > > > >>> > > > > > > An alternative approach is to have the
> reassignment
> > > > requester
> > > > > > >>> > first
> > > > > > >>> > > > > write
> > > > > > >>> > > > > > > the new assignment for each partition under
> > > > > > >>> > > > > > > /admin/reassignments/$topic/$partition and then
> > write
> > > > > > >>> > > > > > > /admin/reassignment_requests/request_xxx with an
> > > empty
> > > > value.
> > > > > > >>> > The
> > > > > > >>> > > > > > > controller can then read all values under
> > > > > > >>> /admin/reassignments.
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > You're right that reassigning enough partitions
> would
> > > > hit the
> > > > > > >>> 1MB
> > > > > > >>> > > > limit,
> > > > > > >>> > > > > > but I don't think this would be a problem in
> practice
> > > > because
> > > > > > >>> it
> > > > > > >>> > > would
> > > > > > >>> > > > be
> > > > > > >>> > > > > > trivial to split the partitions into several
> requests
> > > > (i.e.
> > > > > > >>> > multiple
> > > > > > >>> > > > > > request_xxx).
> > > > > > >>> > > > > > I don't think the non-atomicity this would imply
> is a
> > > > problem.
> > > > > > >>> By
> > > > > > >>> > > > writing
> > > > > > >>> > > > > > the partitions whose
> > > > /admin/reassignments/$topic/$partition has
> > > > > > >>> > been
> > > > > > >>> > > > > > created or changed it makes it much more efficient
> to
> > > > know
> > > > > > >>> which of
> > > > > > >>> > > > those
> > > > > > >>> > > > > > znodes we need to read. If I understand your
> > > suggestion,
> > > > you
> > > > > > >>> would
> > > > > > >>> > > have
> > > > > > >>> > > > > to
> > > > > > >>> > > > > > read every node under /admin/reassignments to
> figure
> > > out
> > > > which
> > > > > > >>> had
> > > > > > >>> > > > > changed.
> > > > > > >>> > > > > >
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > > 11. The improvement you suggested in
> > > > > > >>> onPartitionReassignment()
> > > > > > >>> > > sounds
> > > > > > >>> > > > > > good
> > > > > > >>> > > > > > > at the high level. The computation of those
> dropped
> > > > > > >>> partitions
> > > > > > >>> > > seems
> > > > > > >>> > > > a
> > > > > > >>> > > > > > bit
> > > > > > >>> > > > > > > complicated. Perhaps a simple approach is to drop
> > the
> > > > > > >>> replicas
> > > > > > >>> > not
> > > > > > >>> > > in
> > > > > > >>> > > > > the
> > > > > > >>> > > > > > > original assignment and newest reassignment?
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > This was what I came up with originally too, but
> > when I
> > > > looked
> > > > > > >>> into
> > > > > > >>> > > > > > implementing it I found a couple of things which
> made
> > > me
> > > > > > >>> reconsider
> > > > > > >>> > > it.
> > > > > > >>> > > > > > Consider the reassignments [0,1] -> [2,3] -> [3,4].
> > In
> > > > words:
> > > > > > >>> we
> > > > > > >>> > > start
> > > > > > >>> > > > > > reassigning to [2,3], but then change our minds
> > about 2
> > > > and
> > > > > > >>> switch
> > > > > > >>> > it
> > > > > > >>> > > > to
> > > > > > >>> > > > > 4
> > > > > > >>> > > > > > (maybe we've figured out a better overall balance).
> > At
> > > > that
> > > > > > >>> point
> > > > > > >>> > it
> > > > > > >>> > > is
> > > > > > >>> > > > > > perfectly possible that broker 2 is in-sync and
> > broker
> > > 1
> > > > is not
> > > > > > >>> > > > in-sync.
> > > > > > >>> > > > > It
> > > > > > >>> > > > > > seems silly to drop broker 2 in favour of broker 1:
> > > We're
> > > > > > >>> > needlessly
> > > > > > >>> > > > > giving
> > > > > > >>> > > > > > the cluster more work to do.
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > The second thing that made me reconsider was in
> that
> > > same
> > > > > > >>> scenario
> > > > > > >>> > > it's
> > > > > > >>> > > > > > even possible that broker 2 is the leader of the
> > > > partition.
> > > > > > >>> > Obviously
> > > > > > >>> > > > we
> > > > > > >>> > > > > > can elect a new leader before dropping it, but not
> > > > without
> > > > > > >>> causing
> > > > > > >>> > > > > > disruption to producers and consumers.
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > By accepting a little more complexity in choosing
> > which
> > > > > > >>> brokers to
> > > > > > >>> > > drop
> > > > > > >>> > > > > we
> > > > > > >>> > > > > > make the dropping simpler (no need for leader
> > election)
> > > > and
> > > > > > >>> ensure
> > > > > > >>> > > the
> > > > > > >>> > > > > > cluster has less work to do.
> > > > > > >>> > > > > >
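To make the trade-off concrete, here is a rough Scala sketch of the preference described above (an illustration of the idea only, not the algorithm in the KIP): rank the assigned replicas so the newest targets are kept first, then the current leader, then in-sync replicas, and drop from the bottom of the ranking.

    // Illustrative only. Given the full assigned set (originals plus
    // accumulated targets), produce the replicas to drop, preferring to
    // drop out-of-sync, non-leader replicas first, so e.g. an in-sync old
    // target like broker 2 outlives an out-of-sync original like broker 1.
    def chooseReplicasToDrop(assigned: Seq[Int], newestTarget: Seq[Int],
                             isr: Set[Int], leader: Int): Seq[Int] = {
      val ranked = assigned.sortBy { r =>
        (if (newestTarget.contains(r)) 0 else 1, // targets are always kept
         if (r == leader) 0 else 1,              // dropping the leader forces an election
         if (isr.contains(r)) 0 else 1)          // prefer keeping in-sync replicas
      }
      ranked.drop(newestTarget.size)             // everything past the target size is dropped
    }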
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > > 12. You brought up the need of remembering the
> > > original
> > > > > > >>> > assignment.
> > > > > > >>> > > > > This
> > > > > > >>> > > > > > > will be lost if the assignment is changed
> multiple
> > > > times if
> > > > > > >>> we
> > > > > > >>> > > follow
> > > > > > >>> > > > > the
> > > > > > >>> > > > > > > approach described in 10. One way is to store the
> > > > original
> > > > > > >>> > > assignment
> > > > > > >>> > > > > in
> > > > > > >>> > > > > > > /brokers/topics/[topic] as the following. When
> the
> > > > final
> > > > > > >>> > > reassignment
> > > > > > >>> > > > > > > completes, we can remove the original field.
> > > > > > >>> > > > > > > {
> > > > > > >>> > > > > > >  "version": 1,
> > > > > > >>> > > > > > >  "partitions": {"0": [0, 1, 3] },
> > > > > > >>> > > > > > >  "originals": {"0": [0, 1, 2] }
> > > > > > >>> > > > > > > }
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > While I was implementing my first version of
> > > > > > >>> > > onPartitionReassignment(),
> > > > > > >>> > > > > > where I preferred the originals, I was storing the
> > > > originals
> > > > > > >>> in the
> > > > > > >>> > > > > > /admin/reassignments/$topic/$partition znodes.
> Since
> > we
> > > > will
> > > > > > >>> > remove
> > > > > > >>> > > > that
> > > > > > >>> > > > > > znode at the end of reassignment anyway, I would
> > > suggest
> > > > this
> > > > > > >>> is a
> > > > > > >>> > > > better
> > > > > > >>> > > > > > place to store that data (if it's necessary to do
> > so),
> > > > so that
> > > > > > >>> we
> > > > > > >>> > can
> > > > > > >>> > > > > avoid
> > > > > > >>> > > > > > another ZK round trip.
> > > > > > >>> > > > > >
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > > 13. For resolving the conflict between
> > > > > > >>> /admin/reassign_partitions
> > > > > > >>> > > and
> > > > > > >>> > > > > > > /admin/reassignments/$topic/$partition, perhaps
> > it's
> > > > more
> > > > > > >>> > natural
> > > > > > >>> > > to
> > > > > > >>> > > > > > just
> > > > > > >>> > > > > > > let the assignment with a newer timestamp to
> > override
> > > > the
> > > > > > >>> older
> > > > > > >>> > > one?
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > That would work but with slightly different
> semantics
> > > to
> > > > what I
> > > > > > >>> > have:
> > > > > > >>> > > > > Since
> > > > > > >>> > > > > > /admin/reassign_partitions contains multiple
> > > partitions,
> > > > using
> > > > > > >>> the
> > > > > > >>> > > > > > timestamp means the whole batch wins or loses. By
> > > > tracking how
> > > > > > >>> > each
> > > > > > >>> > > > > > request was made we can be more fine-grained. I'm happy to
> to
> > > use
> > > > the
> > > > > > >>> > > > modification
> > > > > > >>> > > > > > time if such granularity is not required.
> > > > > > >>> > > > > >
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > > 14. Implementation wise, currently, we register a
> > > > watcher of
> > > > > > >>> the
> > > > > > >>> > > isr
> > > > > > >>> > > > > path
> > > > > > >>> > > > > > > of each partition being reassigned. This has the
> > > > potential
> > > > > > >>> issue
> > > > > > >>> > of
> > > > > > >>> > > > > > > registering many listeners. An improvement could
> be
> > > > just
> > > > > > >>> > > piggybacking
> > > > > > >>> > > > > on
> > > > > > >>> > > > > > > the existing IsrChangeNotificationHandler, which
> > only
> > > > > > >>> watches a
> > > > > > >>> > > > single
> > > > > > >>> > > > > ZK
> > > > > > >>> > > > > > > path and is triggered on a batch of isr changes.
> > This
> > > > is
> > > > > > >>> kind of
> > > > > > >>> > > > > > orthogonal
> > > > > > >>> > > > > > > to the KIP. However, if we are touching the
> > > > reassignment
> > > > > > >>> logic,
> > > > > > >>> > it
> > > > > > >>> > > > may
> > > > > > >>> > > > > be
> > > > > > >>> > > > > > > worth considering.
> > > > > > >>> > > > > >
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > Let me look into that.
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > Thanks,
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > Tom
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > On 16 December 2017 at 02:19, Jun Rao <
> > > jun@confluent.io>
> > > > > > >>> wrote:
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > > Hi, Tom,
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > Thanks for the updated KIP. A few more comments
> > > below.
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > 10. The proposal now stores the reassignment for
> > all
> > > > > > >>> partitions
> > > > > > >>> > in
> > > > > > >>> > > > > > > /admin/reassignment_requests/request_xxx. If the
> > > > number of
> > > > > > >>> > > > reassigned
> > > > > > >>> > > > > > > partitions is large, the ZK write may hit the
> > > default
> > > > 1MB
> > > > > > >>> limit
> > > > > > >>> > > and
> > > > > > >>> > > > > > fail.
> > > > > > >>> > > > > > > An alternative approach is to have the
> reassignment
> > > > requester
> > > > > > >>> > first
> > > > > > >>> > > > > write
> > > > > > >>> > > > > > > the new assignment for each partition under
> > > > > > >>> > > > > > > /admin/reassignments/$topic/$partition and then
> > write
> > > > > > >>> > > > > > > /admin/reassignment_requests/request_xxx with an
> > > empty
> > > > value.
> > > > > > >>> > The
> > > > > > >>> > > > > > > controller can then read all values under
> > > > > > >>> /admin/reassignments.
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > 11. The improvement you suggested in
> > > > > > >>> onPartitionReassignment()
> > > > > > >>> > > sounds
> > > > > > >>> > > > > > good
> > > > > > >>> > > > > > > at the high level. The computation of those
> dropped
> > > > > > >>> partitions
> > > > > > >>> > > seems
> > > > > > >>> > > > a
> > > > > > >>> > > > > > bit
> > > > > > >>> > > > > > > complicated. Perhaps a simple approach is to drop
> > the
> > > > > > >>> replicas
> > > > > > >>> > not
> > > > > > >>> > > in
> > > > > > >>> > > > > the
> > > > > > >>> > > > > > > original assignment and newest reassignment?
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > 12. You brought up the need of remembering the
> > > original
> > > > > > >>> > assignment.
> > > > > > >>> > > > > This
> > > > > > >>> > > > > > > will be lost if the assignment is changed
> multiple
> > > > times if
> > > > > > >>> we
> > > > > > >>> > > follow
> > > > > > >>> > > > > the
> > > > > > >>> > > > > > > approach described in 10. One way is to store the
> > > > original
> > > > > > >>> > > assignment
> > > > > > >>> > > > > in
> > > > > > >>> > > > > > > /brokers/topics/[topic] as the following. When
> the
> > > > final
> > > > > > >>> > > reassignment
> > > > > > >>> > > > > > > completes, we can remove the original field.
> > > > > > >>> > > > > > > {
> > > > > > >>> > > > > > >  "version": 1,
> > > > > > >>> > > > > > >  "partitions": {"0": [0, 1, 3] },
> > > > > > >>> > > > > > >  "originals": {"0": [0, 1, 2] }
> > > > > > >>> > > > > > > }
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > 13. For resolving the conflict between
> > > > > > >>> /admin/reassign_partitions
> > > > > > >>> > > and
> > > > > > >>> > > > > > > /admin/reassignments/$topic/$partition, perhaps
> > it's
> > > > more
> > > > > > >>> > natural
> > > > > > >>> > > to
> > > > > > >>> > > > > > just
> > > > > > >>> > > > > > > let the assignment with a newer timestamp to
> > override
> > > > the
> > > > > > >>> older
> > > > > > >>> > > one?
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > 14. Implementation wise, currently, we register a
> > > > watcher of
> > > > > > >>> the
> > > > > > >>> > > isr
> > > > > > >>> > > > > path
> > > > > > >>> > > > > > > of each partition being reassigned. This has the
> > > > potential
> > > > > > >>> issue
> > > > > > >>> > of
> > > > > > >>> > > > > > > registering many listeners. An improvement could
> be
> > > > just
> > > > > > >>> > > piggybacking
> > > > > > >>> > > > > on
> > > > > > >>> > > > > > > the existing IsrChangeNotificationHandler, which
> > only
> > > > > > >>> watches a
> > > > > > >>> > > > single
> > > > > > >>> > > > > ZK
> > > > > > >>> > > > > > > path and is triggered on a batch of isr changes.
> > This
> > > > is
> > > > > > >>> kind of
> > > > > > >>> > > > > > orthogonal
> > > > > > >>> > > > > > > to the KIP. However, if we are touching the
> > > > reassignment
> > > > > > >>> logic,
> > > > > > >>> > it
> > > > > > >>> > > > may
> > > > > > >>> > > > > be
> > > > > > >>> > > > > > > worth considering.
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > Thanks,
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > Jun
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > On Fri, Dec 15, 2017 at 10:17 AM, Tom Bentley <
> > > > > > >>> > > t.j.bentley@gmail.com
> > > > > > >>> > > > >
> > > > > > >>> > > > > > > wrote:
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > > Just wanted to mention that I've started
> KIP-240,
> > > > which
> > > > > > >>> builds
> > > > > > >>> > on
> > > > > > >>> > > > top
> > > > > > >>> > > > > > of
> > > > > > >>> > > > > > > > this one to provide an AdminClient API for
> > listing
> > > > and
> > > > > > >>> > describing
> > > > > > >>> > > > > > > > reassignments.
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > On 15 December 2017 at 14:34, Tom Bentley <
> > > > > > >>> > t.j.bentley@gmail.com
> > > > > > >>> > > >
> > > > > > >>> > > > > > wrote:
> > > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > > > Should we seek to improve this algorithm in
> > > this
> > > > KIP,
> > > > > > >>> or
> > > > > > >>> > > leave
> > > > > > >>> > > > > that
> > > > > > >>> > > > > > > as
> > > > > > >>> > > > > > > > > a later optimisation?
> > > > > > >>> > > > > > > > >
> > > > > > >>> > > > > > > > > I've updated the KIP with a proposed
> algorithm.
> > > > > > >>> > > > > > > > >
> > > > > > >>> > > > > > > > >
> > > > > > >>> > > > > > > > >
> > > > > > >>> > > > > > > > >
> > > > > > >>> > > > > > > > >
> > > > > > >>> > > > > > > > >
> > > > > > >>> > > > > > > > > On 14 December 2017 at 09:57, Tom Bentley <
> > > > > > >>> > > t.j.bentley@gmail.com
> > > > > > >>> > > > >
> > > > > > >>> > > > > > > wrote:
> > > > > > >>> > > > > > > > >
> > > > > > >>> > > > > > > > >> Thanks Ted, now fixed.
> > > > > > >>> > > > > > > > >>
> > > > > > >>> > > > > > > > >> On 13 December 2017 at 18:38, Ted Yu <
> > > > > > >>> yuzhihong@gmail.com>
> > > > > > >>> > > > wrote:
> > > > > > >>> > > > > > > > >>
> > > > > > >>> > > > > > > > >>> Tom:
> > > > > > >>> > > > > > > > >>> bq. create a znode
> > > > > > >>> /admin/reassignments/$topic-$partition
> > > > > > >>> > > > > > > > >>>
> > > > > > >>> > > > > > > > >>> Looks like the tree structure above should
> > be:
> > > > > > >>> > > > > > > > >>>
> > > > > > >>> > > > > > > > >>> /admin/reassignments/$topic/$partition
> > > > > > >>> > > > > > > > >>>
> > > > > > >>> > > > > > > > >>> bq. The controller removes
> > > > /admin/reassignment/$topic/$
> > > > > > >>> > > > partition
> > > > > > >>> > > > > > > > >>>
> > > > > > >>> > > > > > > > >>> Note the lack of 's' for reassignment. It
> > would
> > > > be
> > > > > > >>> good to
> > > > > > >>> > > make
> > > > > > >>> > > > > > > > zookeeper
> > > > > > >>> > > > > > > > >>> paths consistent.
> > > > > > >>> > > > > > > > >>>
> > > > > > >>> > > > > > > > >>> Thanks
> > > > > > >>> > > > > > > > >>>
> > > > > > >>> > > > > > > > >>> On Wed, Dec 13, 2017 at 9:49 AM, Tom
> Bentley
> > <
> > > > > > >>> > > > > > t.j.bentley@gmail.com>
> > > > > > >>> > > > > > > > >>> wrote:
> > > > > > >>> > > > > > > > >>>
> > > > > > >>> > > > > > > > >>> > Hi Jun and Ted,
> > > > > > >>> > > > > > > > >>> >
> > > > > > >>> > > > > > > > >>> > Jun, you're right that needing one
> watcher
> > > per
> > > > > > >>> reassigned
> > > > > > >>> > > > > > partition
> > > > > > >>> > > > > > > > >>> > presents a scalability problem, and
> using a
> > > > separate
> > > > > > >>> > > > > notification
> > > > > > >>> > > > > > > > path
> > > > > > >>> > > > > > > > >>> > solves that. I also agree that it makes
> > sense
> > > > to
> > > > > > >>> prevent
> > > > > > >>> > > > users
> > > > > > >>> > > > > > from
> > > > > > >>> > > > > > > > >>> using
> > > > > > >>> > > > > > > > >>> > both methods on the same reassignment.
> > > > > > >>> > > > > > > > >>> >
> > > > > > >>> > > > > > > > >>> > Ted, naming the reassignments like
> > mytopic-42
> > > > was
> > > > > > >>> simpler
> > > > > > >>> > > > > while I
> > > > > > >>> > > > > > > was
> > > > > > >>> > > > > > > > >>> > proposing a watcher-per-reassignment (I'd
> > > have
> > > > > > >>> needed a
> > > > > > >>> > > child
> > > > > > >>> > > > > > > watcher
> > > > > > >>> > > > > > > > >>> on
> > > > > > >>> > > > > > > > >>> > /admin/reassignments and also on
> > > > > > >>> > > > /admin/reassignments/mytopic).
> > > > > > >>> > > > > > > Using
> > > > > > >>> > > > > > > > >>> the
> > > > > > >>> > > > > > > > >>> > separate notification path means I don't
> > need
> > > > any
> > > > > > >>> > watchers
> > > > > > >>> > > in
> > > > > > >>> > > > > the
> > > > > > >>> > > > > > > > >>> > /admin/reassignments subtree, so
> switching
> > to
> > > > > > >>> > > > > > > > >>> /admin/reassignments/mytopic/
> > > > > > >>> > > > > > > > >>> > 42
> > > > > > >>> > > > > > > > >>> > would work, and avoid
> /admin/reassignments
> > > > having a
> > > > > > >>> very
> > > > > > >>> > > > large
> > > > > > >>> > > > > > > number
> > > > > > >>> > > > > > > > >>> of
> > > > > > >>> > > > > > > > >>> > child nodes. On the other hand it also
> > means
> > > I
> > > > have
> > > > > > >>> to
> > > > > > >>> > > create
> > > > > > >>> > > > > and
> > > > > > >>> > > > > > > > >>> delete
> > > > > > >>> > > > > > > > >>> > the topic nodes (e.g.
> > > > /admin/reassignments/mytopic),
> > > > > > >>> > which
> > > > > > >>> > > > > incurs
> > > > > > >>> > > > > > > the
> > > > > > >>> > > > > > > > >>> cost
> > > > > > >>> > > > > > > > >>> > of extra round trips to zookeeper. I
> > suppose
> > > > that
> > > > > > >>> since
> > > > > > >>> > > > > > > reassignment
> > > > > > >>> > > > > > > > is
> > > > > > >>> > > > > > > > >>> > generally a slow process it makes little
> > > > difference
> > > > > > >>> if we
> > > > > > >>> > > > > > increase
> > > > > > >>> > > > > > > > the
> > > > > > >>> > > > > > > > >>> > latency of the interactions with
> zookeeper.
> > > > > > >>> > > > > > > > >>> >
> > > > > > >>> > > > > > > > >>> > I have updated the KIP with these
> > > > improvements, and a
> > > > > > >>> > more
> > > > > > >>> > > > > > detailed
> > > > > > >>> > > > > > > > >>> > description of exactly how we would
> manage
> > > > these
> > > > > > >>> znodes.
> > > > > > >>> > > > > > > > >>> >
> > > > > > >>> > > > > > > > >>> > Reading the algorithm in KafkaController.
> > > > > > >>> > > > > > > onPartitionReassignment(),
> > > > > > >>> > > > > > > > it
> > > > > > >>> > > > > > > > >>> > seems that it would be suboptimal for
> > > changing
> > > > > > >>> > > reassignments
> > > > > > >>> > > > > > > > in-flight.
> > > > > > >>> > > > > > > > >>> > Consider an initial assignment of [1,2],
> > > > reassigned
> > > > > > >>> to
> > > > > > >>> > > [2,3]
> > > > > > >>> > > > > and
> > > > > > >>> > > > > > > then
> > > > > > >>> > > > > > > > >>> > changed to [2,4]. Broker 3 will remain in
> > the
> > > > > > >>> assigned
> > > > > > >>> > > > replicas
> > > > > > >>> > > > > > > until
> > > > > > >>> > > > > > > > >>> > broker 4 is in sync, even though 3 wasn't
> > > > actually
> > > > > > >>> one of
> > > > > > >>> > > the
> > > > > > >>> > > > > > > > original
> > > > > > >>> > > > > > > > >>> > assigned replicas and is no longer a new
> > > > assigned
> > > > > > >>> > replica.
> > > > > > >>> > > I
> > > > > > >>> > > > > > think
> > > > > > >>> > > > > > > > this
> > > > > > >>> > > > > > > > >>> > also affects the case where the
> > reassignment
> > > is
> > > > > > >>> cancelled
> > > > > > >>> > > > > > > > >>> > ([1,2]->[2,3]->[1,2]): We again have to
> > wait
> > > > for 3 to
> > > > > > >>> > catch
> > > > > > >>> > > > up,
> > > > > > >>> > > > > > > even
> > > > > > >>> > > > > > > > >>> though
> > > > > > >>> > > > > > > > >>> > its replica will then be deleted.
> > > > > > >>> > > > > > > > >>> >
> > > > > > >>> > > > > > > > >>> > Should we seek to improve this algorithm
> in
> > > > this
> > > > > > >>> KIP, or
> > > > > > >>> > > > leave
> > > > > > >>> > > > > > that
> > > > > > >>> > > > > > > > as
> > > > > > >>> > > > > > > > >>> a
> > > > > > >>> > > > > > > > >>> > later optimisation?
> > > > > > >>> > > > > > > > >>> >
> > > > > > >>> > > > > > > > >>> > Cheers,
> > > > > > >>> > > > > > > > >>> >
> > > > > > >>> > > > > > > > >>> > Tom
> > > > > > >>> > > > > > > > >>> >
> > > > > > >>> > > > > > > > >>> > On 11 December 2017 at 21:31, Jun Rao <
> > > > > > >>> jun@confluent.io>
> > > > > > >>> > > > > wrote:
> > > > > > >>> > > > > > > > >>> >
> > > > > > >>> > > > > > > > >>> > > Another question is on the
> compatibility.
> > > > Since now
> > > > > > >>> > there
> > > > > > >>> > > > > are 2
> > > > > > >>> > > > > > > > ways
> > > > > > >>> > > > > > > > >>> of
> > > > > > >>> > > > > > > > >>> > > specifying a partition reassignment,
> one
> > > > under
> > > > > > >>> > > > > > > > >>> /admin/reassign_partitions
> > > > > > >>> > > > > > > > >>> > > and the other under
> /admin/reassignments,
> > > we
> > > > > > >>> probably
> > > > > > >>> > > want
> > > > > > >>> > > > to
> > > > > > >>> > > > > > > > >>> prevent the
> > > > > > >>> > > > > > > > >>> > > same topic being reassigned under both
> > > paths
> > > > at the
> > > > > > >>> > same
> > > > > > >>> > > > > time?
> > > > > > >>> > > > > > > > >>> > > Thanks,
> > > > > > >>> > > > > > > > >>> > >
> > > > > > >>> > > > > > > > >>> > > Jun
> > > > > > >>> > > > > > > > >>> > >
> > > > > > >>> > > > > > > > >>> > >
> > > > > > >>> > > > > > > > >>> > >
> > > > > > >>> > > > > > > > >>> > > On Fri, Dec 8, 2017 at 5:41 PM, Jun
> Rao <
> > > > > > >>> > > jun@confluent.io>
> > > > > > >>> > > > > > > wrote:
> > > > > > >>> > > > > > > > >>> > >
> > > > > > >>> > > > > > > > >>> > > > Hi, Tom,
> > > > > > >>> > > > > > > > >>> > > >
> > > > > > >>> > > > > > > > >>> > > > Thanks for the KIP. It definitely
> > > > addresses one
> > > > > > >>> of
> > > > > > >>> > the
> > > > > > >>> > > > pain
> > > > > > >>> > > > > > > > points
> > > > > > >>> > > > > > > > >>> in
> > > > > > >>> > > > > > > > >>> > > > partition reassignment. Another issue
> > > that
> > > > it
> > > > > > >>> also
> > > > > > >>> > > > > addresses
> > > > > > >>> > > > > > is
> > > > > > >>> > > > > > > > >>> the ZK
> > > > > > >>> > > > > > > > >>> > > node
> > > > > > >>> > > > > > > > >>> > > > size limit when writing the
> > reassignment
> > > > JSON.
> > > > > > >>> > > > > > > > >>> > > >
> > > > > > >>> > > > > > > > >>> > > > My only concern is that the KIP needs
> > to
> > > > create
> > > > > > >>> one
> > > > > > >>> > > > watcher
> > > > > > >>> > > > > > per
> > > > > > >>> > > > > > > > >>> > > reassigned
> > > > > > >>> > > > > > > > >>> > > > partition. This could add overhead in
> > ZK
> > > > and
> > > > > > >>> > complexity
> > > > > > >>> > > > for
> > > > > > >>> > > > > > > > >>> debugging
> > > > > > >>> > > > > > > > >>> > > when
> > > > > > >>> > > > > > > > >>> > > > lots of partitions are being
> reassigned
> > > > > > >>> > simultaneously.
> > > > > > >>> > > > We
> > > > > > >>> > > > > > > could
> > > > > > >>> > > > > > > > >>> > > > potentially improve this by
> > introducing a
> > > > > > >>> separate ZK
> > > > > > >>> > > > path
> > > > > > >>> > > > > > for
> > > > > > >>> > > > > > > > >>> change
> > > > > > >>> > > > > > > > >>> > > > notification as we do for configs.
> For
> > > > example,
> > > > > > >>> every
> > > > > > >>> > > > time
> > > > > > >>> > > > > we
> > > > > > >>> > > > > > > > >>> change
> > > > > > >>> > > > > > > > >>> > the
> > > > > > >>> > > > > > > > >>> > > > assignment for a set of partitions,
> we
> > > > could
> > > > > > >>> further
> > > > > > >>> > > > write
> > > > > > >>> > > > > a
> > > > > > >>> > > > > > > > >>> sequential
> > > > > > >>> > > > > > > > >>> > > > node
> > > > /admin/reassignment_changes/[change_x]. That
> > > > > > >>> > way,
> > > > > > >>> > > > the
> > > > > > >>> > > > > > > > >>> controller
> > > > > > >>> > > > > > > > >>> > > > only needs to watch the change path.
> > > Once a
> > > > > > >>> change is
> > > > > > >>> > > > > > > triggered,
> > > > > > >>> > > > > > > > >>> the
> > > > > > >>> > > > > > > > >>> > > > controller can read everything under
> > > > > > >>> > > > /admin/reassignments/.
> > > > > > >>> > > > > > > > >>> > > >
> > > > > > >>> > > > > > > > >>> > > > Jun
> > > > > > >>> > > > > > > > >>> > > >
> > > > > > >>> > > > > > > > >>> > > >
> > > > > > >>> > > > > > > > >>> > > > On Wed, Dec 6, 2017 at 1:19 PM, Tom
> > > > Bentley <
> > > > > > >>> > > > > > > > t.j.bentley@gmail.com
> > > > > > >>> > > > > > > > >>> >
> > > > > > >>> > > > > > > > >>> > > wrote:
> > > > > > >>> > > > > > > > >>> > > >
> > > > > > >>> > > > > > > > >>> > > >> Hi,
> > > > > > >>> > > > > > > > >>> > > >>
> > > > > > >>> > > > > > > > >>> > > >> This is still very new, but I wanted
> > > some
> > > > quick
> > > > > > >>> > > feedback
> > > > > > >>> > > > > on
> > > > > > >>> > > > > > a
> > > > > > >>> > > > > > > > >>> > > preliminary
> > > > > > >>> > > > > > > > >>> > > >> KIP which could, I think, help with
> > > > providing an
> > > > > > >>> > > > > AdminClient
> > > > > > >>> > > > > > > API
> > > > > > >>> > > > > > > > >>> for
> > > > > > >>> > > > > > > > >>> > > >> partition reassignment.
> > > > > > >>> > > > > > > > >>> > > >>
> > > > > > >>> > > > > > > > >>> > > >> https://cwiki.apache.org/
> > > > > > >>> > > confluence/display/KAFKA/KIP-
> > > > > > >>> > > > > 236%
> > > > > > >>> > > > > > > > >>  

Re: [DISCUSS] KIP-236 Interruptible Partition Reassignment

Posted by Stanislav Kozlovski <st...@confluent.io>.
Hi George,

I think we might want to move the discussion to KIP-455 as this KIP is very
tightly coupled to it.

One issue I see is when controller failover occurs,  the new controller
> will need to read the active reassignments by scanning the
> /brokers/topics/<topic>/<partitions>,  for some clusters with hundred+
> brokers and 10s of thousands of partitions per broker, it might not be
> scalable for this scan?
>
As Colin had said in the KIP-455 discussion, there is no impact because we
already scan these znodes as part of the normal controller startup.

Another issue, the controller has all the active reassignment in Memory (as
> well as in ZK partition level), the non-controller brokers will maintain
> its Reassignment context in memory as well thru Controller's
> LeaderAndIsrRequest with additional reassignment data,  if the broker
> missing the LeaderAndIsrRequest, will it cause that broker's
> discrepancies?  For performance reasons, we do not want the brokers to scan
> all topic/partition to find current active reassignments ?

As far as I understand, the current reassignment code also depends on the
LeaderAndIsr request, and the non-controller brokers still keep the
reassignment context in memory, all through the replicas set for a given
partition. The only difference is that we now separate the replicas set into
two finer-grained sets - replicas and targetReplicas.
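For illustration, under the partition-level approach each partition znode would carry both sets, roughly like this (field names follow the wording above; the actual KIP-455 schema may differ):

    {
      "version": 1,
      "replicas": [1, 2, 3, 4],
      "targetReplicas": [2, 3, 4]
    }

Here "replicas" is the full current set (originals plus pending additions) and "targetReplicas" is where the partition should end up; once the two converge, the reassignment for that partition is complete.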

That's the main purpose of KIP-236.  For data storage product/system, it's
> good to have the cancel/rollback feature.

I fully agree, I think this is a table stakes feature.

Thanks,
Stanislav


On Wed, May 1, 2019 at 7:29 AM George Li <sq...@yahoo.com.invalid>
wrote:

>  Hi Jason,
>
> Sorry for the late response.  I've been busy.  I have taken a brief look at
> Colin's KIP-455. It's a good direction for Reassignments. I think KIP-236,
> KIP-435,  KIP-455 need some consensus / coordination for where the new
> reassignment data AND the original replicas is stored.
>
> The current way of storing new reassignment data  in  ZK node
> /admin/reassign_partitions Vs the KIP-455 storing in the
> /brokers/topics/<topic>/<partitions> level has pros and cons.
>
> Storing in ZK node ZK node /admin/reassign_partitions can quickly find out
> the current pending/active reassignments.  Storing in topic/partition level
> is generally fine for controller, since the active reassignments are also
> kept in memory Controller Context.  One issue I see is when controller
> failover occurs,  the new controller will need to read the active
> reassignments by scanning the /brokers/topics/<topic>/<partitions>,  for
> some clusters with hundred+ brokers and 10s of thousands of partitions per
> broker, it might not be scalable for this scan?  The failover controller
> needs to come up as quickly as possible?
>
> Another issue, the controller has all the active reassignment in Memory
> (as well as in ZK partition level), the non-controller brokers will
> maintain its Reassignment context in memory as well thru Controller's
> LeaderAndIsrRequest with additional reassignment data,  if the broker
> missing the LeaderAndIsrRequest, will it cause that broker's
> discrepancies?  For performance reasons, we do not want the brokers to scan
> all topic/partition to find current active reassignments ?
>
> I agree this is more flexible in the long term to have the capability of
> submitting new reassignments while current batch is still active.  You
> brought up a good point of allowing canceling individual reassignment,  I
> will have the reassignment cancel RPC to specify individual partition for
> cancellation, if it's null, then cancel all pending reassignments.  In the
> current work environment, Reassignment operations are very sensitive
> operations to the cluster's latency, especially for the cluster with
> min.insync.replicas >=2, So when reassignment is kicked off and cluster
> performance is affected, the most desirable features that we would like is
> to cancel all pending reassignments in a timely/cleanly fashion so that the
> cluster performance can be restored. That's the main purpose of KIP-236.
> For data storage product/system, it's good to have the cancel/rollback
> feature.
>
> Thanks,
> George
>
>     On Wednesday, April 24, 2019, 10:57:45 AM PDT, Jason Gustafson <
> jason@confluent.io> wrote:
>
>  Hi George,
>
> I think focusing on the admin API for cancellation in this KIP is
> reasonable. Colin wrote up KIP-455 which adds APIs to submit a reassignment
> and to list reassignments in progress. Probably as long as we make these
> APIs all consistent with each other, it is fine to do them separately.
>
> The major point we need to reach agreement on is how an active reassignment
> will be stored. In the proposal here, we continue to
> use /admin/reassign_partitions and we further extend it to differentiate
> between the original replicas and the target replicas of a reassignment. In
> KIP-455, Colin proposes to store the same information, but do it at the
> partition level. Obviously we would only do one of these. I think the main
> difference is how we think about reassignment in general. Currently we see
> it as basically a batch job which is submitted atomically and not completed
> until all the partitions in the reassignment have been moved. The other
> perspective is to treat reassignments more fluidly. At any point, we just
> have some partitions which are undergoing reassignment. Then there is no
> longer a question about supporting multiple reassignments: we can reassign
> new partitions at any time and we can cancel individual partition
> reassignments also at any time. This is a more ambitious approach, but it
> seems much more flexible in the long term.
>
> Thanks,
> Jason
>
>
>
> On Fri, Apr 5, 2019 at 7:25 PM George Li <sql_consulting@yahoo.com
> .invalid>
> wrote:
>
> >  Hi Jun,
> >
> > Thanks for the feedback!
> >
> >  For 40,  I agree.  It makes sense to do it via an RPC request to the
> > controller.  Maybe for KIP-236,  I will just implement RPC for
> reassignment
> > cancellation only?  Otherwise, if KIP-236 includes other previous
> > Reassignment related operations such as Submitting Reassignments,
> > --generate,  --verify, Change/Remove Throttle, I think KIP-236 will
> incur a
> > lot of changes.
> >
> > For 41, the current reassignment logic, after /admin/reassign_partitions
> > has the "new_replicas" for the topic/partition,  the controller will set
> > the AR (Assigned Replicas) to be the Set(original_replicas) +
> > Set(new_replicas),  thus when reassignment begins, the
> "original_replicas"
> > info is lost. It's not possible to use the AR and new_replicas in
> > /admin/reassign_partitions to derive back the "original_replicas".
> >
> > To cancel the current pending reassignments, the "original_replicas" info
> > is required to restore/rollback.
> >
> > As a matter of fact, if we have "original_replicas" info when doing
> > reassignment, some features can be accomplished, e.g., to name a few:
> > separating the Reassignment Replication Traffic with the normal
> Follower's
> > ReplicaFetcher thread pool, the dedicated Reassignment ReplicaFetcher
> > thread pool can have throttle enforced, while existing Follower
> > ReplicaFetcher not throttled. some networking parameters can be tuned for
> > Reassignment ReplicaFetcher thread pool.  The Reassignment can have its
> own
> > MaxLag, TotalLag metrics.  and URP (Under Replicated Partition) can be
> > reported differently for existing followers Vs. the URP caused by
> > Reassignment.  Anyway, these might be some future KIPs that we might
> > submit.
> >
> > Thanks,
> > George
> >
> >
> >    On Thursday, April 4, 2019, 4:32:26 PM PDT, Jun Rao <jun@confluent.io
> >
> > wrote:
> >
> >  Hi, George,
> >
> > Thanks for the KIP. Sorry for the late reply. A couple of comments below.
> >
> > 40. I agree that it's better to issue an RPC request to the controller
> for
> > reassignment cancellation. If we do that, it would be useful to decide
> > whether that call blocks on cancellation completion or not.
> >
> > 41. Is it necessary to add the new "original replicas" field in
> > /admin/reassign_partitions?
> > The original replicas are already in the topic path in ZK.
> >
> > Jun
> >
> > On Tue, Mar 26, 2019 at 5:24 PM George Li <sql_consulting@yahoo.com
> > .invalid>
> > wrote:
> >
> > >  Hi Ismael,
> > >
> > > Thanks,  I understand your points. I will add the RPC mechanism for
> > > reassignments to KIP-236.  I can think of a few Requests/Responses
> > > corresponding to the old Scala AdminClient using ZK:
> > >
> > > SubmitReassignments (--execute)
> > > StatusReassignments (--verify)
> > > ChangeOrRemoveReassignmentsThrottle (--verify)
> > > GenerateReassignments (--generate)
> > > CancelPendingReassignments (new in KIP-236)
> > >
> > > To clarify the "structure" change of KIP-236, the ZK public interface
> > > remains the same:  {topic, partition, new_replicas},  the user client
> > > generate/submit the reassignment plan the same way as before.  the new
> > > "original_replicas" in the ZK node is added by admin client before
> > writing
> > > to the ZK node.  A bit similar to the "log_dirs".  User's direct
> > > modification of ZK /admin/reassign_partitions is strongly discouraged.
> > >
> > > The original_replicas info is essential for cancellation/rollback of
> the
> > > reassignments still pending.
> > >
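For concreteness, a KIP-236 entry in /admin/reassign_partitions would carry the extra field roughly like this (shape inferred from the rollback output quoted later in this thread; the log_dirs field is omitted):

    {
      "version": 1,
      "partitions": [
        {"topic": "test_topic",
         "partition": 7,
         "replicas": [3, 2, 5],
         "original_replicas": [3, 2, 4]}
      ]
    }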
> > > Thanks,
> > > George
> > >
> > >    On Monday, March 25, 2019, 4:43:30 PM PDT, Ismael Juma <
> > > ismael@juma.me.uk> wrote:
> > >
> > >  Hi George,
> > >
> > > The goal is not to prevent people from updating ZK directly. The goal
> is
> > to
> > > offer a solution where people don't have to. If people then decide to
> > avoid
> > > the recommended path, they can deal with the consequences. However, if
> we
> > > add another structure in ZK and no RPC mechanism, then there is no
> > > recommended path apart from updating ZK (implicitly making it an API
> for
> > > users).
> > >
> > > Ismael
> > >
> > > On Mon, Mar 25, 2019 at 3:57 PM George Li <sql_consulting@yahoo.com
> > > .invalid>
> > > wrote:
> > >
> > > >  Thanks Ismael.  One question, even switch to submitting
> reassignments
> > > via
> > > > RPC instead of Zookeeper.  The reassignment data will still persist
> in
> > > > ZooKeeper node /admin/reassign_partitions (e.g. when Controller
> > failover
> > > it
> > > > can resume reassignments)?  If yes, how can this keep someone from
> > > > modifying ZK (/admin/reassign_partitions) directly?
> > > >
> > > >
> > > > Thanks,
> > > > George
> > > >
> > > >    On Saturday, March 23, 2019, 1:07:11 PM PDT, Ismael Juma <
> > > > ismaelj@gmail.com> wrote:
> > > >
> > > >  Thanks for the KIP, making reassignment more flexible is definitely
> > > > welcome. As others have mentioned, I think we need to do it via the
> > Kafka
> > > > protocol and not via ZK. The latter introduces an implicit API that
> > other
> > > > tools will depend on causing migration challenges. This has already
> > > > happened with the existing ZK based interface and we should avoid
> > > > introducing more tech debt here.
> > > >
> > > > Ismael
> > > >
> > > > On Sat, Mar 23, 2019, 12:09 PM Colin McCabe <cm...@apache.org>
> > wrote:
> > > >
> > > > > On Thu, Mar 21, 2019, at 20:51, George Li wrote:
> > > > > >  Hi Colin,
> > > > > >
> > > > > > I agree with your proposal of having administrative APIs through
> > RPC
> > > > > > instead of ZooKeeper. But seems like it will incur significant
> > > changes
> > > > > > to both submitting reassignments and this KIP's cancelling
> pending
> > > > > > reassignments.
> > > > > >
> > > > > > To make this KIP simple and moving along, I will be happy to do
> > > another
> > > > > > follow-up KIP to change all reassignment related operations via
> RPC.
> > > > >
> > > > > Thanks, George.  I think doing it as a two-step process is fine,
> but
> > I
> > > > > suspect it would be much easier and quicker to do the RPC
> conversion
> > > > first,
> > > > > and the interruptible part later.  The reason is because a lot of
> the
> > > > > things that people have brought up as concerns with this KIP are
> > really
> > > > > issues with the API (how will people interact with ZK, how does
> > access
> > > > > control work, what does the format look like in ZK) that will just
> go
> > > > away
> > > > > once we have an RPC.
> > > > >
> > > > > > Just curious,  KIP-4 includes Topics/ACL related operations. In
> > > > > > addition to Reassignments, should any other operations be done via
> > > > > > RPC?
> > > > >
> > > > > I think all of the administrative shell scripts have been converted
> > > > except
> > > > > kafka-configs.sh.  I believe there is a KIP for that conversion.
> > > > > Reassigning partitions is probably the biggest KIP-4 gap we have
> > right
> > > > now.
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > > >
> > > > > > Thanks,
> > > > > > George
> > > > > >
> > > > > >
> > > > > >    On Wednesday, March 20, 2019, 5:28:59 PM PDT, Colin McCabe
> > > > > > <co...@cmccabe.xyz> wrote:
> > > > > >
> > > > > >  Hi George,
> > > > > >
> > > > > > One big problem here is that administrative APIs should be done
> > > through
> > > > > > RPCs, not through ZooKeeper.  KIP-4 (Command line and centralized
> > > > > > administrative operations) describes the rationale for this.  We
> > want
> > > > > > public and stable APIs that don't depend on the internal
> > > representation
> > > > > > of stuff in ZK, which will change over time.  Tools shouldn't
> have
> > to
> > > > > > integrate with ZK or understand the internal data structures of
> > Kafka
> > > > > > to make administrative changes.  ZK doesn't have a good security,
> > > > > > access control, or compatibility story.
> > > > > >
> > > > > > We should create an official reassignment RPC for Kafka.  This
> will
> > > > > > solve many of the problems discussed in this thread, I think.
> For
> > > > > > example, with an RPC, users won't be able to make changes unless
> > they
> > > > > > have ALTER on KafkaCluster.  That avoids the problem of random
> > users
> > > > > > making changes without the administrator knowing.  Also, if
> > multiple
> > > > > > users are making changes, there is no risk that they will
> overwrite
> > > > > > each other's changes, since they won't be modifying the internal
> ZK
> > > > > > structures directly.
> > > > > >
> > > > > > I think a good reassignment API would be something like:
> > > > > >
> > > > > > > ReassignPartitionsResults
> reassignPartitions(Map<TopicPartition,
> > > > > PartitionAssignment> reassignments);
> > > > > > >
> > > > > > > class PartitionAssignment {
> > > > > > >  List<Integer> nodes;
> > > > > > > }
> > > > > > >
> > > > > > > class ReassignPartitionsResults {
> > > > > > >  Map<TopicPartition, KafkaFuture<Void>> pending;
> > > > > > >  Map<TopicPartition, KafkaFuture<Void>> completed;
> > > > > > >  Map<TopicPartition, KafkaFuture<Void>> rejected;
> > > > > > > }
> > > > > > >
> > > > > > > PendingReassignmentResults pendingReassignments();
> > > > > > >
> > > > > > > class PendingReassignmentResults {
> > > > > > >  KafkaFuture<Map<TopicPartition, PartitionAssignment>> pending;
> > > > > > >  KafkaFuture<Map<TopicPartition, PartitionAssignment>>
> previous;
> > > > > > > }
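A rough usage sketch of this proposed API from Scala (hypothetical: PartitionAssignment, ReassignPartitionsResults, the reassignPartitions() method, and the constructor used below exist only in the proposal above):

    import scala.collection.JavaConverters._
    import org.apache.kafka.common.TopicPartition

    // "admin" stands for an AdminClient extended with the proposed methods.
    val targets = Map(
      new TopicPartition("mytopic", 0) ->
        new PartitionAssignment(List(2, 3, 4).map(Int.box).asJava)
    ).asJava
    val results = admin.reassignPartitions(targets)
    results.pending.keySet.asScala.foreach(tp => println(s"started: $tp"))
    results.rejected.keySet.asScala.foreach(tp => println(s"rejected: $tp"))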
> > > > > >
> > > > > > best,
> > > > > > Colin
> > > > > >
> > > > > >
> > > > > > On Tue, Mar 19, 2019, at 15:04, George Li wrote:
> > > > > > >  Hi Viktor,
> > > > > > >
> > > > > > > Thanks for the review.
> > > > > > >
> > > > > > > If there is reassignment in-progress while the cluster is
> > upgraded
> > > > > with
> > > > > > > this KIP (upgrade the binary and then do a cluster rolling
> > restart
> > > of
> > > > > > > the brokers), the reassignment JSON in Zookeeper
> > > > > > > /admin/reassign_partitions will only have  {topic, partition,
> > > > > > > replicas(new)} info when the batch of reassignment was kicked
> off
> > > > > > > before the upgrade,  not with the "original_replicas" info per
> > > > > > > topic/partition.  So when the user is trying to cancel/rollback
> > the
> > > > > > > reassignments, it's going to fail and the cancellation will be
> > > > skipped
> > > > > > > (The code in this KIP will check if the "original_replicas"
> > is
> > > in
> > > > > > > the /admin/reassign_partitions).
> > > > > > >
> > > > > > > The user either has to wait till current reassignments to
> finish
> > or
> > > > > > > does quite some manual work to cancel them (delete ZK node,
> > bounce
> > > > > > > controller, re-submit reassignments with original replicas to
> > > > > rollback,
> > > > > > > if the original replicas are kept before the last batch of
> > > > > > > reassignments were submitted).
> > > > > > >
> > > > > > > I think this scenario of reassignments being kicked off by
> > > end-user,
> > > > > > > not by the team(s) that managed Kafka infrastructure might be
> > rare
> > > > > > > (maybe in some very small companies?),  since only one batch of
> > > > > > > reassignments can be running at a given time in
> > > > > > > /admin/reassign_partitions.  The end-users need some
> > co-ordination
> > > > for
> > > > > > > submitting reassignments.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > George
> > > > > > >
> > > > > > >
> > > > > > >    On Tuesday, March 19, 2019, 3:34:20 AM PDT, Viktor
> > Somogyi-Vass
> > > > > > > <vi...@gmail.com> wrote:
> > > > > > >
> > > > > > >  Hey George,
> > > > > > > Thanks for the answers. I'll try to block out time this week to
> > > > review
> > > > > > > your PR.
> > > > > > > I have one more point to clarify: I've seen some customers who are
> > > > > > > managing Kafka as an internal company-wide service and they may or
> > > > > > > may not know how certain topics are used within the company.  That
> > > > > > > might mean that some clients can start reassignment at random
> > > > > > > times. Let's suppose that such a reassignment starts just when the
> > > > > > > Kafka operation team starts upgrading the cluster that contains
> > > > > > > this KIP. The question is: do you think that we need to handle
> > > > > > > upgrade scenarios where there is an in-progress reassignment?
> > > > > > > Thanks,
> > > > > > > Viktor
> > > > > > >
> > > > > > > On Tue, Mar 19, 2019 at 6:16 AM George Li <
> > > sql_consulting@yahoo.com>
> > > > > wrote:
> > > > > > >
> > > > > > >  Hi Viktor,
> > > > > > > FYI, I have added a new ducktape
> > > > > > > test:  tests/kafkatest/tests/core/reassign_cancel_test.py
> > > > > > > to https://github.com/apache/kafka/pull/6296
> > > > > > > After review, do you have any more questions?  Thanks
> > > > > > >
> > > > > > > Hi Jun,
> > > > > > > Could you help review this when you have time?  Thanks
> > > > > > >
> > > > > > > Can we start a vote on this KIP in one or two weeks?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > George
> > > > > > >
> > > > > > >
> > > > > > >    On Tuesday, March 5, 2019, 10:58:45 PM PST, George Li
> > > > > > > <sq...@yahoo.com> wrote:
> > > > > > >
> > > > > > >  Hi Viktor,
> > > > > > >
> > > > > > >
> > > > > > > >  2.: One follow-up question: if the reassignment cancellation
> > > gets
> > > > > interrupted and a failover happens after step #2 but before step
> #3,
> > > how
> > > > > will the new controller continue? At this stage Zookeeper would
> > contain
> > > > > OAR + RAR, however the brokers will have the updated LeaderAndIsr
> > about
> > > > > OAR, so they won't know about RAR. I would suppose the new
> controller
> > > > would
> > > > > start from the beginning as it only knows what's in Zookeeper. Is
> > that
> > > > true?
> > > > > > >
> > > > > > > The OAR (Original Assigned Replicas) for the rollback is stored
> > in
> > > > the
> > > > > > > /admin/reassign_partitions for each topic/partition
> > reassignment.
> > > > > > > During the controller failover,  the new controller will read
> > > > > > > /admin/reassign_partitions for both new_replicas AND
> > > > original_replicas
> > > > > > > into controllerContext.partitionsBeingReassigned
> > > > > > >
> > > > > > > then perform pending reassignments cancellation/rollback.
> > > > > > > The originalReplicas  is added below:
> > > > > > > case class ReassignedPartitionsContext(var newReplicas:
> Seq[Int]
> > =
> > > > > > > Seq.empty,
> > > > > > >                                        var originalReplicas:
> > > > Seq[Int]=
> > > > > > > Seq.empty,
> > > > > > >                                        val
> > > reassignIsrChangeHandler:
> > > > > > > PartitionReassignmentIsrChangeHandler) {
> > > > > > >
> > > > > > >
> > > > > > > > 2.1: Another interesting question: what are those replicas doing
> > > > > > > > which are online but not part of the leader and ISR? Are they
> > > > > > > > still replicating? Are they safe to lie around for the time
> > > > > > > > being?
> > > > > > >
> > > > > > > I think they are just dangling without being replicated.  Upon a
> > > > > > > LeaderAndIsr request, makeFollowers() will call
> > > > > > > replicaFetcherManager.removeFetcherForPartitions() and they will
> > > > > > > not be added back in the ReplicaFetcher since they are not in the
> > > > > > > current AR (Assigned Replicas).  Step 4 (StopReplica) will delete
> > > > > > > them.
> > > > > > >
> > > > > > >
> > > > > > > I am adding a new ducktape test:
> > > > > > > tests/kafkatest/tests/core/reassign_cancel_test.py for
> cancelling
> > > > > > > pending reassignments.  But it's not easy to simulate
> controller
> > > > > > > failover during the cancellation since the
> cancellation/rollback
> > is
> > > > so
> > > > > > > fast to complete. See below.  I do have a unit/integration test
> > to
> > > > > > > simulate controller
> > > > > > > failover:  shouldTriggerReassignmentOnControllerStartup()
> > > > > > >
> > > > > > >
> > > > > > > Thanks,
> > > > > > > George
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Warning: You must run Verify periodically, until the reassignment
> > > > > > > completes, to ensure the throttle is removed. You can also alter
> > > > > > > the throttle by rerunning the Execute command passing a new value.
> > > > > > > The inter-broker throttle limit was set to 1024 B/s
> > > > > > > Successfully started reassignment of partitions.
> > > > > > >
> > > > > > > [INFO  - 2019-03-05 04:20:48,321 - kafka - execute_reassign_cancel
> > > > > > > - lineno:514]: Executing cancel / rollback pending partition
> > > > > > > reassignment...
> > > > > > > [DEBUG - 2019-03-05 04:20:48,321 - kafka - execute_reassign_cancel
> > > > > > > - lineno:515]: /opt/kafka-dev/bin/kafka-reassign-partitions.sh
> > > > > > > --zookeeper worker1:2181 --cancel
> > > > > > > [DEBUG - 2019-03-05 04:20:48,321 - remoteaccount - _log -
> > > > > > > lineno:160]: vagrant@worker2: Running ssh command:
> > > > > > > /opt/kafka-dev/bin/kafka-reassign-partitions.sh --zookeeper
> > > > > > > worker1:2181 --cancel
> > > > > > > [DEBUG - 2019-03-05 04:20:57,452 - kafka - execute_reassign_cancel
> > > > > > > - lineno:520]: Verify cancel / rollback pending partition
> > > > > > > reassignment:
> > > > > > > Rolling back the current pending reassignments Map(test_topic-7 ->
> > > > > > > Map(replicas -> Buffer(3, 2, 5), original_replicas -> Buffer(3, 2,
> > > > > > > 4)), test_topic-18 -> Map(replicas -> Buffer(5, 1, 2),
> > > > > > > original_replicas -> Buffer(4, 1, 2)), test_topic-3 ->
> > > > > > > Map(replicas -> Buffer(5, 2, 3), original_replicas -> Buffer(4, 2,
> > > > > > > 3)), test_topic-15 -> Map(replicas -> Buffer(1, 3, 5),
> > > > > > > original_replicas -> Buffer(1, 3, 4)), test_topic-11 ->
> > > > > > > Map(replicas -> Buffer(2, 3, 5), original_replicas -> Buffer(2, 3,
> > > > > > > 4)))
> > > > > > > Successfully submitted cancellation of reassignments.
> > > > > > > The cancelled pending reassignments throttle was removed.
> > > > > > > Please run --verify to have the previous reassignments (not just
> > > > > > > the cancelled reassignments in progress) throttle removed.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >    On Tuesday, March 5, 2019, 8:20:25 AM PST, Viktor
> Somogyi-Vass
> > > > > > > <vi...@gmail.com> wrote:
> > > > > > >
> > > > > > >  Hey George,
> > > > > > >
> > > > > > > Sorry for the delay. I'll answer point-by-point:
> > > > > > >
> > > > > > > 1.2: I think it's fine. As you say we presume that the client
> > > > > > > knows the state of the cluster before doing the reassignment, so
> > > > > > > we can presume the same for cancellation.
> > > > > > >
> > > > > > > 2.: One follow-up question: if the reassignment cancellation gets
> > > > > > > interrupted and a failover happens after step #2 but before step
> > > > > > > #3, how will the new controller continue? At this stage Zookeeper
> > > > > > > would contain OAR + RAR, however the brokers will have the updated
> > > > > > > LeaderAndIsr about OAR, so they won't know about RAR. I would
> > > > > > > suppose the new controller would start from the beginning as it
> > > > > > > only knows what's in Zookeeper. Is that true?
> > > > > > >
> > > > > > > 2.1: Another interesting question: what are those replicas doing
> > > > > > > which are online but not part of the leader and ISR? Are they
> > > > > > > still replicating? Are they safe to lie around for the time being?
> > > > > > > Best,
> > > > > > > Viktor
> > > > > > >
> > > > > > > On Fri, Mar 1, 2019 at 8:40 PM George Li <
> > sql_consulting@yahoo.com
> > > >
> > > > > wrote:
> > > > > > >
> > > > > > >  Hi Jun,
> > > > > > > Could you help review KIP-236 when you have time?  Thanks.
> > > > > > >
> > > > > > >
> > > > > > > Hi Becket,
> > > > > > > Since you filed
> https://issues.apache.org/jira/browse/KAFKA-6304
> > > to
> > > > > > > request this feature.  Could you also help review and comment
> on
> > > > > > > KIP-236?  Thanks.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Hi Viktor,
> > > > > > > I have updated https://github.com/apache/kafka/pull/6296  and
> > also
> > > > > > > KIP-236:
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-236%3A+Interruptible+Partition+Reassignment
> > > > Please
> > > > > see the new section "Skip Reassignment Cancellation Scenarios" below:
> > > > > > >
> > > > > > > Skip Reassignment Cancellation Scenarios
> > > > > > >
> > > > > > > There are a couple scenarios that the Pending reassignments
> > > > > > > in /admin/reassign_partitions can not be cancelled / rollback.
> > > > > > >
> > > > > > >    - If the "original_replicas"  is missing for the
> > topic/partition
> > > > > > > in /admin/reassign_partitions.  In this case, the pending
> > > > > > > reassignment cancellation will be skipped, because there is no way
> > > > > > > to reset to the original replicas.  The reasons this can happen
> > > > > > > could be:
> > > > > > >      - if the user/client is tampering with
> > > > > > > /admin/reassign_partitions directly and does not have the
> > > > > > > "original_replicas" for the topic
> > > > > > >      - if the user/client is using an incorrect version of the
> > > > > > > admin client to submit reassignments.  The Kafka software should
> > > > > > > be upgraded not just on all the brokers in the cluster, but also
> > > > > > > on the host that is used to submit reassignments.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >    - If all the "original_replicas" brokers are not in ISR, and
> > > > > > > some brokers in the "new_replicas" are not offline for the
> > > > > > > topic/partition in the pending reassignments.  In this case, it's
> > > > > > > better to skip this topic's pending reassignment
> > > > > > > cancellation/rollback; otherwise, the partition will become
> > > > > > > offline.  However, if all the brokers in "original_replicas" are
> > > > > > > offline AND all the brokers in "new_replicas" are also offline for
> > > > > > > this topic/partition, then the cluster is in such a bad state that
> > > > > > > the topic/partition is currently offline anyway, and the
> > > > > > > cancellation will roll this topic's pending reassignment back to
> > > > > > > the "original_replicas".
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > What other scenarios can others think of where reassignment
> > > > > > > cancellation should be skipped?  Thanks
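A condensed Scala sketch of the two skip rules above (names are illustrative, not the PR's code, and broker liveness stands in for the ISR/offline checks described):

    // Illustrative only: returns true when the cancellation should be skipped.
    def shouldSkipCancellation(originalReplicas: Seq[Int],
                               newReplicas: Seq[Int],
                               liveBrokers: Set[Int]): Boolean = {
      if (originalReplicas.isEmpty)
        true  // rule 1: no "original_replicas" recorded, nothing to roll back to
      else {
        val originalAlive = originalReplicas.exists(liveBrokers.contains)
        val newAlive = newReplicas.exists(liveBrokers.contains)
        !originalAlive && newAlive  // rule 2: rollback would take the partition offline
      }
    }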
> > > > > > >
> > > > > > >
> > > > > > > Hi All,
> > > > > > >
> > > > > > > Another issue I would like to raise is removing the throttle for
> > > > > > > cancelled reassignments.  Currently the remove-throttle code is in
> > > > > > > the Admin Client.  Since the pending reassignments are
> > > > > > > cancelled/rolled back, the throttle would not be removed by
> > > > > > > running the admin client with the --verify option.  My approach is
> > > > > > > to remove the throttle in the admin client after the reassignment
> > > > > > > cancellation.  But I feel it's better to move this into the
> > > > > > > Controller (for the controller failover scenario).
> > > > > > >
> > > > > > >
> > > > > > > Thanks,
> > > > > > > George
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >    On Monday, February 25, 2019, 11:40:08 AM PST, George Li
> > > > > > > <sq...@yahoo.com> wrote:
> > > > > > >
> > > > > > >  Hi Viktor,
> > > > > > > Thanks for the response.  Good questions!  answers below:
> > > > > > > > A few questions regarding the rollback algorithm:> 1. At
> step 2
> > > how
> > > > > do you elect the leader?
> > > > > > >
> > > > > > > The step 2 code is in "Enable pending reassignments
> > > > > > > cancellation/rollback" by sql888 · Pull Request #6296 ·
> > > > > > > apache/kafka (https://github.com/apache/kafka/pull/6296),
> > > > > > > core/src/main/scala/kafka/controller/KafkaController.scala
> > > > > > > line#622:
> > > > > > >
> > > > > > > rollbackReassignedPartitionLeaderIfRequired(topicPartition,
> > > > > > > reassignedPartitionContext)
> > > > > > >
> > > > > > > During a "pending" reassignment, e.g. (1,2,3) => (4,2,5), normally
> > > > > > > the leader (in this case broker_id 1) will remain the leader until
> > > > > > > all replicas (1,2,3,4,5) are in ISR; only then is the leadership
> > > > > > > switched to 4.  However, in one scenario, if let's say the new
> > > > > > > replica 4 is already caught up in ISR and somehow the original
> > > > > > > leader 1 is down or bounced, 4 could become the new leader.
> > > > > > > rollbackReassignedPartitionLeaderIfRequired() will then do a
> > > > > > > leadership election using
> > > > > > > PreferredReplicaPartitionLeaderElectionStrategy among the brokers
> > > > > > > in OAR (the Original Assigned Replicas set kept in memory).
> > > > > > >
> > > > > > > > 1.1. Would it be always the original leader?
> > > > > > > Not necessarily: if the original preferred leader is down, it can
> > > > > > > be another broker in OAR which is in ISR.
> > > > > > > > 1.2. What if some brokers that are in OAR are down?
> > > > > > > If some brokers in OAR are down, the topic/partition will have URP
> > > > > > > (Under Replicated Partitions).  The client deciding to do the
> > > > > > > reassignment should be clear about what the current state of the
> > > > > > > cluster is: which brokers are down, which are up, and what the
> > > > > > > reassignment is trying to accomplish, e.g. reassignment from down
> > > > > > > brokers to new brokers(?)
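> > > > > > >
> > > > > > > In other words, the rollback leader election is roughly (a sketch,
> > > > > > > not the actual strategy code):
> > > > > > >
> > > > > > >   // preferred-replica order is the OAR order; pick the first OAR
> > > > > > >   // broker that is alive and in the ISR, if any
> > > > > > >   def electLeaderOnRollback(oar: Seq[Int], isr: Set[Int],
> > > > > > >                             liveBrokers: Set[Int]): Option[Int] =
> > > > > > >     oar.find(b => liveBrokers.contains(b) && isr.contains(b))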
> > > > > > >
> > > > > > > > 2. I still have doubts that we need to do the reassignment
> > > > > > > > backwards during rollback. For instance if we decide to cancel
> > > > > > > > the reassignment at step #8 where replicas in OAR - RAR are
> > > > > > > > offline and start the rollback, then how do we make a replica
> > > > > > > > from OAR online again before electing a leader as described in
> > > > > > > > step #2 of the rollback algorithm?
> > > > > > > > 3. Does the algorithm defend against crashes? Is it able to
> > > > > > > > continue after a controller failover?
> > > > > > > > 4. I think it would be a good addition if you could add a few
> > > > > > > > example scenarios for rollback.
> > > > > > >
> > > > > > > Yes. shouldTriggerReassignCancelOnControllerStartup() is the
> > > > > > > integration test to simulate controller failover while cancelling
> > > > > > > pending reassignments.  I will try to add a controller failover
> > > > > > > scenario in a ducktape system test.
> > > > > > > You do raise a good point here. If the cluster is in a very BAD
> > > > > > > shape, e.g. none of the OAR brokers are online but some new broker
> > > > > > > in RAR is in ISR and is the current leader, it makes sense not to
> > > > > > > roll back, to keep that topic/partition online. However, if none
> > > > > > > of the brokers in RAR is online either, it may make sense to roll
> > > > > > > back to OAR and remove it from /admin/reassign_partitions, since
> > > > > > > the cluster state is already so bad that the topic/partition is
> > > > > > > offline anyway, rollback or not.
> > > > > > > I will add a check before cancelling/rolling back a
> > > > > > > topic/partition's pending reassignment: check whether at least one
> > > > > > > broker of OAR is in ISR, so that it can be elected as leader; if
> > > > > > > not, skip that topic/partition's reassignment cancellation and
> > > > > > > raise an exception.
> > > > > > > I will list a few more scenarios for rollback.
> > > > > > > What additional scenarios for rollback can you and others think
> > > > > > > of?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > George
> > > > > > >
> > > > > > >    On Monday, February 25, 2019, 3:53:33 AM PST, Viktor
> > > Somogyi-Vass
> > > > > > > <vi...@gmail.com> wrote:
> > > > > > >
> > > > > > >  Hey George,
> > > > > > > Thanks for the prompt response, it makes sense. I'll try to
> keep
> > > your
> > > > > > > code changes on top of my list and help reviewing that. :)
> > > > > > > Regarding the incremental reassignment: I don't mind discussing it
> > > > > > > either as part of
> > > > > > > this or in a separate conversation but I think a separate one
> > could
> > > > be
> > > > > > > better because both discussions can be long and keeping them
> > > > separated
> > > > > > > would limit the scope and make them more digestible and
> focused.
> > If
> > > > > the
> > > > > > > community decides to discuss it here then I think I'll put
> > KIP-435
> > > on
> > > > > > > hold or rejected and add my ideas here. If the community
> decides
> > to
> > > > > > > discuss it in a different KIP I think it's a good idea to move
> > the
> > > > > > > planned future work part into KIP-435 and rework that. Maybe we
> > can
> > > > > > > co-author it as I think both works could be complementary to
> each
> > > > > > > other. In any case I'd be absolutely interested in what others
> > > think.
> > > > > > > A few questions regarding the rollback algorithm:
> > > > > > > 1. At step 2 how do you elect the leader?
> > > > > > > 1.1. Would it be always the original leader?
> > > > > > > 1.2. What if some brokers that are in OAR are down?
> > > > > > > 2. I still have doubts that we need to do the reassignment
> > > > > > > backwards during rollback. For instance if we decide to cancel the
> > > > > > > reassignment at step #8 where replicas in OAR - RAR are offline
> > > > > > > and start the rollback, then how do we make a replica from OAR
> > > > > > > online again before electing a leader as described in step #2 of
> > > > > > > the rollback algorithm?
> > > > > > > 3. Does the algorithm defend against crashes? Is it able to
> > > > > > > continue after a controller failover?
> > > > > > > 4. I think it would be a good addition if you could add a few
> > > > > > > example scenarios for rollback.
> > > > > > > Best,
> > > > > > > Viktor
> > > > > > >
> > > > > > > On Fri, Feb 22, 2019 at 7:04 PM George Li <
> > > sql_consulting@yahoo.com>
> > > > > wrote:
> > > > > > >
> > > > > > >  Hi Viktor,
> > > > > > >
> > > > > > > Thanks for reading and provide feedbacks on KIP-236.
> > > > > > >
> > > > > > > For reassignments, one can generate a json for the new assignments
> > > > > > > and another json with the "original" assignments for rollback
> > > > > > > purposes.  In a production cluster, from our experience, we need
> > > > > > > to submit the reassignments in batches with throttling/staggering
> > > > > > > to minimize the impact to the cluster.  A large topic/partition
> > > > > > > coupled with throttling can take a pretty long time for the new
> > > > > > > replicas to get into ISR and complete the reassignment in that
> > > > > > > batch.  Currently Kafka does not allow cancelling the pending
> > > > > > > reassignments cleanly while this is happening.  Even if you have
> > > > > > > the json with the "original" assignments for rollback, you have to
> > > > > > > wait for the current reassignment to complete, then submit it as a
> > > > > > > new reassignment to roll back.  If the current reassignment is
> > > > > > > causing impact to production, we would like the reassignments to
> > > > > > > be cancelled/rolled back cleanly/safely/quickly.  This is the main
> > > > > > > goal of KIP-236.
> > > > > > >
> > > > > > > The original KIP-236 by Tom Bentley also proposed incremental
> > > > > > > reassignments, i.e. submitting new reassignments while the current
> > > > > > > reassignments are still going on. This has been scaled back and
> > > > > > > put under the "Planned Future Changes" section of KIP-236, so we
> > > > > > > can expedite this Reassignment Cancellation/Rollback feature out
> > > > > > > to the community.
> > > > > > >
> > > > > > > The main idea of incremental reassignment is to allow submitting
> > > > > > > new reassignments in another ZK node
> > > > > > > /admin/reassign_partitions_queue and merging it with the current
> > > > > > > pending reassignments in /admin/reassign_partitions.  In case the
> > > > > > > same topic/partition is in both ZK nodes, the conflict resolution
> > > > > > > is to cancel the current reassignment in
> > > > > > > /admin/reassign_partitions and move that topic/partition from
> > > > > > > /admin/reassign_partitions_queue in as the new reassignment, as
> > > > > > > sketched below.
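> > > > > > >
> > > > > > > A sketch of that conflict resolution (hypothetical types; the real
> > > > > > > payloads are the JSON in the two znodes):
> > > > > > >
> > > > > > >   case class TP(topic: String, partition: Int)
> > > > > > >
> > > > > > >   // returns the merged pending set, plus the partitions whose
> > > > > > >   // current reassignment must be cancelled first
> > > > > > >   def mergeQueue(active: Map[TP, Seq[Int]],
> > > > > > >                  queued: Map[TP, Seq[Int]])
> > > > > > >       : (Map[TP, Seq[Int]], Set[TP]) = {
> > > > > > >     val conflicts = active.keySet intersect queued.keySet
> > > > > > >     ((active -- conflicts) ++ queued, conflicts)  // queued wins
> > > > > > >   }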
> > > > > > >
> > > > > > > If there is enough interest from the community, this "Planned
> > > Future
> > > > > > > Changes" for incremental reassignments can also be delivered in
> > > > > > > KIP-236, otherwise, another KIP.  The current
> > > > > > > PR:  https://github.com/apache/kafka/pull/6296  only
> > > > > focuses/addresses
> > > > > > > the pending Reassignment Cancellation/Rollback.
> > > > > > >
> > > > > > > Hope this answers your questions.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > George
> > > > > > >
> > > > > > >    On Friday, February 22, 2019, 6:51:14 AM PST, Viktor
> > > Somogyi-Vass
> > > > > > > <vi...@gmail.com> wrote:
> > > > > > >
> > > > > > >  Read through the KIP and I have one comment:
> > > > > > >
> > > > > > > It seems like it is not looking strictly for cancellation but
> > also
> > > > > > > implements rolling back to the original. I think it'd be much
> > > simpler
> > > > > to
> > > > > > > generate a reassignment json on cancellation that contains the
> > > > original
> > > > > > > assignment and start a new partition reassignment completely.
> > This
> > > > way
> > > > > the
> > > > > > > reassignment algorithm (whatever it is) could be reused as a
> > whole.
> > > > > Did you
> > > > > > > consider this or are there any obstacles that prevents doing
> > this?
> > > > > > >
> > > > > > > Regards,
> > > > > > > Viktor
> > > > > > >
> > > > > > > On Fri, Feb 22, 2019 at 2:24 PM Viktor Somogyi-Vass <
> > > > > viktorsomogyi@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > I've published the above mentioned KIP here:
> > > > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-435%3A+Incremental+Partition+Reassignment
> > > > > > > > Will start a discussion about it soon.
> > > > > > > >
> > > > > > > > On Fri, Feb 22, 2019 at 12:45 PM Viktor Somogyi-Vass <
> > > > > > > > viktorsomogyi@gmail.com> wrote:
> > > > > > > >
> > > > > > > >> Hi Folks,
> > > > > > > >>
> > > > > > > >> I also have a pending active work on the incremental
> partition
> > > > > > > >> reassignment stuff here:
> > > > > https://issues.apache.org/jira/browse/KAFKA-6794
> > > > > > > >> I think it would be good to cooperate on this to make both
> > work
> > > > > > > >> compatible with each other.
> > > > > > > >>
> > > > > > > >> I'll write up a KIP about this today so it'll be easier to
> see
> > > how
> > > > > to fit
> > > > > > > >> the two together. Basically in my work I operate on the
> > > > > > > >> /admin/reassign_partitions node in a fully compatible way,
> > > > > > > >> meaning I won't change it, just calculate each increment based
> > > > > > > >> on it and the current state of the ISR set for the partition in
> > > > > > > >> reassignment.
> > > > > > > >> I hope we could collaborate on this.
> > > > > > > >>
> > > > > > > >> Viktor
> > > > > > > >>
> > > > > > > >> On Thu, Feb 21, 2019 at 9:04 PM Harsha <ka...@harsha.io>
> > wrote:
> > > > > > > >>
> > > > > > > >>> Thanks George. LGTM.
> > > > > > > >>> Jun & Tom, Can you please take a look at the updated KIP.
> > > > > > > >>> Thanks,
> > > > > > > >>> Harsha
> > > > > > > >>>
> > > > > > > >>> On Wed, Feb 20, 2019, at 12:18 PM, George Li wrote:
> > > > > > > >>> > Hi,
> > > > > > > >>> >
> > > > > > > >>> > After discussing with Tom, Harsha and I are picking up
> > > KIP-236
> > > > <
> > > > > > > >>>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-236%3A+Interruptible+Partition+Reassignment
> > > > > >.
> > > > > > > >>> The work focuses on safely/cleanly cancelling / rolling back
> > > > > > > >>> pending reassignments in a timely fashion. Pull Request #6296 <
> > > > > > > >>> https://github.com/apache/kafka/pull/6296>. Still working on
> > > > > > > >>> more integration/system tests.
> > > > > > > >>> >
> > > > > > > >>> > Please review and provide feedback/suggestions.
> > > > > > > >>> >
> > > > > > > >>> > Thanks,
> > > > > > > >>> > George
> > > > > > > >>> >
> > > > > > > >>> >
> > > > > > > >>> > On Saturday, December 23, 2017, 0:51:13 GMT, Jun Rao <
> > > > > jun@confluent.io>
> > > > > > > >>> wrote:
> > > > > > > >>> >
> > > > > > > >>> > Hi, Tom,
> > > > > > > >>>
> > > > > > > >>> Thanks for the reply.
> > > > > > > >>>
> > > > > > > >>> 10. That's a good thought. Perhaps it's better to get rid
> of
> > > > > > > >>> /admin/reassignment_requests
> > > > > > > >>> too. The window when a controller is not available is
> small.
> > > So,
> > > > > we can
> > > > > > > >>> just fail the admin client if the controller is not
> > reachable
> > > > > after the
> > > > > > > >>> timeout.
> > > > > > > >>>
> > > > > > > >>> 13. With the changes in 10, the old approach is handled
> > > > > > > >>> through ZK callback and the new approach is through Kafka RPC.
> > > > > > > >>> The ordering between the two is
> > > > > > > >>> kind of arbitrary. Perhaps the ordering can just be based
> on
> > > the
> > > > > order
> > > > > > > >>> that
> > > > > > > >>> the reassignment is added to the controller request queue.
> > From
> > > > > there, we
> > > > > > > >>> can either do the overriding or the prevention.
> > > > > > > >>>
> > > > > > > >>> Jun
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>> On Fri, Dec 22, 2017 at 7:31 AM, Tom Bentley <
> > > > > t.j.bentley@gmail.com>
> > > > > > > >>> wrote:
> > > > > > > >>>
> > > > > > > >>> > Hi Jun,
> > > > > > > >>> >
> > > > > > > >>> > Thanks for responding, my replies are inline:
> > > > > > > >>> >
> > > > > > > >>> > 10. You explanation makes sense. My remaining concern is
> > the
> > > > > > > >>> additional ZK
> > > > > > > >>> > > writes in the proposal. With the proposal, we will need
> > to
> > > do
> > > > > > > >>> following
> > > > > > > >>> > > writes in ZK.
> > > > > > > >>> > >
> > > > > > > >>> > > a. write new assignment in /admin/reassignment_requests
> > > > > > > >>> > >
> > > > > > > >>> > > b. write new assignment and additional metadata in
> > > > > > > >>> > > /admin/reassignments/$topic/$partition
> > > > > > > >>> > >
> > > > > > > >>> > > c. write old + new assignment  in
> /brokers/topics/[topic]
> > > > > > > >>> > >
> > > > > > > >>> > > d. write new assignment in /brokers/topics/[topic]
> > > > > > > >>> > >
> > > > > > > >>> > > e. delete /admin/reassignments/$topic/$partition
> > > > > > > >>> > >
> > > > > > > >>> > > So, there are quite a few ZK writes. I am wondering if
> > it's
> > > > > better to
> > > > > > > >>> > > consolidate the info in
> > > > /admin/reassignments/$topic/$partition
> > > > > into
> > > > > > > >>> > > /brokers/topics/[topic].
> > > > > > > >>> > > For example, we can just add some new JSON fields in
> > > > > > > >>> > > /brokers/topics/[topic]
> > > > > > > >>> > > to remember the new assignment and potentially the
> > original
> > > > > replica
> > > > > > > >>> count
> > > > > > > >>> > > when doing step c. Those fields with then be removed in
> > > step
> > > > > d. That
> > > > > > > >>> way,
> > > > > > > >>> > > we can get rid of step b and e, saving 2 ZK writes per
> > > > > partition.
> > > > > > > >>> > >
> > > > > > > >>> >
> > > > > > > >>> > This seems like a great idea to me.
> > > > > > > >>> >
> > > > > > > >>> > It might also be possible to get rid of the
> > > > > > > >>> /admin/reassignment_requests
> > > > > > > >>> > subtree too. I've not yet published the ideas I have for
> > the
> > > > > > > >>> AdminClient
> > > > > > > >>> > API for reassigning partitions, but given the existence
> of
> > > such
> > > > > an
> > > > > > > >>> API, the
> > > > > > > >>> > route to starting a reassignment would be the
> AdminClient,
> > > and
> > > > > not
> > > > > > > >>> > zookeeper. In that case there is no need for
> > > > > > > >>> /admin/reassignment_requests
> > > > > > > >>> > at all. The only drawback that I can see is that while
> it's
> > > > > currently
> > > > > > > >>> > possible to trigger a reassignment even during a
> controller
> > > > > > > >>> > election/failover that would no longer be the case if all
> > > > > requests had
> > > > > > > >>> to
> > > > > > > >>> > go via the controller.
> > > > > > > >>> >
> > > > > > > >>> >
> > > > > > > >>> > > 11. What you described sounds good. We could
> potentially
> > > > > optimize the
> > > > > > > >>> > > dropped replicas a bit more. Suppose that assignment
> > > [0,1,2]
> > > > > is first
> > > > > > > >>> > > changed to [1,2,3] and then to [2,3,4]. When initiating
> > the
> > > > > second
> > > > > > > >>> > > assignment, we may end up dropping replica 3 and only
> to
> > > > > restart it
> > > > > > > >>> > again.
> > > > > > > >>> > > In this case, we could only drop a replica if it's not
> > > going
> > > > > to be
> > > > > > > >>> added
> > > > > > > >>> > > back again.
> > > > > > > >>> > >
> > > > > > > >>> >
> > > > > > > >>> > I had missed that, thank you! I will update the proposed
> > > > > algorithm to
> > > > > > > >>> > prevent this.
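> > > > > > > >>> >
> > > > > > > >>> > Something like the following rule, I think (a sketch;
> > > > > > > >>> > `current` is the currently assigned replica set, `newest` the
> > > > > > > >>> > latest requested reassignment):
> > > > > > > >>> >
> > > > > > > >>> >   // only drop a replica if neither the newest reassignment
> > > > > > > >>> >   // nor the original assignment (rollback target) needs it
> > > > > > > >>> >   def replicasToDrop(current: Set[Int], original: Set[Int],
> > > > > > > >>> >                      newest: Set[Int]): Set[Int] =
> > > > > > > >>> >     current -- newest -- original
> > > > > > > >>> >
> > > > > > > >>> >   // e.g. [0,1,2] -> [1,2,3] -> [2,3,4]: replica 3 is kept,
> > > > > > > >>> >   // since the newest reassignment will need it again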
> > > > > > > >>> >
> > > > > > > >>> >
> > > > > > > >>> > > 13. Since this is a corner case, we can either prevent
> or
> > > > allow
> > > > > > > >>> > overriding
> > > > > > > >>> > > with old/new mechanisms. To me, it seems that allowing
> is
> > > > > simpler to
> > > > > > > >>> > > implement, the order in /admin/reassignment_requests
> > > > > determines the
> > > > > > > >>> > > ordering the of override, whether that's initiated by
> the
> > > new
> > > > > way or
> > > > > > > >>> the
> > > > > > > >>> > > old way.
> > > > > > > >>> > >
> > > > > > > >>> >
> > > > > > > >>> > That makes sense except for the corner case where:
> > > > > > > >>> >
> > > > > > > >>> > * There is no current controller and
> > > > > > > >>> > * Writes to both the new and old znodes happen
> > > > > > > >>> >
> > > > > > > >>> > On election of the new controller, for those partitions
> > with
> > > > > both a
> > > > > > > >>> > reassignment_request and in /admin/reassign_partitions,
> we
> > > have
> > > > > to
> > > > > > > >>> decide
> > > > > > > >>> > which should win. You could use the modification time,
> > though
> > > > > there are
> > > > > > > >>> > some very unlikely scenarios where that doesn't work
> > > properly,
> > > > > for
> > > > > > > >>> example
> > > > > > > >>> > if both znodes have the same mtime, or the
> > > > > /admin/reassign_partitions
> > > > > > > >>> was
> > > > > > > >>> > updated, but the assignment of the partition wasn't
> > changed,
> > > > > like this:
> > > > > > > >>> >
> > > > > > > >>> > 0. /admin/reassign_partitions has my-topic/42 = [1,2,3]
> > > > > > > >>> > 1. Controller stops watching.
> > > > > > > >>> > 2. Create /admin/reassignment_requests/request_1234 to
> > change
> > > > the
> > > > > > > >>> > reassignment of partition my-topic/42 = [4,5,6]
> > > > > > > >>> > 3. Update /admin/reassign_partitions to add
> > > > your-topic/12=[7,8,9]
> > > > > > > >>> > 4. New controller resumes
> > > > > > > >>> >
> > > > > > > >>> >
> > > > > > > >>> >
> > > > > > > >>> > > Thanks,
> > > > > > > >>> > >
> > > > > > > >>> > > Jun
> > > > > > > >>> > >
> > > > > > > >>> > > On Tue, Dec 19, 2017 at 2:43 AM, Tom Bentley <
> > > > > t.j.bentley@gmail.com>
> > > > > > > >>> > > wrote:
> > > > > > > >>> > >
> > > > > > > >>> > > > Hi Jun,
> > > > > > > >>> > > >
> > > > > > > >>> > > > 10. Another concern of mine is on consistency with
> the
> > > > > current
> > > > > > > >>> pattern.
> > > > > > > >>> > > The
> > > > > > > >>> > > > > current pattern for change notification based on ZK
> > is
> > > > (1)
> > > > > we
> > > > > > > >>> first
> > > > > > > >>> > > write
> > > > > > > >>> > > > > the actual value in the entity path and then write
> > the
> > > > > change
> > > > > > > >>> > > > notification
> > > > > > > >>> > > > > path, and (2)  the change notification path only
> > > includes
> > > > > what
> > > > > > > >>> entity
> > > > > > > >>> > > has
> > > > > > > >>> > > > > changed but not the actual changes. If we want to
> > > follow
> > > > > this
> > > > > > > >>> pattern
> > > > > > > >>> > > for
> > > > > > > >>> > > > > consistency,
> /admin/reassignment_requests/request_xxx
> > > > will
> > > > > only
> > > > > > > >>> have
> > > > > > > >>> > > the
> > > > > > > >>> > > > > partitions whose reassignment have changed, but not
> > the
> > > > > actual
> > > > > > > >>> > > > > reassignment.
> > > > > > > >>> > > > >
> > > > > > > >>> > > >
> > > > > > > >>> > > > Ah, I hadn't understood part (2). That means my
> concern
> > > > about
> > > > > > > >>> > efficiency
> > > > > > > >>> > > > with the current pattern is misplaced. There are
> still
> > > some
> > > > > > > >>> interesting
> > > > > > > >>> > > > differences in semantics, however:
> > > > > > > >>> > > >
> > > > > > > >>> > > > a) The mechanism currently proposed in KIP-236 means
> > that
> > > > the
> > > > > > > >>> > controller
> > > > > > > >>> > > is
> > > > > > > >>> > > > the only writer to /admin/reassignments. This means
> it
> > > can
> > > > > include
> > > > > > > >>> > > > information in these znodes that requesters might not
> > > know,
> > > > > or
> > > > > > > >>> > > information
> > > > > > > >>> > > > that's necessary to perform the reassignment but not
> > > > > necessary to
> > > > > > > >>> > > describe
> > > > > > > >>> > > > the request. While this could be handled using the
> > > current
> > > > > pattern
> > > > > > > >>> it
> > > > > > > >>> > > would
> > > > > > > >>> > > > rely on all  writers to preserve any information
> added
> > by
> > > > the
> > > > > > > >>> > controller,
> > > > > > > >>> > > > which seems complicated and hence fragile.
> > > > > > > >>> > > >
> > > > > > > >>> > > > b) The current pattern for change notification
> doesn't
> > > cope
> > > > > with
> > > > > > > >>> > > competing
> > > > > > > >>> > > > writers to the entity path: If two processes write to
> > the
> > > > > entity
> > > > > > > >>> path
> > > > > > > >>> > > > before the controller can read it (due to
> notification)
> > > > then
> > > > > one
> > > > > > > >>> set of
> > > > > > > >>> > > > updates will be lost.
> > > > > > > >>> > > >
> > > > > > > >>> > > > c) If a single writing process crashes after writing
> to
> > > the
> > > > > entity
> > > > > > > >>> > path,
> > > > > > > >>> > > > but before writing to the notification path then the
> > > write
> > > > > will be
> > > > > > > >>> > lost.
> > > > > > > >>> > > >
> > > > > > > >>> > > > I'm actually using point a) in my WIP (see below).
> > Points
> > > > b)
> > > > > and
> > > > > > > >>> c) are
> > > > > > > >>> > > > obviously edge cases.
> > > > > > > >>> > > >
> > > > > > > >>> > > >
> > > > > > > >>> > > > > 11. Ok. I am not sure that I fully understand the
> > > > > description of
> > > > > > > >>> that
> > > > > > > >>> > > > part.
> > > > > > > >>> > > > > Does "assigned" refer to the current assignment?
> > Could
> > > > you
> > > > > also
> > > > > > > >>> > > describe
> > > > > > > >>> > > > > where the length of the original assignment is
> stored
> > > in
> > > > > ZK?
> > > > > > > >>> > > > >
> > > > > > > >>> > > >
> > > > > > > >>> > > > Sorry if the description is not clear. Yes,
> "assigned"
> > > > > refers to
> > > > > > > >>> the
> > > > > > > >>> > > > currently assigned replicas (taken from the
> > > > > > > >>> > > > ControllerContext.partitionReplicaAssignment). I
> would
> > > > store
> > > > > the
> > > > > > > >>> > length
> > > > > > > >>> > > of
> > > > > > > >>> > > > the original assignment in the
> > > > > > > >>> /admin/reassignments/$topic/$partition
> > > > > > > >>> > > > znode
> > > > > > > >>> > > > (this is where the point (a) above is useful -- the
> > > > requester
> > > > > > > >>> shouldn't
> > > > > > > >>> > > > know that this information is used by the
> controller).
> > > > > > > >>> > > >
> > > > > > > >>> > > > I've updated the KIP to make these points clearer.
> > > > > > > >>> > > >
> > > > > > > >>> > > >
> > > > > > > >>> > > > > 13. Hmm, I am not sure that the cancellation needs
> to
> > > be
> > > > > done
> > > > > > > >>> for the
> > > > > > > >>> > > > whole
> > > > > > > >>> > > > > batch. The reason that I brought this up is for
> > > > > consistency. The
> > > > > > > >>> KIP
> > > > > > > >>> > > > allows
> > > > > > > >>> > > > > override when using the new approach. It just seems
> > > that
> > > > > it's
> > > > > > > >>> simpler
> > > > > > > >>> > > to
> > > > > > > >>> > > > > extend this model when resolving multiple changes
> > > between
> > > > > the
> > > > > > > >>> old and
> > > > > > > >>> > > the
> > > > > > > >>> > > > > new approach.
> > > > > > > >>> > > >
> > > > > > > >>> > > >
> > > > > > > >>> > > > Ah, I think I've been unclear on this point too.
> > > Currently
> > > > > the
> > > > > > > >>> > > > ReassignPartitionsCommand enforces that you can't
> > change
> > > > > > > >>> reassignments,
> > > > > > > >>> > > but
> > > > > > > >>> > > > this doesn't stop other ZK clients making changes to
> > > > > > > >>> > > > /admin/reassign_partitions directly and I believe
> some
> > > > Kafka
> > > > > users
> > > > > > > >>> do
> > > > > > > >>> > > > indeed change reassignments in-flight by writing to
> > > > > > > >>> > > > /admin/reassign_partitions. What I'm proposing
> doesn't
> > > > break
> > > > > that
> > > > > > > >>> at
> > > > > > > >>> > all.
> > > > > > > >>> > > > The semantic I've implemented is only that the
> > controller
> > > > > only
> > > > > > > >>> refuses
> > > > > > > >>> > a
> > > > > > > >>> > > > reassignment change if there is already one in-flight
> > > (i.e.
> > > > > in
> > > > > > > >>> > > > /admin/reassignments/$topic/$partition) **via the
> other
> > > > > > > >>> mechansim**.
> > > > > > > >>> > So
> > > > > > > >>> > > if
> > > > > > > >>> > > > you're using /admin/reassign_partitions and you
> change
> > or
> > > > > cancel
> > > > > > > >>> part
> > > > > > > >>> > of
> > > > > > > >>> > > it
> > > > > > > >>> > > > via /admin/reassign_partitions, that's OK. Likewise
> if
> > > > > you're using
> > > > > > > >>> > > > /admin/reassignment_request/request_xxx and you
> change
> > or
> > > > > cancel
> > > > > > > >>> part
> > > > > > > >>> > of
> > > > > > > >>> > > > it
> > > > > > > >>> > > > via another /admin/reassignment_request/request_xxx,
> > > > > > > >>> > > > that's OK.  What you
> > > > > > > >>> > > > can't do is change a request that was started via
> > > > > > > >>> > > > /admin/reassign_partitions via
> > > /admin/reassignment_request/
> > > > > > > >>> > request_xxx,
> > > > > > > >>> > > or
> > > > > > > >>> > > > vice versa.
> > > > > > > >>> > > >
> > > > > > > >>> > > > What I was thinking of when I replied is the case
> > where,
> > > on
> > > > > > > >>> controller
> > > > > > > >>> > > > failover, /admin/reassign_partitions has been changed
> > and
> > > > > > > >>> > > > /admin/reassignment_request/request_xxx created (in
> the
> > > > > period when
> > > > > > > >>> > the
> > > > > > > >>> > > > new
> > > > > > > >>> > > > controller was being elected, for example) with a
> > common
> > > > > > > >>> partition. In
> > > > > > > >>> > > this
> > > > > > > >>> > > > case we should apply a consistent rule to that used
> > when
> > > > the
> > > > > > > >>> > notification
> > > > > > > >>> > > > happen in real time. Your suggestion to use the
> > > > modification
> > > > > time
> > > > > > > >>> of
> > > > > > > >>> > the
> > > > > > > >>> > > > znode would work here too (except in the edge case
> > where
> > > ZK
> > > > > writes
> > > > > > > >>> to
> > > > > > > >>> > > both
> > > > > > > >>> > > > znodes happen within the same clock tick on the ZK
> > > server,
> > > > > so the
> > > > > > > >>> > mtimes
> > > > > > > >>> > > > are the same).
> > > > > > > >>> > > >
> > > > > > > >>> > > > Let me know if you think this is the right semantic
> and
> > > > I'll
> > > > > try to
> > > > > > > >>> > > clarify
> > > > > > > >>> > > > the KIP.
> > > > > > > >>> > > >
> > > > > > > >>> > > > Many thanks,
> > > > > > > >>> > > >
> > > > > > > >>> > > > Tom
> > > > > > > >>> > > >
> > > > > > > >>> > > > On 18 December 2017 at 18:12, Jun Rao <
> > jun@confluent.io>
> > > > > wrote:
> > > > > > > >>> > > >
> > > > > > > >>> > > > > Hi, Tom,
> > > > > > > >>> > > > >
> > > > > > > >>> > > > > Thanks for the reply. A few more followup comments
> > > below.
> > > > > > > >>> > > > >
> > > > > > > >>> > > > > 10. Another concern of mine is on consistency with
> > the
> > > > > current
> > > > > > > >>> > pattern.
> > > > > > > >>> > > > The
> > > > > > > >>> > > > > current pattern for change notification based on ZK
> > is
> > > > (1)
> > > > > we
> > > > > > > >>> first
> > > > > > > >>> > > write
> > > > > > > >>> > > > > the actual value in the entity path and then write
> > the
> > > > > change
> > > > > > > >>> > > > notification
> > > > > > > >>> > > > > path, and (2)  the change notification path only
> > > includes
> > > > > what
> > > > > > > >>> entity
> > > > > > > >>> > > has
> > > > > > > >>> > > > > changed but not the actual changes. If we want to
> > > follow
> > > > > this
> > > > > > > >>> pattern
> > > > > > > >>> > > for
> > > > > > > >>> > > > > consistency,
> /admin/reassignment_requests/request_xxx
> > > > will
> > > > > only
> > > > > > > >>> have
> > > > > > > >>> > > the
> > > > > > > >>> > > > > partitions whose reassignment have changed, but not
> > the
> > > > > actual
> > > > > > > >>> > > > > reassignment.
> > > > > > > >>> > > > >
> > > > > > > >>> > > > > 11. Ok. I am not sure that I fully understand the
> > > > > description of
> > > > > > > >>> that
> > > > > > > >>> > > > part.
> > > > > > > >>> > > > > Does "assigned" refer to the current assignment?
> > Could
> > > > you
> > > > > also
> > > > > > > >>> > > describe
> > > > > > > >>> > > > > where the length of the original assignment is
> stored
> > > in
> > > > > ZK?
> > > > > > > >>> > > > >
> > > > > > > >>> > > > > 13. Hmm, I am not sure that the cancellation needs
> to
> > > be
> > > > > done
> > > > > > > >>> for the
> > > > > > > >>> > > > whole
> > > > > > > >>> > > > > batch. The reason that I brought this up is for
> > > > > consistency. The
> > > > > > > >>> KIP
> > > > > > > >>> > > > allows
> > > > > > > >>> > > > > override when using the new approach. It just seems
> > > that
> > > > > it's
> > > > > > > >>> simpler
> > > > > > > >>> > > to
> > > > > > > >>> > > > > extend this model when resolving multiple changes
> > > between
> > > > > the
> > > > > > > >>> old and
> > > > > > > >>> > > the
> > > > > > > >>> > > > > new approach.
> > > > > > > >>> > > > >
> > > > > > > >>> > > > > Jun
> > > > > > > >>> > > > >
> > > > > > > >>> > > > >
> > > > > > > >>> > > > >
> > > > > > > >>> > > > > On Mon, Dec 18, 2017 at 2:45 AM, Tom Bentley <
> > > > > > > >>> t.j.bentley@gmail.com>
> > > > > > > >>> > > > > wrote:
> > > > > > > >>> > > > >
> > > > > > > >>> > > > > > Hi Jun,
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > > Thanks for replying, some answers below:
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > > > 10. The proposal now stores the reassignment
> for
> > > all
> > > > > > > >>> partitions
> > > > > > > >>> > in
> > > > > > > >>> > > > > > > /admin/reassignment_requests/request_xxx. If
> the
> > > > > number of
> > > > > > > >>> > > > reassigned
> > > > > > > >>> > > > > > > partitions is larger, the ZK write may hit the
> > > > default
> > > > > 1MB
> > > > > > > >>> limit
> > > > > > > >>> > > and
> > > > > > > >>> > > > > > fail.
> > > > > > > >>> > > > > > > An alternative approach is to have the
> > reassignment
> > > > > requester
> > > > > > > >>> > first
> > > > > > > >>> > > > > write
> > > > > > > >>> > > > > > > the new assignment for each partition under
> > > > > > > >>> > > > > > > /admin/reassignments/$topic/$partition and then
> > > write
> > > > > > > >>> > > > > > > /admin/reassignment_requests/request_xxx with
> an
> > > > empty
> > > > > value.
> > > > > > > >>> > The
> > > > > > > >>> > > > > > > controller can then read all values under
> > > > > > > >>> /admin/reassignments.
> > > > > > > >>> > > > > > >
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > > You're right that reassigning enough partitions
> > would
> > > > > hit the
> > > > > > > >>> 1MB
> > > > > > > >>> > > > limit,
> > > > > > > >>> > > > > > but I don't think this would be a problem in
> > practice
> > > > > because
> > > > > > > >>> it
> > > > > > > >>> > > would
> > > > > > > >>> > > > be
> > > > > > > >>> > > > > > trivial to split the partitions into several
> > requests
> > > > > (i.e.
> > > > > > > >>> > multiple
> > > > > > > >>> > > > > > request_xxx).
> > > > > > > >>> > > > > > I don't think the non-atomicity this would imply
> > is a
> > > > > problem.
> > > > > > > >>> By
> > > > > > > >>> > > > writing
> > > > > > > >>> > > > > > the partitions whose
> > > > > /admin/reassignments/$topic/$partition has
> > > > > > > >>> > been
> > > > > > > >>> > > > > > created or changed it makes it much more
> efficient
> > to
> > > > > know
> > > > > > > >>> which of
> > > > > > > >>> > > > those
> > > > > > > >>> > > > > > znodes we need to read. If I understand your
> > > > suggestion,
> > > > > you
> > > > > > > >>> would
> > > > > > > >>> > > have
> > > > > > > >>> > > > > to
> > > > > > > >>> > > > > > read every node under /admin/reassignments to
> > figure
> > > > out
> > > > > which
> > > > > > > >>> had
> > > > > > > >>> > > > > changed.
> > > > > > > >>> > > > > >
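> > > > > > > >>> > > > > > (On the splitting into multiple request_xxx above, a
> > > > > > > >>> > > > > > sketch, with writeRequestZnode as a hypothetical
> > > > > > > >>> > > > > > helper that creates one request_xxx znode per batch:)
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > >   // keep each request_xxx comfortably under the ~1MB
> > > > > > > >>> > > > > >   // default znode size limit
> > > > > > > >>> > > > > >   def submitInBatches(
> > > > > > > >>> > > > > >       assignments: Map[String, Seq[Int]],
> > > > > > > >>> > > > > >       batchSize: Int,
> > > > > > > >>> > > > > >       writeRequestZnode: Map[String, Seq[Int]] => Unit)
> > > > > > > >>> > > > > >       : Unit =
> > > > > > > >>> > > > > >     assignments.grouped(batchSize)
> > > > > > > >>> > > > > >                .foreach(writeRequestZnode)
> > > > > > > >>> > > > > >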
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > > > 11. The improvement you suggested in
> > > > > > > >>> onPartitionReassignment()
> > > > > > > >>> > > sounds
> > > > > > > >>> > > > > > good
> > > > > > > >>> > > > > > > at the high level. The computation of those
> > dropped
> > > > > > > >>> partitions
> > > > > > > >>> > > seems
> > > > > > > >>> > > > a
> > > > > > > >>> > > > > > bit
> > > > > > > >>> > > > > > > complicated. Perhaps a simple approach is to
> drop
> > > the
> > > > > > > >>> replicas
> > > > > > > >>> > not
> > > > > > > >>> > > in
> > > > > > > >>> > > > > the
> > > > > > > >>> > > > > > > original assignment and newest reassignment?
> > > > > > > >>> > > > > > >
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > > This was what I came up with originally too, but
> > > when I
> > > > > looked
> > > > > > > >>> into
> > > > > > > >>> > > > > > implementing it I found a couple of things which
> > made
> > > > me
> > > > > > > >>> reconsider
> > > > > > > >>> > > it.
> > > > > > > >>> > > > > > Consider the reassignments [0,1] -> [2,3] ->
> [3,4].
> > > In
> > > > > words:
> > > > > > > >>> we
> > > > > > > >>> > > start
> > > > > > > >>> > > > > > reassigning to [2,3], but then change our minds
> > > about 2
> > > > > and
> > > > > > > >>> switch
> > > > > > > >>> > it
> > > > > > > >>> > > > to
> > > > > > > >>> > > > > 4
> > > > > > > >>> > > > > > (maybe we've figured out a better overall
> balance).
> > > At
> > > > > that
> > > > > > > >>> point
> > > > > > > >>> > it
> > > > > > > >>> > > is
> > > > > > > >>> > > > > > perfectly possible that broker 2 is in-sync and
> > > broker
> > > > 1
> > > > > is not
> > > > > > > >>> > > > in-sync.
> > > > > > > >>> > > > > It
> > > > > > > >>> > > > > > seems silly to drop broker 2 in favour of broker
> 1:
> > > > We're
> > > > > > > >>> > needlessly
> > > > > > > >>> > > > > giving
> > > > > > > >>> > > > > > the cluster more work to do.
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > > The second thing that made me reconsider was in
> > that
> > > > same
> > > > > > > >>> scenario
> > > > > > > >>> > > it's
> > > > > > > >>> > > > > > even possible that broker 2 is the leader of the
> > > > > partition.
> > > > > > > >>> > Obviously
> > > > > > > >>> > > > we
> > > > > > > >>> > > > > > can elect a new leader before dropping it, but
> not
> > > > > without
> > > > > > > >>> causing
> > > > > > > >>> > > > > > disruption to producers and consumers.
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > > By accepting a little more complexity in choosing
> > > which
> > > > > > > >>> brokers to
> > > > > > > >>> > > drop
> > > > > > > >>> > > > > we
> > > > > > > >>> > > > > > make the dropping simpler (no need for leader
> > > election)
> > > > > and
> > > > > > > >>> ensure
> > > > > > > >>> > > the
> > > > > > > >>> > > > > > cluster has less work to do.
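> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > > The extra complexity could be as small as ordering
> > > > > > > >>> > > > > > the drop candidates, e.g. (a sketch of the heuristic,
> > > > > > > >>> > > > > > not the final algorithm):
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > >   // cheapest drops first: out-of-sync replicas, then
> > > > > > > >>> > > > > >   // in-sync ones, and the current leader only last
> > > > > > > >>> > > > > >   def orderedDrops(current: Seq[Int], target: Seq[Int],
> > > > > > > >>> > > > > >                    isr: Set[Int], leader: Int): Seq[Int] =
> > > > > > > >>> > > > > >     current.filterNot(target.contains).sortBy { r =>
> > > > > > > >>> > > > > >       if (r == leader) 2
> > > > > > > >>> > > > > >       else if (isr.contains(r)) 1
> > > > > > > >>> > > > > >       else 0
> > > > > > > >>> > > > > >     }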
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > > > 12. You brought up the need of remembering the
> > > > original
> > > > > > > >>> > assignment.
> > > > > > > >>> > > > > This
> > > > > > > >>> > > > > > > will be lost if the assignment is changed
> > multiple
> > > > > times if
> > > > > > > >>> we
> > > > > > > >>> > > follow
> > > > > > > >>> > > > > the
> > > > > > > >>> > > > > > > approach described in 10. One way is to store
> the
> > > > > original
> > > > > > > >>> > > assignment
> > > > > > > >>> > > > > in
> > > > > > > >>> > > > > > > /brokers/topics/[topic] as the following. When
> > the
> > > > > final
> > > > > > > >>> > > reassignment
> > > > > > > >>> > > > > > > completes, we can remove the original field.
> > > > > > > >>> > > > > > > {
> > > > > > > >>> > > > > > >  "version": 1,
> > > > > > > >>> > > > > > >  "partitions": {"0": [0, 1, 3] },
> > > > > > > >>> > > > > > >  "originals": {"0": [0, 1, 2] }
> > > > > > > >>> > > > > > > }
> > > > > > > >>> > > > > > >
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > > While I was implementing my first version of
> > > > > > > >>> > > onPartitionReassignment(),
> > > > > > > >>> > > > > > where I preferred the originals, I was storing
> the
> > > > > originals
> > > > > > > >>> in the
> > > > > > > >>> > > > > > /admin/reassignments/$topic/$partition znodes.
> > Since
> > > we
> > > > > will
> > > > > > > >>> > remove
> > > > > > > >>> > > > that
> > > > > > > >>> > > > > > znode at the end of reassignment anyway, I would
> > > > suggest
> > > > > this
> > > > > > > >>> is a
> > > > > > > >>> > > > better
> > > > > > > >>> > > > > > place to store that data (if it's necessary to do
> > > so),
> > > > > so that
> > > > > > > >>> we
> > > > > > > >>> > can
> > > > > > > >>> > > > > avoid
> > > > > > > >>> > > > > > another ZK round trip.
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > > > 13. For resolving the conflict between
> > > > > > > >>> /admin/reassign_partitions
> > > > > > > >>> > > and
> > > > > > > >>> > > > > > > /admin/reassignments/$topic/$partition, perhaps
> > > it's
> > > > > more
> > > > > > > >>> > natural
> > > > > > > >>> > > to
> > > > > > > >>> > > > > > just
> > > > > > > >>> > > > > > > let the assignment with a newer timestamp to
> > > override
> > > > > the
> > > > > > > >>> older
> > > > > > > >>> > > one?
> > > > > > > >>> > > > > > >
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > > That would work but with slightly different
> > semantics
> > > > to
> > > > > what I
> > > > > > > >>> > have:
> > > > > > > >>> > > > > Since
> > > > > > > >>> > > > > > /admin/reassign_partitions contains multiple
> > > > partitions,
> > > > > using
> > > > > > > >>> the
> > > > > > > >>> > > > > > timestamp means the whole batch wins or loses.
> By
> > > > > tracking how
> > > > > > > >>> > each
> > > > > > > >>> > > > > > request was made we can be more fine-grained. I'm
> > > > > > > >>> > > > > > happy to use the
> > > > > > > >>> > > > modification
> > > > > > > >>> > > > > > time if such granularity is not required.
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > > > 14. Implementation wise, currently, we
> register a
> > > > > watcher of
> > > > > > > >>> the
> > > > > > > >>> > > isr
> > > > > > > >>> > > > > path
> > > > > > > >>> > > > > > > of each partition being reassigned. This has
> the
> > > > > potential
> > > > > > > >>> issue
> > > > > > > >>> > of
> > > > > > > >>> > > > > > > registering many listeners. An improvement
> could
> > be
> > > > > just
> > > > > > > >>> > > piggybacking
> > > > > > > >>> > > > > on
> > > > > > > >>> > > > > > > the existing IsrChangeNotificationHandler,
> which
> > > only
> > > > > > > >>> watches a
> > > > > > > >>> > > > single
> > > > > > > >>> > > > > ZK
> > > > > > > >>> > > > > > > path and is triggered on a batch of isr
> changes.
> > > This
> > > > > is
> > > > > > > >>> kind of
> > > > > > > >>> > > > > > orthogonal
> > > > > > > >>> > > > > > > to the KIP. However, if we are touching the
> > > > > reassignment
> > > > > > > >>> logic,
> > > > > > > >>> > it
> > > > > > > >>> > > > may
> > > > > > > >>> > > > > be
> > > > > > > >>> > > > > > > worth considering.
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > > Let me look into that.
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > > Thanks,
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > > Tom
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > > On 16 December 2017 at 02:19, Jun Rao <
> > > > jun@confluent.io>
> > > > > > > >>> wrote:
> > > > > > > >>> > > > > >
> > > > > > > >>> > > > > > > Hi, Tom,
> > > > > > > >>> > > > > > >
> > > > > > > >>> > > > > > > Thanks for the updated KIP. A few more comments
> > > > below.
> > > > > > > >>> > > > > > >
> > > > > > > >>> > > > > > > 10. The proposal now stores the reassignment
> for
> > > all
> > > > > > > >>> partitions
> > > > > > > >>> > in
> > > > > > > >>> > > > > > > /admin/reassignment_requests/request_xxx. If
> the
> > > > > number of
> > > > > > > >>> > > > reassigned
> > > > > > > >>> > > > > > > partitions is larger, the ZK write may hit the
> > > > default
> > > > > 1MB
> > > > > > > >>> limit
> > > > > > > >>> > > and
> > > > > > > >>> > > > > > fail.
> > > > > > > >>> > > > > > > An alternative approach is to have the
> > reassignment
> > > > > requester
> > > > > > > >>> > first
> > > > > > > >>> > > > > write
> > > > > > > >>> > > > > > > the new assignment for each partition under
> > > > > > > >>> > > > > > > /admin/reassignments/$topic/$partition and then
> > > write
> > > > > > > >>> > > > > > > /admin/reassignment_requests/request_xxx with
> an
> > > > empty
> > > > > value.
> > > > > > > >>> > The
> > > > > > > >>> > > > > > > controller can then read all values under
> > > > > > > >>> /admin/reassignments.
> > > > > > > >>> > > > > > >
> > > > > > > >>> > > > > > > 11. The improvement you suggested in
> > > > > > > >>> onPartitionReassignment()
> > > > > > > >>> > > sounds
> > > > > > > >>> > > > > > good
> > > > > > > >>> > > > > > > at the high level. The computation of those
> > dropped
> > > > > > > >>> partitions
> > > > > > > >>> > > seems
> > > > > > > >>> > > > a
> > > > > > > >>> > > > > > bit
> > > > > > > >>> > > > > > > complicated. Perhaps a simple approach is to
> drop
> > > the
> > > > > > > >>> replicas
> > > > > > > >>> > not
> > > > > > > >>> > > in
> > > > > > > >>> > > > > the
> > > > > > > >>> > > > > > > original assignment and newest reassignment?
> > > > > > > >>> > > > > > >
> > > > > > > >>> > > > > > > 12. You brought up the need of remembering the
> > > > original
> > > > > > > >>> > assignment.
> > > > > > > >>> > > > > This
> > > > > > > >>> > > > > > > will be lost if the assignment is changed
> > multiple
> > > > > times if
> > > > > > > >>> we
> > > > > > > >>> > > follow
> > > > > > > >>> > > > > the
> > > > > > > >>> > > > > > > approach described in 10. One way is to store
> the
> > > > > original
> > > > > > > >>> > > assignment
> > > > > > > >>> > > > > in
> > > > > > > >>> > > > > > > /brokers/topics/[topic] as the following. When
> > the
> > > > > final
> > > > > > > >>> > > reassignment
> > > > > > > >>> > > > > > > completes, we can remove the original field.
> > > > > > > >>> > > > > > > {
> > > > > > > >>> > > > > > >  "version": 1,
> > > > > > > >>> > > > > > >  "partitions": {"0": [0, 1, 3] },
> > > > > > > >>> > > > > > >  "originals": {"0": [0, 1, 2] }
> > > > > > > >>> > > > > > > }
> > > > > > > >>> > > > > > >
> > > > > > > >>> > > > > > > 13. For resolving the conflict between
> > > > > > > >>> /admin/reassign_partitions
> > > > > > > >>> > > and
> > > > > > > >>> > > > > > > /admin/reassignments/$topic/$partition, perhaps
> > > it's
> > > > > more
> > > > > > > >>> > natural
> > > > > > > >>> > > to
> > > > > > > >>> > > > > > just
> > > > > > > >>> > > > > > > let the assignment with a newer timestamp to
> > > override
> > > > > the
> > > > > > > >>> older
> > > > > > > >>> > > one?
> > > > > > > >>> > > > > > >
> > > > > > > >>> > > > > > > 14. Implementation wise, currently, we
> register a
> > > > > watcher of
> > > > > > > >>> the
> > > > > > > >>> > > isr
> > > > > > > >>> > > > > path
> > > > > > > >>> > > > > > > of each partition being reassigned. This has
> the
> > > > > potential
> > > > > > > >>> issue
> > > > > > > >>> > of
> > > > > > > >>> > > > > > > registering many listeners. An improvement
> could
> > be
> > > > > just
> > > > > > > >>> > > piggybacking
> > > > > > > >>> > > > > on
> > > > > > > >>> > > > > > > the existing IsrChangeNotificationHandler,
> which
> > > only
> > > > > > > >>> watches a
> > > > > > > >>> > > > single
> > > > > > > >>> > > > > ZK
> > > > > > > >>> > > > > > > path and is triggered on a batch of isr
> changes.
> > > This
> > > > > is
> > > > > > > >>> kind of
> > > > > > > >>> > > > > > orthogonal
> > > > > > > >>> > > > > > > to the KIP. However, if we are touching the
> > > > > reassignment
> > > > > > > >>> logic,
> > > > > > > >>> > it
> > > > > > > >>> > > > may
> > > > > > > >>> > > > > be
> > > > > > > >>> > > > > > > worth considering.
> > > > > > > >>> > > > > > >
> > > > > > > >>> > > > > > > Thanks,
> > > > > > > >>> > > > > > >
> > > > > > > >>> > > > > > > Jun
> > > > > > > >>> > > > > > >
> > > > > > > >>> > > > > > > On Fri, Dec 15, 2017 at 10:17 AM, Tom Bentley <
> > > > > > > >>> > > t.j.bentley@gmail.com
> > > > > > > >>> > > > >
> > > > > > > >>> > > > > > > wrote:
> > > > > > > >>> > > > > > >
> > > > > > > >>> > > > > > > > Just wanted to mention that I've started
> > KIP-240,
> > > > > which
> > > > > > > >>> builds
> > > > > > > >>> > on
> > > > > > > >>> > > > top
> > > > > > > >>> > > > > > of
> > > > > > > >>> > > > > > > > this one to provide an AdminClient API for
> > > listing
> > > > > and
> > > > > > > >>> > describing
> > > > > > > >>> > > > > > > > reassignments.
> > > > > > > >>> > > > > > > >
> > > > > > > >>> > > > > > > > On 15 December 2017 at 14:34, Tom Bentley <
> > > > > > > >>> > t.j.bentley@gmail.com
> > > > > > > >>> > > >
> > > > > > > >>> > > > > > wrote:
> > > > > > > >>> > > > > > > >
> > > > > > > >>> > > > > > > > > > Should we seek to improve this algorithm
> in
> > > > this
> > > > > KIP,
> > > > > > > >>> or
> > > > > > > >>> > > leave
> > > > > > > >>> > > > > that
> > > > > > > >>> > > > > > > as
> > > > > > > >>> > > > > > > > > a later optimisation?
> > > > > > > >>> > > > > > > > >
> > > > > > > >>> > > > > > > > > I've updated the KIP with a proposed
> > algorithm.
> > > > > > > >>> > > > > > > > >
> > > > > > > >>> > > > > > > > >
> > > > > > > >>> > > > > > > > >
> > > > > > > >>> > > > > > > > >
> > > > > > > >>> > > > > > > > >
> > > > > > > >>> > > > > > > > >
> > > > > > > >>> > > > > > > > > On 14 December 2017 at 09:57, Tom Bentley <
> > > > > > > >>> > > t.j.bentley@gmail.com
> > > > > > > >>> > > > >
> > > > > > > >>> > > > > > > wrote:
> > > > > > > >>> > > > > > > > >
> > > > > > > >>> > > > > > > > >> Thanks Ted, now fixed.
> > > > > > > >>> > > > > > > > >>
> > > > > > > >>> > > > > > > > >> On 13 December 2017 at 18:38, Ted Yu <
> > > > > > > >>> yuzhihong@gmail.com>
> > > > > > > >>> > > > wrote:
> > > > > > > >>> > > > > > > > >>
> > > > > > > >>> > > > > > > > >>> Tom:
> > > > > > > >>> > > > > > > > >>> bq. create a znode
> > > > > > > >>> /admin/reassignments/$topic-$partition
> > > > > > > >>> > > > > > > > >>>
> > > > > > > >>> > > > > > > > >>> Looks like the tree structure above
> should
> > > be:
> > > > > > > >>> > > > > > > > >>>
> > > > > > > >>> > > > > > > > >>> /admin/reassignments/$topic/$partition
> > > > > > > >>> > > > > > > > >>>
> > > > > > > >>> > > > > > > > >>> bq. The controller removes
> > > > > /admin/reassignment/$topic/$
> > > > > > > >>> > > > partition
> > > > > > > >>> > > > > > > > >>>
> > > > > > > >>> > > > > > > > >>> Note the lack of 's' for reassignment. It
> > > would
> > > > > be
> > > > > > > >>> good to
> > > > > > > >>> > > make
> > > > > > > >>> > > > > > > > zookeeper
> > > > > > > >>> > > > > > > > >>> paths consistent.
> > > > > > > >>> > > > > > > > >>>
> > > > > > > >>> > > > > > > > >>> Thanks
> > > > > > > >>> > > > > > > > >>>
> > > > > > > >>> > > > > > > > >>> On Wed, Dec 13, 2017 at 9:49 AM, Tom
> > Bentley
> > > <
> > > > > > > >>> > > > > > t.j.bentley@gmail.com>
> > > > > > > >>> > > > > > > > >>> wrote:
> > > > > > > >>> > > > > > > > >>>
> > > > > > > >>> > > > > > > > >>> > Hi Jun and Ted,
> > > > > > > >>> > > > > > > > >>> >
> > > > > > > >>> > > > > > > > >>> > Jun, you're right that needing one
> > watcher
> > > > per
> > > > > > > >>> reassigned
> > > > > > > >>> > > > > > partition
> > > > > > > >>> > > > > > > > >>> > presents a scalability problem, and
> > using a
> > > > > separate
> > > > > > > >>> > > > > notification
> > > > > > > >>> > > > > > > > path
> > > > > > > >>> > > > > > > > >>> > solves that. I also agree that it makes
> > > sense
> > > > > to
> > > > > > > >>> prevent
> > > > > > > >>> > > > users
> > > > > > > >>> > > > > > from
> > > > > > > >>> > > > > > > > >>> using
> > > > > > > >>> > > > > > > > >>> > both methods on the same reassignment.
> > > > > > > >>> > > > > > > > >>> >
> > > > > > > >>> > > > > > > > >>> > Ted, naming the reassignments like
> > > mytopic-42
> > > > > was
> > > > > > > >>> simpler
> > > > > > > >>> > > > > while I
> > > > > > > >>> > > > > > > was
> > > > > > > >>> > > > > > > > >>> > proposing a watcher-per-reassignment
> (I'd
> > > > have
> > > > > > > >>> needed a
> > > > > > > >>> > > child
> > > > > > > >>> > > > > > > watcher
> > > > > > > >>> > > > > > > > >>> on
> > > > > > > >>> > > > > > > > >>> > /admin/reassignments and also on
> > > > > > > >>> > > > /admin/reassignments/mytopic).
> > > > > > > >>> > > > > > > Using
> > > > > > > >>> > > > > > > > >>> the
> > > > > > > >>> > > > > > > > >>> > separate notification path means I
> don't
> > > need
> > > > > any
> > > > > > > >>> > watchers
> > > > > > > >>> > > in
> > > > > > > >>> > > > > the
> > > > > > > >>> > > > > > > > >>> > /admin/reassignments subtree, so
> > switching
> > > to
> > > > > > > >>> > > > > > > > >>> /admin/reassignments/mytopic/
> > > > > > > >>> > > > > > > > >>> > 42
> > > > > > > >>> > > > > > > > >>> > would work, and avoid
> > /admin/reassignments
> > > > > having a
> > > > > > > >>> very
> > > > > > > >>> > > > large
> > > > > > > >>> > > > > > > number
> > > > > > > >>> > > > > > > > >>> of
> > > > > > > >>> > > > > > > > >>> > child nodes. On the other hand it also
> > > means
> > > > I
> > > > > have
> > > > > > > >>> to
> > > > > > > >>> > > create
> > > > > > > >>> > > > > and
> > > > > > > >>> > > > > > > > >>> delete
> > > > > > > >>> > > > > > > > >>> > the topic nodes (e.g.
> > > > > /admin/reassignments/mytopic),
> > > > > > > >>> > which
> > > > > > > >>> > > > > incurs
> > > > > > > >>> > > > > > > the
> > > > > > > >>> > > > > > > > >>> cost
> > > > > > > >>> > > > > > > > >>> > of extra round trips to zookeeper. I
> > > suppose
> > > > > that
> > > > > > > >>> since
> > > > > > > >>> > > > > > > reassignment
> > > > > > > >>> > > > > > > > is
> > > > > > > >>> > > > > > > > >>> > generally a slow process it makes
> little
> > > > > difference
> > > > > > > >>> if we
> > > > > > > >>> > > > > > increase
> > > > > > > >>> > > > > > > > the
> > > > > > > >>> > > > > > > > >>> > latency of the interactions with
> > zookeeper.
> > > > > > > >>> > > > > > > > >>> >
> > > > > > > >>> > > > > > > > >>> > I have updated the KIP with these
> > > > > improvements, and a
> > > > > > > >>> > more
> > > > > > > >>> > > > > > detailed
> > > > > > > >>> > > > > > > > >>> > description of exactly how we would
> > manage
> > > > > these
> > > > > > > >>> znodes.
> > > > > > > >>> > > > > > > > >>> >
> > > > > > > >>> > > > > > > > >>> > Reading the algorithm in
> KafkaController.
> > > > > > > >>> > > > > > > onPartitionReassignment(),
> > > > > > > >>> > > > > > > > it
> > > > > > > >>> > > > > > > > >>> > seems that it would be suboptimal for
> > > > changing
> > > > > > > >>> > > reassignments
> > > > > > > >>> > > > > > > > in-flight.
> > > > > > > >>> > > > > > > > >>> > Consider an initial assignment of
> [1,2],
> > > > > reassigned
> > > > > > > >>> to
> > > > > > > >>> > > [2,3]
> > > > > > > >>> > > > > and
> > > > > > > >>> > > > > > > then
> > > > > > > >>> > > > > > > > >>> > changed to [2,4]. Broker 3 will remain
> in
> > > the
> > > > > > > >>> assigned
> > > > > > > >>> > > > replicas
> > > > > > > >>> > > > > > > until
> > > > > > > >>> > > > > > > > >>> > broker 4 is in sync, even though 3
> wasn't
> > > > > actually
> > > > > > > >>> one of
> > > > > > > >>> > > the
> > > > > > > >>> > > > > > > > original
> > > > > > > >>> > > > > > > > >>> > assigned replicas and is no longer a
> new
> > > > > assigned
> > > > > > > >>> > replica.
> > > > > > > >>> > > I
> > > > > > > >>> > > > > > think
> > > > > > > >>> > > > > > > > this
> > > > > > > >>> > > > > > > > >>> > also affects the case where the
> > > reassignment
> > > > is
> > > > > > > >>> cancelled
> > > > > > > >>> > > > > > > > >>> > ([1,2]->[2,3]->[1,2]): We again have to
> > > wait
> > > > > for 3 to
> > > > > > > >>> > catch
> > > > > > > >>> > > > up,
> > > > > > > >>> > > > > > > even
> > > > > > > >>> > > > > > > > >>> though
> > > > > > > >>> > > > > > > > >>> > its replica will then be deleted.
> > > > > > > >>> > > > > > > > >>> >
> > > > > > > >>> > > > > > > > >>> > Should we seek to improve this
> algorithm
> > in
> > > > > this
> > > > > > > >>> KIP, or
> > > > > > > >>> > > > leave
> > > > > > > >>> > > > > > that
> > > > > > > >>> > > > > > > > as
> > > > > > > >>> > > > > > > > >>> a
> > > > > > > >>> > > > > > > > >>> > later optimisation?
> > > > > > > >>> > > > > > > > >>> >
> > > > > > > >>> > > > > > > > >>> > Cheers,
> > > > > > > >>> > > > > > > > >>> >
> > > > > > > >>> > > > > > > > >>> > Tom
> > > > > > > >>> > > > > > > > >>> >
> > > > > > > >>> > > > > > > > >>> > On 11 December 2017 at 21:31, Jun Rao <
> > > > > > > >>> jun@confluent.io>
> > > > > > > >>> > > > > wrote:
> > > > > > > >>> > > > > > > > >>> >
Another question is on the compatibility. Since now there are 2 ways of specifying a partition reassignment, one under /admin/reassign_partitions and the other under /admin/reassignments, we probably want to prevent the same topic being reassigned under both paths at the same time?

Thanks,

Jun

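One way such a guard could look, as a rough sketch against the plain ZooKeeper client: the znode paths come from the discussion above, but the class, the method, and the naive substring match on the legacy JSON blob are all made up for illustration:

    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooKeeper;

    // Illustrative only: reject a new per-topic reassignment if the topic is
    // already being moved via either path.
    public class DualPathGuard {

        static void checkNotReassignedElsewhere(ZooKeeper zk, String topic)
                throws KeeperException, InterruptedException {
            // Reject if the topic already appears in the legacy batch znode.
            if (zk.exists("/admin/reassign_partitions", false) != null) {
                String legacyJson = new String(
                    zk.getData("/admin/reassign_partitions", false, null));
                if (legacyJson.contains("\"" + topic + "\"")) {
                    throw new IllegalStateException(topic
                        + " is already reassigned under /admin/reassign_partitions");
                }
            }
            // Reject if a per-topic reassignment znode already exists.
            if (zk.exists("/admin/reassignments/" + topic, false) != null) {
                throw new IllegalStateException(topic
                    + " is already reassigned under /admin/reassignments");
            }
        }
    }

A real implementation would parse the JSON properly rather than substring-match, but the shape of the check is the same: consult both paths before accepting a reassignment on either.
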
On Fri, Dec 8, 2017 at 5:41 PM, Jun Rao <jun@confluent.io> wrote:

Hi, Tom,

Thanks for the KIP. It definitely addresses one of the pain points in partition reassignment. Another issue that it also addresses is the ZK node size limit when writing the reassignment JSON.

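For a back-of-the-envelope sense of that size limit (the per-partition byte count below is an assumption, but ZooKeeper's default jute.maxbuffer caps znode data at roughly 1 MB):

    // Rough estimate of why a single JSON blob for a large reassignment can
    // hit ZooKeeper's default ~1 MB znode data limit (jute.maxbuffer).
    public class ZnodeSizeEstimate {
        public static void main(String[] args) {
            int partitions = 20_000;   // a large cluster-wide reassignment
            int bytesPerEntry = 80;    // assumed JSON bytes per partition entry
            int totalKb = partitions * bytesPerEntry / 1024;
            System.out.println("~" + totalKb + " KB vs ~1024 KB default limit");
        }
    }
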
My only concern is that the KIP needs to create one watcher per reassigned partition. This could add overhead in ZK and complexity for debugging when lots of partitions are being reassigned simultaneously. We could potentially improve this by introducing a separate ZK path for change notification as we do for configs. For example, every time we change the assignment for a set of partitions, we could further write a sequential node /admin/reassignment_changes/[change_x]. That way, the controller only needs to watch the change path. Once a change is triggered, the controller can read everything under /admin/reassignments/.

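A sketch of that notification pattern against the plain ZooKeeper client (the paths and payload are illustrative, and the class and method names are made up; Kafka's existing config change notification works along these lines):

    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    public class ReassignmentChangeNotification {

        // Writer side: after updating znodes under /admin/reassignments/,
        // drop a sequential marker so the controller sees one child-change
        // event. Assumes the parent path already exists.
        static void notifyChange(ZooKeeper zk, String summary)
                throws KeeperException, InterruptedException {
            zk.create("/admin/reassignment_changes/change_",
                      summary.getBytes(StandardCharsets.UTF_8),
                      ZooDefs.Ids.OPEN_ACL_UNSAFE,
                      CreateMode.PERSISTENT_SEQUENTIAL);
        }

        // Controller side: a single watch on the change path instead of one
        // watch per reassigned partition.
        static List<String> watchChanges(ZooKeeper zk)
                throws KeeperException, InterruptedException {
            return zk.getChildren("/admin/reassignment_changes", event -> {
                // On notification: re-register the watch and rescan
                // everything under /admin/reassignments/.
            });
        }
    }
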
Jun

On Wed, Dec 6, 2017 at 1:19 PM, Tom Bentley <t.j.bentley@gmail.com> wrote:

> > > > > > > >>> > > > > > > > >>> > > >
> > > > > > > >>> > > > > > > > >>> > > >> Hi,
> > > > > > > >>> > > > > > > > >>> > > >>
> > > > > > > >>> > > > > > > > >>> > > >> This is still very new, but I
> wanted
> > > > some
> > > > > quick
> > > > > > > >>> > > feedback
> > > > > > > >>> > > > > on
> > > > > > > >>> > > > > > a
> > > > > > > >>> > > > > > > > >>> > > preliminary
> > > > > > > >>> > > > > > > > >>> > > >> KIP which could, I think, help
> with
> > > > > providing an
> > > > > > > >>> > > > > AdminClient
> > > > > > > >>> > > > > > > API
> > > > > > > >>> > > > > > > > >>> for
> > > > > > > >>> > > > > > > > >>> > > >> partition reassignment.
> > > > > > > >>> > > > > > > > >>> > > >>
> > > > > > > >>> > > > > > > > >>> > > >> https://cwiki.apache.org/
> > > > > > > >>> > > confluence/display/KAFKA/KIP-
> > > > > > > >>> > > > > 236%
> > > > > > > >>> > > > > > > > >>



-- 
Best,
Stanislav