Posted to users@kafka.apache.org by Deepak Raghav <de...@gmail.com> on 2020/06/09 09:07:55 UTC

Re: Kafka Connect Connector Tasks Uneven Division

Hi Robin

Thanks for your reply, and please accept my apology for the delayed response.

You suggested that we should have a separate worker cluster based on the
workload pattern. But since, as you said, task allocation is
non-deterministic, the same thing can happen in the new cluster as well.
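
For illustration, this is roughly how two independent Connect clusters are
separated at the worker level: each group of workers gets its own group.id and
its own internal topics (a sketch only; all file names, topic names and IDs
below are made-up placeholders, not taken from this thread):

  # cluster-a-worker.properties (placeholder names)
  bootstrap.servers=kafka:9092
  group.id=connect-cluster-a
  config.storage.topic=connect-a-configs
  offset.storage.topic=connect-a-offsets
  status.storage.topic=connect-a-status

  # cluster-b-worker.properties (placeholder names)
  bootstrap.servers=kafka:9092
  group.id=connect-cluster-b
  config.storage.topic=connect-b-configs
  offset.storage.topic=connect-b-offsets
  status.storage.topic=connect-b-status

Connectors registered against cluster A's REST endpoint can then only ever run
on cluster A's workers, and likewise for cluster B.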

Please let me know if my understanding is correct or not.

Regards and Thanks
Deepak Raghav



On Tue, May 26, 2020 at 8:20 PM Robin Moffatt <ro...@confluent.io> wrote:

> The KIP for the current rebalancing protocol is probably a good reference:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-415:+Incremental+Cooperative+Rebalancing+in+Kafka+Connect
>
>
> --
>
> Robin Moffatt | Senior Developer Advocate | robin@confluent.io | @rmoff
>
>
> On Tue, 26 May 2020 at 14:25, Deepak Raghav <de...@gmail.com>
> wrote:
>
> > Hi Robin
> >
> > Thanks for the clarification.
> >
> > As you suggested, task allocation between the workers is non-deterministic.
> > I have shared this information within my team, but there are some other
> > parties with whom I need to share it as an explanation for the issue they
> > raised, and I cannot show this mail as a reference.
> >
> > It would be great if you could share any link or discussion reference
> > regarding this.
> >
> > Regards and Thanks
> > Deepak Raghav
> >
> >
> >
> > On Thu, May 21, 2020 at 2:12 PM Robin Moffatt <ro...@confluent.io>
> wrote:
> >
> > > I don't think you're right to assert that this is "expected behaviour":
> > >
> > > >  the tasks are divided in below pattern when they are first time
> > > registered
> > >
> > > Kafka Connect task allocation is non-deterministic.
> > >
> > > I'm still not clear if you're solving for a theoretical problem or an
> > > actual one. If this is an actual problem that you're encountering and need
> > > a solution to, then since the task allocation is not deterministic, it
> > > sounds like you need to deploy separate worker clusters based on the
> > > workload patterns that you are seeing and the machine resources available.
> > >
> > >
> > > --
> > >
> > > Robin Moffatt | Senior Developer Advocate | robin@confluent.io |
> @rmoff
> > >
> > >
> > > On Wed, 20 May 2020 at 21:29, Deepak Raghav <de...@gmail.com>
> > > wrote:
> > >
> > > > Hi Robin
> > > >
> > > > I had gone through the link you provided; it is not helpful in my case.
> > > > Apart from this, I am not getting why the tasks are divided in the below
> > > > pattern when they are first registered, which is the expected behavior.
> > > > Is there any parameter we can pass in the worker property file to control
> > > > the task assignment strategy, like the range or round-robin assignors we
> > > > have for consumer groups?
> > > >
> > > > Connector REST status API result after the first registration:
> > > >
> > > > {
> > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > >   "connector": {
> > > >     "state": "RUNNING",
> > > >     "worker_id": "10.0.0.5:*8080*"
> > > >   },
> > > >   "tasks": [
> > > >     {
> > > >       "id": 0,
> > > >       "state": "RUNNING",
> > > >       "worker_id": "10.0.0.4:*8078*"
> > > >     },
> > > >     {
> > > >       "id": 1,
> > > >       "state": "RUNNING",
> > > >       "worker_id": "10.0.0.5:*8080*"
> > > >     }
> > > >   ],
> > > >   "type": "sink"
> > > > }
> > > >
> > > > and
> > > >
> > > > {
> > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > >   "connector": {
> > > >     "state": "RUNNING",
> > > >     "worker_id": "10.0.0.4:*8078*"
> > > >   },
> > > >   "tasks": [
> > > >     {
> > > >       "id": 0,
> > > >       "state": "RUNNING",
> > > >       "worker_id": "10.0.0.4:*8078*"
> > > >     },
> > > >     {
> > > >       "id": 1,
> > > >       "state": "RUNNING",
> > > >       "worker_id": "10.0.0.5:*8080*"
> > > >     }
> > > >   ],
> > > >   "type": "sink"
> > > > }
> > > >
> > > >
> > > > But when I stop the second worker process, wait for the
> > > > scheduled.rebalance.max.delay.ms time (i.e. 5 minutes) to pass, and start
> > > > the process again, the result is different:
> > > >
> > > > {
> > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > >   "connector": {
> > > >     "state": "RUNNING",
> > > >     "worker_id": "10.0.0.5:*8080*"
> > > >   },
> > > >   "tasks": [
> > > >     {
> > > >       "id": 0,
> > > >       "state": "RUNNING",
> > > >       "worker_id": "10.0.0.5:*8080*"
> > > >     },
> > > >     {
> > > >       "id": 1,
> > > >       "state": "RUNNING",
> > > >       "worker_id": "10.0.0.5:*8080*"
> > > >     }
> > > >   ],
> > > >   "type": "sink"
> > > > }
> > > >
> > > > and
> > > >
> > > > {
> > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > >   "connector": {
> > > >     "state": "RUNNING",
> > > >     "worker_id": "10.0.0.4:*8078*"
> > > >   },
> > > >   "tasks": [
> > > >     {
> > > >       "id": 0,
> > > >       "state": "RUNNING",
> > > >       "worker_id": "10.0.0.4:*8078*"
> > > >     },
> > > >     {
> > > >       "id": 1,
> > > >       "state": "RUNNING",
> > > >       "worker_id": "10.0.0.4:*8078*"
> > > >     }
> > > >   ],
> > > >   "type": "sink"
> > > > }
> > > >
> > > > Regards and Thanks
> > > > Deepak Raghav
> > > >
> > > >
> > > >
> > > > On Wed, May 20, 2020 at 9:29 PM Robin Moffatt <ro...@confluent.io>
> > > wrote:
> > > >
> > > > > Thanks for the clarification. If this is an actual problem that you're
> > > > > encountering and need a solution to, then since the task allocation is
> > > > > not deterministic, it sounds like you need to deploy separate worker
> > > > > clusters based on the workload patterns that you are seeing and the
> > > > > machine resources available.
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Robin Moffatt | Senior Developer Advocate | robin@confluent.io |
> > > @rmoff
> > > > >
> > > > >
> > > > > On Wed, 20 May 2020 at 16:39, Deepak Raghav <
> > deepakraghav86@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Robin
> > > > > >
> > > > > > Replying to your query, i.e.:
> > > > > >
> > > > > > One thing I'd ask at this point is though if it makes any difference
> > > > > > where the tasks execute?
> > > > > >
> > > > > > It actually makes a difference to us. We have 16 connectors and, as I
> > > > > > stated earlier when describing the task division, the first 8
> > > > > > connectors' tasks are assigned to the first worker process and the
> > > > > > other connectors' tasks to the other worker process. Just to mention,
> > > > > > these 16 connectors are sink connectors, and each sink connector
> > > > > > consumes messages from a different topic. There may be a case when
> > > > > > messages are coming in only for the first 8 connectors' topics, and
> > > > > > because all the tasks of these connectors are assigned to the first
> > > > > > worker, the load would be high on it while the other set of connectors
> > > > > > on the other worker would be idle.
> > > > > >
> > > > > > Instead, if the tasks had been divided evenly, this case would have
> > > > > > been avoided, because tasks of each connector would be present in both
> > > > > > worker processes, like below:
> > > > > >
> > > > > > *W1*                       *W2*
> > > > > >  C1T1                    C1T2
> > > > > >  C2T1                    C2T2
> > > > > >
> > > > > > I hope this answers your question.
> > > > > >
> > > > > >
> > > > > > Regards and Thanks
> > > > > > Deepak Raghav
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, May 20, 2020 at 4:42 PM Robin Moffatt <
> robin@confluent.io>
> > > > > wrote:
> > > > > >
> > > > > > > OK, I understand better now.
> > > > > > >
> > > > > > > You can read more about the guts of the rebalancing protocol that
> > > > > > > Kafka Connect uses as of Apache Kafka 2.3 and onwards here:
> > > > > > >
> > > > > > > https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/
> > > > > > >
> > > > > > > One thing I'd ask at this point, though, is whether it makes any
> > > > > > > difference where the tasks execute? The point of a cluster is that
> > > > > > > Kafka Connect manages the workload allocation. If you need workload
> > > > > > > separation and guaranteed execution locality, I would suggest separate
> > > > > > > Kafka Connect distributed clusters.
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > >
> > > > > > > Robin Moffatt | Senior Developer Advocate | robin@confluent.io
> |
> > > > > @rmoff
> > > > > > >
> > > > > > >
> > > > > > > On Wed, 20 May 2020 at 10:24, Deepak Raghav <
> > > > deepakraghav86@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Robin
> > > > > > > >
> > > > > > > > Thanks for your reply.
> > > > > > > >
> > > > > > > > We have two workers on different IPs. The example which I gave you
> > > > > > > > was just an example. We are using Kafka version 2.3.1.
> > > > > > > >
> > > > > > > > Let me explain again with a simple example.
> > > > > > > >
> > > > > > > > Suppose we have two EC2 nodes, N1 and N2, with worker processes W1 and
> > > > > > > > W2 running in distributed mode with the same group.id, i.e. in the
> > > > > > > > same cluster, and two connectors with two tasks each, i.e.
> > > > > > > >
> > > > > > > > Node N1: W1 is running
> > > > > > > > Node N2: W2 is running
> > > > > > > >
> > > > > > > > First connector (C1): task 1 with id C1T1 and task 2 with id C1T2
> > > > > > > > Second connector (C2): task 1 with id C2T1 and task 2 with id C2T2
> > > > > > > >
> > > > > > > > Now suppose both the W1 and W2 worker processes are running and I
> > > > > > > > register connectors C1 and C2 one after another, i.e. sequentially, on
> > > > > > > > either of the worker processes. The task division between the worker
> > > > > > > > nodes happens like below, which is expected:
> > > > > > > >
> > > > > > > > *W1*                       *W2*
> > > > > > > > C1T1                    C1T2
> > > > > > > > C2T1                    C2T2
> > > > > > > >
> > > > > > > > Now, suppose I stop one worker process, e.g. W2, and start it again
> > > > > > > > after some time. The task division changes like below, i.e. the first
> > > > > > > > connector's tasks move to W1 and the second connector's tasks move to
> > > > > > > > W2:
> > > > > > > >
> > > > > > > > *W1*                       *W2*
> > > > > > > > C1T1                    C2T1
> > > > > > > > C1T2                    C2T2
> > > > > > > >
> > > > > > > > Please let me know if this is understandable.
> > > > > > > >
> > > > > > > > Note: Actually, in production we are going to have 16 connectors with
> > > > > > > > 10 tasks each and two worker nodes. With the above scenario, the first
> > > > > > > > 8 connectors' tasks move to W1 and the next 8 connectors' tasks move
> > > > > > > > to W2, which is not expected.
> > > > > > > >
> > > > > > > >
> > > > > > > > Regards and Thanks
> > > > > > > > Deepak Raghav
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, May 20, 2020 at 1:41 PM Robin Moffatt <
> > > robin@confluent.io>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > So you're running two workers on the same machine
> (10.0.0.4),
> > > is
> > > > > > > > > that correct? Normally you'd run one worker per machine
> > unless
> > > > > there
> > > > > > > was
> > > > > > > > a
> > > > > > > > > particular reason otherwise.
> > > > > > > > > What version of Apache Kafka are you using?
> > > > > > > > > I'm not clear from your question if the distribution of
> tasks
> > > is
> > > > > > > > > presenting a problem to you (if so please describe why), or
> > if
> > > > > you're
> > > > > > > > just
> > > > > > > > > interested in the theory behind the rebalancing protocol?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > >
> > > > > > > > > Robin Moffatt | Senior Developer Advocate |
> > robin@confluent.io
> > > |
> > > > > > > @rmoff
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, 20 May 2020 at 06:46, Deepak Raghav <
> > > > > > deepakraghav86@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi
> > > > > > > > > >
> > > > > > > > > > Please, can anybody help me with this?
> > > > > > > > > >
> > > > > > > > > > Regards and Thanks
> > > > > > > > > > Deepak Raghav
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Tue, May 19, 2020 at 1:37 PM Deepak Raghav <
> > > > > > > > deepakraghav86@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Team
> > > > > > > > > > >
> > > > > > > > > > > We have two worker nodes in a cluster and 2 connectors with 10 tasks
> > > > > > > > > > > each.
> > > > > > > > > > >
> > > > > > > > > > > Now, suppose we have two Kafka Connect processes, W1 (port 8080) and
> > > > > > > > > > > W2 (port 8078), already started in distributed mode, and then we
> > > > > > > > > > > register the connectors. The tasks of each connector, i.e. 10 tasks,
> > > > > > > > > > > are divided equally between the two workers: the first task of
> > > > > > > > > > > connector A goes to worker node W1 and the second task of connector A
> > > > > > > > > > > to worker node W2; similarly, the first task of connector B goes to
> > > > > > > > > > > node W1 and the second task of connector B to node W2.
> > > > > > > > > > >
> > > > > > > > > > > For example:
> > > > > > > > > > > *#First Connector : *
> > > > > > > > > > > {
> > > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > > > > > > > > >   "connector": {
> > > > > > > > > > >     "state": "RUNNING",
> > > > > > > > > > >     "worker_id": "10.0.0.4:*8080*"
> > > > > > > > > > >   },
> > > > > > > > > > >   "tasks": [
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 0,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:*8078*"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 1,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 2,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 3,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 4,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 5,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 6,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 7,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 8,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 9,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     }
> > > > > > > > > > >   ],
> > > > > > > > > > >   "type": "sink"
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > *#Sec connector*
> > > > > > > > > > >
> > > > > > > > > > > {
> > > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > > > > > > > > >   "connector": {
> > > > > > > > > > >     "state": "RUNNING",
> > > > > > > > > > >     "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >   },
> > > > > > > > > > >   "tasks": [
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 0,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 1,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 2,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 3,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 4,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 5,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 6,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 7,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 8,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 9,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     }
> > > > > > > > > > >   ],
> > > > > > > > > > >   "type": "sink"
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > > But I have seen a strange behavior: when I just shut down the W2
> > > > > > > > > > > worker node and start it again, the tasks are divided in a different
> > > > > > > > > > > way, i.e. all the tasks of connector A end up on the W1 node and the
> > > > > > > > > > > tasks of connector B on the W2 node.
> > > > > > > > > > >
> > > > > > > > > > > Can you please have a look at this?
> > > > > > > > > > >
> > > > > > > > > > > *#First Connector*
> > > > > > > > > > >
> > > > > > > > > > > {
> > > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > > > > > > > > >   "connector": {
> > > > > > > > > > >     "state": "RUNNING",
> > > > > > > > > > >     "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >   },
> > > > > > > > > > >   "tasks": [
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 0,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 1,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 2,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 3,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 4,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 5,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 6,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 7,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 8,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 9,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     }
> > > > > > > > > > >   ],
> > > > > > > > > > >   "type": "sink"
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > > *#Second Connector *:
> > > > > > > > > > >
> > > > > > > > > > > {
> > > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > > > > > > > > >   "connector": {
> > > > > > > > > > >     "state": "RUNNING",
> > > > > > > > > > >     "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >   },
> > > > > > > > > > >   "tasks": [
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 0,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 1,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 2,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 3,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 4,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 5,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 6,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 7,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 8,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 9,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     }
> > > > > > > > > > >   ],
> > > > > > > > > > >   "type": "sink"
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Regards and Thanks
> > > > > > > > > > > Deepak Raghav
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
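
For context on the examples quoted above: the number of tasks each connector
gets is capped by that connector's tasks.max setting in its configuration, and
Kafka Connect then decides on its own which worker each task runs on. A
registration payload for a sink connector with 10 tasks might look roughly
like this (connector name, class and topic are placeholders, not taken from
this thread):

  {
    "name": "example-sink",
    "config": {
      "connector.class": "com.example.ExampleSinkConnector",
      "tasks.max": "10",
      "topics": "example-topic"
    }
  }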

Re: Kafka Connect Connector Tasks Uneven Division

Posted by Deepak Raghav <de...@gmail.com>.
Hi Robin

Could you please reply?

Regards and Thanks
Deepak Raghav




Re: Kafka Connect Connector Tasks Uneven Division

Posted by Deepak Raghav <de...@gmail.com>.
Hi Robin

Can you please reply?

I just want to add one more thing: yesterday I tried with
connect.protocol=eager, and the task distribution was balanced after that.
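
For reference, the two worker settings involved in this behaviour look roughly
like the following sketch of a worker properties file (the comments reflect my
understanding of the defaults as of Apache Kafka 2.3, so treat the exact
values as assumptions rather than authoritative):

  # eager      = old stop-the-world rebalancing protocol
  # compatible = incremental cooperative rebalancing (default since AK 2.3)
  connect.protocol=eager

  # Only used by the incremental cooperative protocol: how long the tasks of a
  # departed worker stay unassigned before being rebalanced. The default of
  # 300000 ms is the 5 minutes mentioned earlier in this thread.
  scheduled.rebalance.max.delay.ms=300000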

Regards and Thanks
Deepak Raghav



On Tue, Jun 9, 2020 at 2:37 PM Deepak Raghav <de...@gmail.com>
wrote:

> Hi Robin
>
> Thanks for your reply and accept my apology for the delayed response.
>
> As you suggested that we should have a separate worker cluster based on
> workload pattern. But as you said, task allocation is nondeterministic, so
> same things can happen in the new cluster.
>
> Please let me know if my understanding is correct or not.
>
> Regards and Thanks
> Deepak Raghav
>
>
>
> On Tue, May 26, 2020 at 8:20 PM Robin Moffatt <ro...@confluent.io> wrote:
>
>> The KIP for the current rebalancing protocol is probably a good reference:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-415:+Incremental+Cooperative+Rebalancing+in+Kafka+Connect
>>
>>
>> --
>>
>> Robin Moffatt | Senior Developer Advocate | robin@confluent.io | @rmoff
>>
>>
>> On Tue, 26 May 2020 at 14:25, Deepak Raghav <de...@gmail.com>
>> wrote:
>>
>> > Hi Robin
>> >
>> > Thanks for the clarification.
>> >
>> > As you suggested, that task allocation between the workers is
>> > nondeterministic. I have shared the same information within in my team
>> but
>> > there are some other parties, with whom I need to share this
>> information as
>> > explanation for the issue raised by them and I cannot show this mail as
>> a
>> > reference.
>> >
>> > It would be very great if you please share any link/discussion reference
>> > regarding the same.
>> >
>> > Regards and Thanks
>> > Deepak Raghav
>> >
>> >
>> >
>> > On Thu, May 21, 2020 at 2:12 PM Robin Moffatt <ro...@confluent.io>
>> wrote:
>> >
>> > > I don't think you're right to assert that this is "expected
>> behaviour":
>> > >
>> > > >  the tasks are divided in below pattern when they are first time
>> > > registered
>> > >
>> > > Kafka Connect task allocation is non-determanistic.
>> > >
>> > > I'm still not clear if you're solving for a theoretical problem or an
>> > > actual one. If this is an actual problem that you're encountering and
>> > need
>> > > a solution to then since the task allocation is not deterministic it
>> > sounds
>> > > like you need to deploy separate worker clusters based on the workload
>> > > patterns that you are seeing and machine resources available.
>> > >
>> > >
>> > > --
>> > >
>> > > Robin Moffatt | Senior Developer Advocate | robin@confluent.io |
>> @rmoff
>> > >
>> > >
>> > > On Wed, 20 May 2020 at 21:29, Deepak Raghav <deepakraghav86@gmail.com
>> >
>> > > wrote:
>> > >
>> > > > Hi Robin
>> > > >
>> > > > I had gone though the link you provided, It is not helpful in my
>> case.
>> > > > Apart from this, *I am not getting why the tasks are divided in
>> *below
>> > > > pattern* when they are *first time registered*, which is expected
>> > > behavior.
>> > > > I*s there any parameter which we can pass in worker property file
>> which
>> > > > handle the task assignment strategy like we have range assigner or
>> > round
>> > > > robin in consumer-group ?
>> > > >
>> > > > connector rest status api result after first registration :
>> > > >
>> > > > {
>> > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
>> > > >   "connector": {
>> > > >     "state": "RUNNING",
>> > > >     "worker_id": "10.0.0.5:*8080*"
>> > > >   },
>> > > >   "tasks": [
>> > > >     {
>> > > >       "id": 0,
>> > > >       "state": "RUNNING",
>> > > >       "worker_id": "10.0.0.4:*8078*"
>> > > >     },
>> > > >     {
>> > > >       "id": 1,
>> > > >       "state": "RUNNING",
>> > > >       "worker_id": "10.0.0.5:*8080*"
>> > > >     }
>> > > >   ],
>> > > >   "type": "sink"
>> > > > }
>> > > >
>> > > > and
>> > > >
>> > > > {
>> > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
>> > > >   "connector": {
>> > > >     "state": "RUNNING",
>> > > >     "worker_id": "10.0.0.4:*8078*"
>> > > >   },
>> > > >   "tasks": [
>> > > >     {
>> > > >       "id": 0,
>> > > >       "state": "RUNNING",
>> > > >       "worker_id": "10.0.0.4:*8078*"
>> > > >     },
>> > > >     {
>> > > >       "id": 1,
>> > > >       "state": "RUNNING",
>> > > >       "worker_id": "10.0.0.5:*8080*"
>> > > >     }
>> > > >   ],
>> > > >   "type": "sink"
>> > > > }
>> > > >
>> > > >
>> > > > But when I stop the second worker process and wait for
>> > > > scheduled.rebalance.max.delay.ms time i.e 5 min to over, and start
>> the
>> > > > process again. Result is different.
>> > > >
>> > > > {
>> > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
>> > > >   "connector": {
>> > > >     "state": "RUNNING",
>> > > >     "worker_id": "10.0.0.5:*8080*"
>> > > >   },
>> > > >   "tasks": [
>> > > >     {
>> > > >       "id": 0,
>> > > >       "state": "RUNNING",
>> > > >       "worker_id": "10.0.0.5:*8080*"
>> > > >     },
>> > > >     {
>> > > >       "id": 1,
>> > > >       "state": "RUNNING",
>> > > >       "worker_id": "10.0.0.5:*8080*"
>> > > >     }
>> > > >   ],
>> > > >   "type": "sink"
>> > > > }
>> > > >
>> > > > and
>> > > >
>> > > > {
>> > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
>> > > >   "connector": {
>> > > >     "state": "RUNNING",
>> > > >     "worker_id": "10.0.0.4:*8078*"
>> > > >   },
>> > > >   "tasks": [
>> > > >     {
>> > > >       "id": 0,
>> > > >       "state": "RUNNING",
>> > > >       "worker_id": "10.0.0.4:*8078*"
>> > > >     },
>> > > >     {
>> > > >       "id": 1,
>> > > >       "state": "RUNNING",
>> > > >       "worker_id": "10.0.0.4:*8078*"
>> > > >     }
>> > > >   ],
>> > > >   "type": "sink"
>> > > > }
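>> > > >
>> > > > (The delay referred to above is the distributed worker property
>> > > > scheduled.rebalance.max.delay.ms. A sketch of the relevant lines in
>> > > > the worker properties, assuming the incremental cooperative protocol
>> > > > and the default five-minute delay:
>> > > >
>> > > > connect.protocol=compatible
>> > > > scheduled.rebalance.max.delay.ms=300000
>> > > > )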
>> > > >
>> > > > Regards and Thanks
>> > > > Deepak Raghav
>> > > >
>> > > >
>> > > >
>> > > > On Wed, May 20, 2020 at 9:29 PM Robin Moffatt <ro...@confluent.io>
>> > > wrote:
>> > > >
>> > > > > Thanks for the clarification. If this is an actual problem that
>> > > > > you're encountering and need a solution to, then, since the task
>> > > > > allocation is not deterministic, it sounds like you need to deploy
>> > > > > separate worker clusters based on the workload patterns that you
>> > > > > are seeing and the machine resources available.
>> > > > >
>> > > > >
>> > > > > --
>> > > > >
>> > > > > Robin Moffatt | Senior Developer Advocate | robin@confluent.io |
>> > > @rmoff
>> > > > >
>> > > > >
>> > > > > On Wed, 20 May 2020 at 16:39, Deepak Raghav <
>> > deepakraghav86@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Hi Robin
>> > > > > >
>> > > > > > Replying to your query, i.e. "One thing I'd ask at this point is
>> > > > > > though if it makes any difference where the tasks execute?":
>> > > > > >
>> > > > > > It actually makes a difference to us. We have 16 connectors
>> > > > > > and, as I described the task division earlier, the first 8
>> > > > > > connectors' tasks are assigned to the first worker process and
>> > > > > > the other connectors' tasks to the other worker process. Just to
>> > > > > > mention, these 16 connectors are sink connectors, and each sink
>> > > > > > connector consumes messages from a different topic. There may be
>> > > > > > a case when messages are coming only for the first 8 connectors'
>> > > > > > topics, and because all the tasks of these connectors are
>> > > > > > assigned to the first worker, the load on it would be high while
>> > > > > > the other set of connectors on the other worker would be idle.
>> > > > > >
>> > > > > > Instead, if the tasks had been divided evenly, this case would
>> > > > > > have been avoided, because tasks of each connector would be
>> > > > > > present in both worker processes, like below:
>> > > > > >
>> > > > > > *W1*                       *W2*
>> > > > > >  C1T1                    C1T2
>> > > > > >  C2T1                    C2T2
>> > > > > >
>> > > > > > I hope that answers your question.
>> > > > > >
>> > > > > >
>> > > > > > Regards and Thanks
>> > > > > > Deepak Raghav
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > On Wed, May 20, 2020 at 4:42 PM Robin Moffatt <
>> robin@confluent.io>
>> > > > > wrote:
>> > > > > >
>> > > > > > > OK, I understand better now.
>> > > > > > >
>> > > > > > > You can read more about the guts of the rebalancing protocol
>> > > > > > > that Kafka Connect uses as of Apache Kafka 2.3 and onwards here:
>> > > > > > >
>> > > > > > > https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/
>> > > > > > >
>> > > > > > > One thing I'd ask at this point, though, is if it makes any
>> > > > > > > difference where the tasks execute? The point of a cluster is
>> > > > > > > that Kafka Connect manages the workload allocation. If you need
>> > > > > > > workload separation and guaranteed execution locality, I would
>> > > > > > > suggest separate Kafka Connect distributed clusters.
>> > > > > > >
>> > > > > > >
>> > > > > > > --
>> > > > > > >
>> > > > > > > Robin Moffatt | Senior Developer Advocate |
>> robin@confluent.io |
>> > > > > @rmoff
>> > > > > > >
>> > > > > > >
>> > > > > > > On Wed, 20 May 2020 at 10:24, Deepak Raghav <
>> > > > deepakraghav86@gmail.com>
>> > > > > > > wrote:
>> > > > > > >
>> > > > > > > > Hi Robin
>> > > > > > > >
>> > > > > > > > Thanks for your reply.
>> > > > > > > >
>> > > > > > > > We are running two workers on different IPs; the example I
>> > > > > > > > gave you was just an example. We are using Kafka version
>> > > > > > > > 2.3.1.
>> > > > > > > >
>> > > > > > > > Let me tell you again with a simple example.
>> > > > > > > >
>> > > > > > > > Suppose we have two EC2 nodes, N1 and N2, with worker
>> > > > > > > > processes W1 and W2 running in distributed mode with the
>> > > > > > > > same group.id, i.e. in the same cluster, and two connectors
>> > > > > > > > with two tasks each, i.e.:
>> > > > > > > >
>> > > > > > > > Node N1: W1 is running
>> > > > > > > > Node N2: W2 is running
>> > > > > > > >
>> > > > > > > > First connector (C1): task 1 with id C1T1 and task 2 with id C1T2
>> > > > > > > > Second connector (C2): task 1 with id C2T1 and task 2 with id C2T2
>> > > > > > > >
>> > > > > > > > Now suppose both the W1 and W2 worker processes are running
>> > > > > > > > and I register connectors C1 and C2 one after another, i.e.
>> > > > > > > > sequentially, on either worker process. The task division
>> > > > > > > > between the worker nodes happens like below, which is
>> > > > > > > > expected:
>> > > > > > > >
>> > > > > > > > *W1*                       *W2*
>> > > > > > > > C1T1                    C1T2
>> > > > > > > > C2T1                    C2T2
>> > > > > > > >
>> > > > > > > > Now, suppose I stop one worker process, e.g. W2, and start
>> > > > > > > > it again after some time. The task division changes like
>> > > > > > > > below, i.e. the first connector's tasks move to W1 and the
>> > > > > > > > second connector's tasks move to W2:
>> > > > > > > >
>> > > > > > > > *W1*                       *W2*
>> > > > > > > > C1T1                    C2T1
>> > > > > > > > C1T2                    C2T2
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Please let me know if this is clear.
>> > > > > > > >
>> > > > > > > > Note: in production we are going to have 16 connectors with
>> > > > > > > > 10 tasks each and two worker nodes. With the above scenario,
>> > > > > > > > the first 8 connectors' tasks move to W1 and the next 8
>> > > > > > > > connectors' tasks move to W2, which is not expected.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Regards and Thanks
>> > > > > > > > Deepak Raghav
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Wed, May 20, 2020 at 1:41 PM Robin Moffatt <
>> > > robin@confluent.io>
>> > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > So you're running two workers on the same machine
>> > > > > > > > > (10.0.0.4), is that correct? Normally you'd run one worker
>> > > > > > > > > per machine unless there was a particular reason otherwise.
>> > > > > > > > > What version of Apache Kafka are you using?
>> > > > > > > > > I'm not clear from your question if the distribution of
>> > > > > > > > > tasks is presenting a problem to you (if so please describe
>> > > > > > > > > why), or if you're just interested in the theory behind the
>> > > > > > > > > rebalancing protocol?
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > --
>> > > > > > > > >
>> > > > > > > > > Robin Moffatt | Senior Developer Advocate |
>> > robin@confluent.io
>> > > |
>> > > > > > > @rmoff
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > On Wed, 20 May 2020 at 06:46, Deepak Raghav <
>> > > > > > deepakraghav86@gmail.com>
>> > > > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi
>> > > > > > > > > >
>> > > > > > > > > > Please, can anybody help me with this?
>> > > > > > > > > >
>> > > > > > > > > > Regards and Thanks
>> > > > > > > > > > Deepak Raghav
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > On Tue, May 19, 2020 at 1:37 PM Deepak Raghav <
>> > > > > > > > deepakraghav86@gmail.com>
>> > > > > > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Hi Team
>> > > > > > > > > > >
>> > > > > > > > > > > We have two worker nodes in a cluster and two
>> > > > > > > > > > > connectors with 10 tasks each.
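>> > > > > > > > > > >
>> > > > > > > > > > > (The task count comes from tasks.max in each connector's
>> > > > > > > > > > > configuration; a rough sketch, with the connector class
>> > > > > > > > > > > and topic left as placeholders since they are not shown
>> > > > > > > > > > > here:
>> > > > > > > > > > >
>> > > > > > > > > > > {
>> > > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
>> > > > > > > > > > >   "config": {
>> > > > > > > > > > >     "connector.class": "<your sink connector class>",
>> > > > > > > > > > >     "tasks.max": "10",
>> > > > > > > > > > >     "topics": "<source topic>"
>> > > > > > > > > > >   }
>> > > > > > > > > > > }
>> > > > > > > > > > > )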
>> > > > > > > > > > >
>> > > > > > > > > > > Now, suppose we have two Kafka Connect processes, W1
>> > > > > > > > > > > (port 8080) and W2 (port 8078), already started in
>> > > > > > > > > > > distributed mode, and we then register the connectors.
>> > > > > > > > > > > The 10 tasks of each connector are divided equally
>> > > > > > > > > > > between the two workers, i.e. the first task of
>> > > > > > > > > > > connector A goes to worker node W1 and the second task
>> > > > > > > > > > > of connector A to worker node W2; similarly, the first
>> > > > > > > > > > > task of connector B goes to node W1 and the second task
>> > > > > > > > > > > of connector B to node W2.
>> > > > > > > > > > >
>> > > > > > > > > > > For example:
>> > > > > > > > > > > *#First Connector : *
>> > > > > > > > > > > {
>> > > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
>> > > > > > > > > > >   "connector": {
>> > > > > > > > > > >     "state": "RUNNING",
>> > > > > > > > > > >     "worker_id": "10.0.0.4:*8080*"
>> > > > > > > > > > >   },
>> > > > > > > > > > >   "tasks": [
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 0,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:*8078*"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 1,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 2,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 3,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 4,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 5,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 6,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 7,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 8,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 9,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     }
>> > > > > > > > > > >   ],
>> > > > > > > > > > >   "type": "sink"
>> > > > > > > > > > > }
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > *#Second Connector*
>> > > > > > > > > > >
>> > > > > > > > > > > {
>> > > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
>> > > > > > > > > > >   "connector": {
>> > > > > > > > > > >     "state": "RUNNING",
>> > > > > > > > > > >     "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >   },
>> > > > > > > > > > >   "tasks": [
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 0,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 1,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 2,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 3,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 4,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 5,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 6,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 7,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 8,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 9,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     }
>> > > > > > > > > > >   ],
>> > > > > > > > > > >   "type": "sink"
>> > > > > > > > > > > }
>> > > > > > > > > > >
>> > > > > > > > > > > But I have seen strange behaviour: when I just shut
>> > > > > > > > > > > down the W2 worker node and start it again, the tasks
>> > > > > > > > > > > are divided in a different way, i.e. all the tasks of
>> > > > > > > > > > > connector A end up on the W1 node and the tasks of
>> > > > > > > > > > > connector B on the W2 node.
>> > > > > > > > > > >
>> > > > > > > > > > > Can you please have a look at this?
>> > > > > > > > > > >
>> > > > > > > > > > > *#First Connector*
>> > > > > > > > > > >
>> > > > > > > > > > > {
>> > > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
>> > > > > > > > > > >   "connector": {
>> > > > > > > > > > >     "state": "RUNNING",
>> > > > > > > > > > >     "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >   },
>> > > > > > > > > > >   "tasks": [
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 0,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 1,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 2,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 3,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 4,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 5,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 6,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 7,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 8,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 9,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     }
>> > > > > > > > > > >   ],
>> > > > > > > > > > >   "type": "sink"
>> > > > > > > > > > > }
>> > > > > > > > > > >
>> > > > > > > > > > > *#Second Connector *:
>> > > > > > > > > > >
>> > > > > > > > > > > {
>> > > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
>> > > > > > > > > > >   "connector": {
>> > > > > > > > > > >     "state": "RUNNING",
>> > > > > > > > > > >     "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >   },
>> > > > > > > > > > >   "tasks": [
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 0,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 1,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 2,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 3,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 4,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 5,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 6,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 7,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 8,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 9,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     }
>> > > > > > > > > > >   ],
>> > > > > > > > > > >   "type": "sink"
>> > > > > > > > > > > }
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > Regards and Thanks
>> > > > > > > > > > > Deepak Raghav
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>