Posted to users@kafka.apache.org by Deepak Raghav <de...@gmail.com> on 2020/05/20 05:45:50 UTC

Re: Kafka Connect Connector Tasks Uneven Division

Hi

Please, can anybody help me with this?

Regards and Thanks
Deepak Raghav



On Tue, May 19, 2020 at 1:37 PM Deepak Raghav <de...@gmail.com>
wrote:

> Hi Team
>
> We have two worker nodes in a cluster and 2 connectors with 10 tasks
> each.
>
> Now, suppose we have two Kafka Connect processes, W1 (port 8080) and
> W2 (port 8078), already started in distributed mode, and we then register
> the connectors. The 10 tasks of each connector are divided equally between
> the two workers, i.e. the first task of connector A goes to worker W1 and
> the second task of connector A to W2; similarly, the first task of
> connector B goes to W1 and the second task of connector B to W2.
>
> e.g
> *#First Connector : *
> {
>   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
>   "connector": {
>     "state": "RUNNING",
>     "worker_id": "10.0.0.4:*8080*"
>   },
>   "tasks": [
>     {
>       "id": 0,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:*8078*"
>     },
>     {
>       "id": 1,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8080"
>     },
>     {
>       "id": 2,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     },
>     {
>       "id": 3,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8080"
>     },
>     {
>       "id": 4,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     },
>     {
>       "id": 5,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8080"
>     },
>     {
>       "id": 6,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     },
>     {
>       "id": 7,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8080"
>     },
>     {
>       "id": 8,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     },
>     {
>       "id": 9,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8080"
>     }
>   ],
>   "type": "sink"
> }
>
>
> *#Sec connector*
>
> {
>   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
>   "connector": {
>     "state": "RUNNING",
>     "worker_id": "10.0.0.4:8078"
>   },
>   "tasks": [
>     {
>       "id": 0,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     },
>     {
>       "id": 1,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8080"
>     },
>     {
>       "id": 2,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     },
>     {
>       "id": 3,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8080"
>     },
>     {
>       "id": 4,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     },
>     {
>       "id": 5,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8080"
>     },
>     {
>       "id": 6,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     },
>     {
>       "id": 7,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8080"
>     },
>     {
>       "id": 8,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     },
>     {
>       "id": 9,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8080"
>     }
>   ],
>   "type": "sink"
> }
>
> But I have seen a strange behavior: when I just shut down the W2 worker node
> and start it again, the tasks are divided in a different way, i.e. all the
> tasks of connector A end up on the W1 node and all the tasks of connector B
> on the W2 node.
>
> Can you please have a look at this?
>
> *#First Connector*
>
> {
>   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
>   "connector": {
>     "state": "RUNNING",
>     "worker_id": "10.0.0.4:8080"
>   },
>   "tasks": [
>     {
>       "id": 0,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8080"
>     },
>     {
>       "id": 1,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8080"
>     },
>     {
>       "id": 2,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8080"
>     },
>     {
>       "id": 3,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8080"
>     },
>     {
>       "id": 4,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8080"
>     },
>     {
>       "id": 5,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8080"
>     },
>     {
>       "id": 6,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8080"
>     },
>     {
>       "id": 7,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8080"
>     },
>     {
>       "id": 8,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8080"
>     },
>     {
>       "id": 9,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8080"
>     }
>   ],
>   "type": "sink"
> }
>
> *#Second Connector *:
>
> {
>   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
>   "connector": {
>     "state": "RUNNING",
>     "worker_id": "10.0.0.4:8078"
>   },
>   "tasks": [
>     {
>       "id": 0,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     },
>     {
>       "id": 1,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     },
>     {
>       "id": 2,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     },
>     {
>       "id": 3,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     },
>     {
>       "id": 4,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     },
>     {
>       "id": 5,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     },
>     {
>       "id": 6,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     },
>     {
>       "id": 7,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     },
>     {
>       "id": 8,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     },
>     {
>       "id": 9,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:8078"
>     }
>   ],
>   "type": "sink"
> }
>
>
> Regards and Thanks
> Deepak Raghav
>
>

Re: Kafka Connect Connector Tasks Uneven Division

Posted by Deepak Raghav <de...@gmail.com>.
Hi Robin

Could you please reply?

Regards and Thanks
Deepak Raghav



On Wed, Jun 10, 2020 at 11:57 AM Deepak Raghav <de...@gmail.com>
wrote:

> Hi Robin
>
> Can you please reply?
>
> I just want to add one more thing: yesterday I tried with
> connect.protocol=eager. Task distribution was balanced after that.
>
> Regards and Thanks
> Deepak Raghav
>
>
>
> On Tue, Jun 9, 2020 at 2:37 PM Deepak Raghav <de...@gmail.com>
> wrote:
>
>> Hi Robin
>>
>> Thanks for your reply, and please accept my apology for the delayed
>> response.
>>
>> You suggested that we should have a separate worker cluster based on the
>> workload pattern. But since, as you said, task allocation is
>> nondeterministic, the same thing can happen in the new cluster as well.
>>
>> Please let me know if my understanding is correct.
>>
>> Regards and Thanks
>> Deepak Raghav
>>
>>
>>
>> On Tue, May 26, 2020 at 8:20 PM Robin Moffatt <ro...@confluent.io> wrote:
>>
>>> The KIP for the current rebalancing protocol is probably a good
>>> reference:
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-415:+Incremental+Cooperative+Rebalancing+in+Kafka+Connect
>>>
>>>
>>> --
>>>
>>> Robin Moffatt | Senior Developer Advocate | robin@confluent.io | @rmoff
>>>
>>>
>>> On Tue, 26 May 2020 at 14:25, Deepak Raghav <de...@gmail.com>
>>> wrote:
>>>
>>> > Hi Robin
>>> >
>>> > Thanks for the clarification.
>>> >
>>> > As you suggested, task allocation between the workers is
>>> > nondeterministic. I have shared the same information within my team,
>>> > but there are some other parties with whom I need to share it as an
>>> > explanation for the issue they raised, and I cannot show this mail as
>>> > a reference.
>>> >
>>> > It would be great if you could share any link or discussion reference
>>> > regarding this.
>>> >
>>> > Regards and Thanks
>>> > Deepak Raghav
>>> >
>>> >
>>> >
>>> > On Thu, May 21, 2020 at 2:12 PM Robin Moffatt <ro...@confluent.io>
>>> wrote:
>>> >
>>> > > I don't think you're right to assert that this is "expected
>>> behaviour":
>>> > >
>>> > > >  the tasks are divided in below pattern when they are first time
>>> > > registered
>>> > >
>>> > > Kafka Connect task allocation is non-deterministic.
>>> > >
>>> > > I'm still not clear if you're solving for a theoretical problem or
>>> > > an actual one. If this is an actual problem that you're encountering
>>> > > and need a solution to, then since the task allocation is not
>>> > > deterministic it sounds like you need to deploy separate worker
>>> > > clusters based on the workload patterns that you are seeing and the
>>> > > machine resources available.
>>> > >
>>> > >
>>> > > --
>>> > >
>>> > > Robin Moffatt | Senior Developer Advocate | robin@confluent.io |
>>> @rmoff
>>> > >
>>> > >
>>> > > On Wed, 20 May 2020 at 21:29, Deepak Raghav <
>>> deepakraghav86@gmail.com>
>>> > > wrote:
>>> > >
>>> > > > Hi Robin
>>> > > >
>>> > > > I had gone through the link you provided; it is not helpful in my
>>> > > > case. Apart from this, I am not getting why the tasks are divided
>>> > > > in the below pattern when they are first time registered, which is
>>> > > > the expected behavior. Is there any parameter we can pass in the
>>> > > > worker property file that handles the task assignment strategy,
>>> > > > like the range or round-robin assignors we have in consumer groups?
>>> > > >
>>> > > > Connector REST status API result after first registration:
>>> > > >
>>> > > > {
>>> > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
>>> > > >   "connector": {
>>> > > >     "state": "RUNNING",
>>> > > >     "worker_id": "10.0.0.5:*8080*"
>>> > > >   },
>>> > > >   "tasks": [
>>> > > >     {
>>> > > >       "id": 0,
>>> > > >       "state": "RUNNING",
>>> > > >       "worker_id": "10.0.0.4:*8078*"
>>> > > >     },
>>> > > >     {
>>> > > >       "id": 1,
>>> > > >       "state": "RUNNING",
>>> > > >       "worker_id": "10.0.0.5:*8080*"
>>> > > >     }
>>> > > >   ],
>>> > > >   "type": "sink"
>>> > > > }
>>> > > >
>>> > > > and
>>> > > >
>>> > > > {
>>> > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
>>> > > >   "connector": {
>>> > > >     "state": "RUNNING",
>>> > > >     "worker_id": "10.0.0.4:*8078*"
>>> > > >   },
>>> > > >   "tasks": [
>>> > > >     {
>>> > > >       "id": 0,
>>> > > >       "state": "RUNNING",
>>> > > >       "worker_id": "10.0.0.4:*8078*"
>>> > > >     },
>>> > > >     {
>>> > > >       "id": 1,
>>> > > >       "state": "RUNNING",
>>> > > >       "worker_id": "10.0.0.5:*8080*"
>>> > > >     }
>>> > > >   ],
>>> > > >   "type": "sink"
>>> > > > }
>>> > > >
>>> > > >
>>> > > > But when I stop the second worker process, wait for the
>>> > > > scheduled.rebalance.max.delay.ms time (i.e. 5 min) to pass, and
>>> > > > start the process again, the result is different.
>>> > > >
>>> > > > {
>>> > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
>>> > > >   "connector": {
>>> > > >     "state": "RUNNING",
>>> > > >     "worker_id": "10.0.0.5:*8080*"
>>> > > >   },
>>> > > >   "tasks": [
>>> > > >     {
>>> > > >       "id": 0,
>>> > > >       "state": "RUNNING",
>>> > > >       "worker_id": "10.0.0.5:*8080*"
>>> > > >     },
>>> > > >     {
>>> > > >       "id": 1,
>>> > > >       "state": "RUNNING",
>>> > > >       "worker_id": "10.0.0.5:*8080*"
>>> > > >     }
>>> > > >   ],
>>> > > >   "type": "sink"
>>> > > > }
>>> > > >
>>> > > > and
>>> > > >
>>> > > > {
>>> > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
>>> > > >   "connector": {
>>> > > >     "state": "RUNNING",
>>> > > >     "worker_id": "10.0.0.4:*8078*"
>>> > > >   },
>>> > > >   "tasks": [
>>> > > >     {
>>> > > >       "id": 0,
>>> > > >       "state": "RUNNING",
>>> > > >       "worker_id": "10.0.0.4:*8078*"
>>> > > >     },
>>> > > >     {
>>> > > >       "id": 1,
>>> > > >       "state": "RUNNING",
>>> > > >       "worker_id": "10.0.0.4:*8078*"
>>> > > >     }
>>> > > >   ],
>>> > > >   "type": "sink"
>>> > > > }
>>> > > >
>>> > > > Regards and Thanks
>>> > > > Deepak Raghav
>>> > > >
>>> > > >
>>> > > >
>>> > > > On Wed, May 20, 2020 at 9:29 PM Robin Moffatt <ro...@confluent.io>
>>> > > wrote:
>>> > > >
>>> > > > > Thanks for the clarification. If this is an actual problem that
>>> > you're
>>> > > > > encountering and need a solution to then since the task
>>> allocation is
>>> > > not
>>> > > > > deterministic it sounds like you need to deploy separate worker
>>> > > clusters
>>> > > > > based on the workload patterns that you are seeing and machine
>>> > > resources
>>> > > > > available.
>>> > > > >
>>> > > > >
>>> > > > > --
>>> > > > >
>>> > > > > Robin Moffatt | Senior Developer Advocate | robin@confluent.io |
>>> > > @rmoff
>>> > > > >
>>> > > > >
>>> > > > > On Wed, 20 May 2020 at 16:39, Deepak Raghav <
>>> > deepakraghav86@gmail.com>
>>> > > > > wrote:
>>> > > > >
>>> > > > > > Hi Robin
>>> > > > > >
>>> > > > > > Replying to your query, i.e.
>>> > > > > >
>>> > > > > > "One thing I'd ask at this point is though if it makes any
>>> > > > > > difference where the tasks execute?"
>>> > > > > >
>>> > > > > > It actually makes a difference to us. We have 16 connectors
>>> > > > > > and, as I stated in the task division earlier, the first 8
>>> > > > > > connectors' tasks are assigned to the first worker process and
>>> > > > > > the other connectors' tasks to the other worker process. Just
>>> > > > > > to mention, these 16 connectors are sink connectors, and each
>>> > > > > > sink connector consumes messages from a different topic. There
>>> > > > > > may be a case when messages are coming only for the first 8
>>> > > > > > connectors' topics, and because all the tasks of these
>>> > > > > > connectors are assigned to the first worker, the load would be
>>> > > > > > high on it while the other set of connectors on the other
>>> > > > > > worker would be idle.
>>> > > > > >
>>> > > > > > Instead, if the tasks had been divided evenly, this case would
>>> > > > > > have been avoided, because tasks of each connector would be
>>> > > > > > present in both worker processes, like below:
>>> > > > > >
>>> > > > > > *W1*                       *W2*
>>> > > > > >  C1T1                    C1T2
>>> > > > > >  C2T1                    C2T2
>>> > > > > >
>>> > > > > > I hope that answers your question.
>>> > > > > >
>>> > > > > >
>>> > > > > > Regards and Thanks
>>> > > > > > Deepak Raghav
>>> > > > > >
>>> > > > > >
>>> > > > > >
>>> > > > > > On Wed, May 20, 2020 at 4:42 PM Robin Moffatt <
>>> robin@confluent.io>
>>> > > > > wrote:
>>> > > > > >
>>> > > > > > > OK, I understand better now.
>>> > > > > > >
>>> > > > > > > You can read more about the guts of the rebalancing protocol
>>> > > > > > > that Kafka Connect uses as of Apache Kafka 2.3 and onwards
>>> > > > > > > here:
>>> > > > > > >
>>> > > > > > > https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/
>>> > > > > > >
>>> > > > > > > One thing I'd ask at this point is though if it makes any
>>> > > difference
>>> > > > > > where
>>> > > > > > > the tasks execute? The point of a cluster is that Kafka
>>> Connect
>>> > > > manages
>>> > > > > > the
>>> > > > > > > workload allocation. If you need workload separation and
>>> > > > > > > guaranteed execution locality I would suggest separate Kafka
>>> > > Connect
>>> > > > > > > distributed clusters.
>>> > > > > > >
>>> > > > > > >
>>> > > > > > > --
>>> > > > > > >
>>> > > > > > > Robin Moffatt | Senior Developer Advocate |
>>> robin@confluent.io |
>>> > > > > @rmoff
>>> > > > > > >
>>> > > > > > >
>>> > > > > > > On Wed, 20 May 2020 at 10:24, Deepak Raghav <
>>> > > > deepakraghav86@gmail.com>
>>> > > > > > > wrote:
>>> > > > > > >
>>> > > > > > > > Hi Robin
>>> > > > > > > >
>>> > > > > > > > Thanks for your reply.
>>> > > > > > > >
>>> > > > > > > > We are running the two workers on different IPs; the
>>> > > > > > > > example I gave you was just an example. We are using Kafka
>>> > > > > > > > version 2.3.1.
>>> > > > > > > >
>>> > > > > > > > Let me explain again with a simple example.
>>> > > > > > > >
>>> > > > > > > > Suppose we have two EC2 nodes, N1 and N2, with worker
>>> > > > > > > > processes W1 and W2 running in distributed mode with the
>>> > > > > > > > same group.id, i.e. in the same cluster, and two connectors
>>> > > > > > > > with two tasks each, i.e.
>>> > > > > > > >
>>> > > > > > > > Node N1: W1 is running
>>> > > > > > > > Node N2: W2 is running
>>> > > > > > > >
>>> > > > > > > > First connector (C1): task 1 with id C1T1 and task 2 with
>>> > > > > > > > id C1T2
>>> > > > > > > > Second connector (C2): task 1 with id C2T1 and task 2 with
>>> > > > > > > > id C2T2
>>> > > > > > > >
>>> > > > > > > > Now suppose both the W1 and W2 worker processes are running
>>> > > > > > > > and I register connectors C1 and C2 one after another, i.e.
>>> > > > > > > > sequentially, on any of the worker processes. The task
>>> > > > > > > > division between the worker nodes happens like below, which
>>> > > > > > > > is expected:
>>> > > > > > > >
>>> > > > > > > > *W1*                       *W2*
>>> > > > > > > > C1T1                    C1T2
>>> > > > > > > > C2T1                    C2T2
>>> > > > > > > >
>>> > > > > > > > Now, suppose I stop one worker process, e.g. W2, and start
>>> > > > > > > > it again after some time. The task division changes like
>>> > > > > > > > below, i.e. the first connector's tasks move to W1 and the
>>> > > > > > > > second connector's tasks move to W2:
>>> > > > > > > >
>>> > > > > > > > *W1*                       *W2*
>>> > > > > > > > C1T1                    C2T1
>>> > > > > > > > C1T2                    C2T2
>>> > > > > > > >
>>> > > > > > > > Please let me know if this is understandable.
>>> > > > > > > >
>>> > > > > > > > Note: In production, we are going to have 16 connectors
>>> > > > > > > > with 10 tasks each and two worker nodes. With the above
>>> > > > > > > > scenario, the first 8 connectors' tasks move to W1 and the
>>> > > > > > > > next 8 connectors' tasks move to W2, which is not expected.
>>> > > > > > > >
>>> > > > > > > >
>>> > > > > > > > Regards and Thanks
>>> > > > > > > > Deepak Raghav
>>> > > > > > > >
>>> > > > > > > >
>>> > > > > > > >
>>> > > > > > > > On Wed, May 20, 2020 at 1:41 PM Robin Moffatt <
>>> > > robin@confluent.io>
>>> > > > > > > wrote:
>>> > > > > > > >
>>> > > > > > > > > So you're running two workers on the same machine
>>> (10.0.0.4),
>>> > > is
>>> > > > > > > > > that correct? Normally you'd run one worker per machine
>>> > unless
>>> > > > > there
>>> > > > > > > was
>>> > > > > > > > a
>>> > > > > > > > > particular reason otherwise.
>>> > > > > > > > > What version of Apache Kafka are you using?
>>> > > > > > > > > I'm not clear from your question if the distribution of
>>> tasks
>>> > > is
>>> > > > > > > > > presenting a problem to you (if so please describe why),
>>> or
>>> > if
>>> > > > > you're
>>> > > > > > > > just
>>> > > > > > > > > interested in the theory behind the rebalancing protocol?
>>> > > > > > > > >
>>> > > > > > > > >
>>> > > > > > > > > --
>>> > > > > > > > >
>>> > > > > > > > > Robin Moffatt | Senior Developer Advocate |
>>> > robin@confluent.io
>>> > > |
>>> > > > > > > @rmoff
>>> > > > > > > > >
>>> > > > > > > > >
>>> > > > > > > > > On Wed, 20 May 2020 at 06:46, Deepak Raghav <
>>> > > > > > deepakraghav86@gmail.com>
>>> > > > > > > > > wrote:
>>> > > > > > > > >
>>> > > > > > > > > > Hi
>>> > > > > > > > > >
>>> > > > > > > > > > Please, can anybody help me with this?
>>> > > > > > > > > >
>>> > > > > > > > > > Regards and Thanks
>>> > > > > > > > > > Deepak Raghav
>>> > > > > > > > > >
>>> > > > > > > > > >
>>> > > > > > > > > >
>>> > > > > > > > > > On Tue, May 19, 2020 at 1:37 PM Deepak Raghav <
>>> > > > > > > > deepakraghav86@gmail.com>
>>> > > > > > > > > > wrote:
>>> > > > > > > > > >
>>> > > > > > > > > > > Hi Team
>>> > > > > > > > > > >
>>> > > > > > > > > > > We have two worker nodes in a cluster and 2
>>> > > > > > > > > > > connectors with 10 tasks each.
>>> > > > > > > > > > >
>>> > > > > > > > > > > Now, suppose we have two Kafka Connect processes, W1
>>> > > > > > > > > > > (port 8080) and W2 (port 8078), already started in
>>> > > > > > > > > > > distributed mode, and we then register the
>>> > > > > > > > > > > connectors. The 10 tasks of each connector are
>>> > > > > > > > > > > divided equally between the two workers, i.e. the
>>> > > > > > > > > > > first task of connector A goes to worker W1 and the
>>> > > > > > > > > > > second task of connector A to W2; similarly, the
>>> > > > > > > > > > > first task of connector B goes to W1 and the second
>>> > > > > > > > > > > task of connector B to W2.
>>> > > > > > > > > > >
>>> > > > > > > > > > > e.g
>>> > > > > > > > > > > *#First Connector : *
>>> > > > > > > > > > > {
>>> > > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
>>> > > > > > > > > > >   "connector": {
>>> > > > > > > > > > >     "state": "RUNNING",
>>> > > > > > > > > > >     "worker_id": "10.0.0.4:*8080*"
>>> > > > > > > > > > >   },
>>> > > > > > > > > > >   "tasks": [
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 0,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:*8078*"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 1,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 2,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 3,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 4,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 5,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 6,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 7,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 8,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 9,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>>> > > > > > > > > > >     }
>>> > > > > > > > > > >   ],
>>> > > > > > > > > > >   "type": "sink"
>>> > > > > > > > > > > }
>>> > > > > > > > > > >
>>> > > > > > > > > > >
>>> > > > > > > > > > > *#Sec connector*
>>> > > > > > > > > > >
>>> > > > > > > > > > > {
>>> > > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
>>> > > > > > > > > > >   "connector": {
>>> > > > > > > > > > >     "state": "RUNNING",
>>> > > > > > > > > > >     "worker_id": "10.0.0.4:8078"
>>> > > > > > > > > > >   },
>>> > > > > > > > > > >   "tasks": [
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 0,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 1,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 2,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 3,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 4,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 5,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 6,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 7,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 8,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 9,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>>> > > > > > > > > > >     }
>>> > > > > > > > > > >   ],
>>> > > > > > > > > > >   "type": "sink"
>>> > > > > > > > > > > }
>>> > > > > > > > > > >
>>> > > > > > > > > > > But I have seen a strange behavior: when I just shut
>>> > > > > > > > > > > down the W2 worker node and start it again, the tasks
>>> > > > > > > > > > > are divided in a different way, i.e. all the tasks of
>>> > > > > > > > > > > connector A end up on the W1 node and all the tasks
>>> > > > > > > > > > > of connector B on the W2 node.
>>> > > > > > > > > > >
>>> > > > > > > > > > > Can you please have a look at this?
>>> > > > > > > > > > >
>>> > > > > > > > > > > *#First Connector*
>>> > > > > > > > > > >
>>> > > > > > > > > > > {
>>> > > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
>>> > > > > > > > > > >   "connector": {
>>> > > > > > > > > > >     "state": "RUNNING",
>>> > > > > > > > > > >     "worker_id": "10.0.0.4:8080"
>>> > > > > > > > > > >   },
>>> > > > > > > > > > >   "tasks": [
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 0,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 1,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 2,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 3,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 4,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 5,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 6,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 7,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 8,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 9,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>>> > > > > > > > > > >     }
>>> > > > > > > > > > >   ],
>>> > > > > > > > > > >   "type": "sink"
>>> > > > > > > > > > > }
>>> > > > > > > > > > >
>>> > > > > > > > > > > *#Second Connector *:
>>> > > > > > > > > > >
>>> > > > > > > > > > > {
>>> > > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
>>> > > > > > > > > > >   "connector": {
>>> > > > > > > > > > >     "state": "RUNNING",
>>> > > > > > > > > > >     "worker_id": "10.0.0.4:8078"
>>> > > > > > > > > > >   },
>>> > > > > > > > > > >   "tasks": [
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 0,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 1,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 2,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 3,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 4,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 5,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 6,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 7,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 8,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>>> > > > > > > > > > >     },
>>> > > > > > > > > > >     {
>>> > > > > > > > > > >       "id": 9,
>>> > > > > > > > > > >       "state": "RUNNING",
>>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>>> > > > > > > > > > >     }
>>> > > > > > > > > > >   ],
>>> > > > > > > > > > >   "type": "sink"
>>> > > > > > > > > > > }
>>> > > > > > > > > > >
>>> > > > > > > > > > >
>>> > > > > > > > > > > Regards and Thanks
>>> > > > > > > > > > > Deepak Raghav
>>> > > > > > > > > > >
>>> > > > > > > > > > >
>>> > > > > > > > > >
>>> > > > > > > > >
>>> > > > > > > >
>>> > > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>

Re: Kafka Connect Connector Tasks Uneven Division

Posted by Deepak Raghav <de...@gmail.com>.
Hi Robin

Can you please reply?

I just want to add one more thing: yesterday I tried with
connect.protocol=eager. Task distribution was balanced after that.
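
For reference, that test just amounted to one line in the worker properties
file, along these lines (a minimal sketch):

    # worker .properties (sketch): switch from the default incremental protocol
    connect.protocol=eager

As far as I understand, the default on 2.3.x is connect.protocol=compatible,
i.e. the incremental cooperative rebalancing described in KIP-415.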

Regards and Thanks
Deepak Raghav



On Tue, Jun 9, 2020 at 2:37 PM Deepak Raghav <de...@gmail.com>
wrote:

> Hi Robin
>
> Thanks for your reply, and please accept my apology for the delayed
> response.
>
> You suggested that we should have a separate worker cluster based on the
> workload pattern. But since, as you said, task allocation is
> nondeterministic, the same thing can happen in the new cluster as well.
>
> Please let me know if my understanding is correct.
>
> Regards and Thanks
> Deepak Raghav
>
>
>
> On Tue, May 26, 2020 at 8:20 PM Robin Moffatt <ro...@confluent.io> wrote:
>
>> The KIP for the current rebalancing protocol is probably a good reference:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-415:+Incremental+Cooperative+Rebalancing+in+Kafka+Connect
>>
>>
>> --
>>
>> Robin Moffatt | Senior Developer Advocate | robin@confluent.io | @rmoff
>>
>>
>> On Tue, 26 May 2020 at 14:25, Deepak Raghav <de...@gmail.com>
>> wrote:
>>
>> > Hi Robin
>> >
>> > Thanks for the clarification.
>> >
>> > As you suggested, task allocation between the workers is
>> > nondeterministic. I have shared the same information within my team,
>> > but there are some other parties with whom I need to share it as an
>> > explanation for the issue they raised, and I cannot show this mail as
>> > a reference.
>> >
>> > It would be great if you could share any link or discussion reference
>> > regarding this.
>> >
>> > Regards and Thanks
>> > Deepak Raghav
>> >
>> >
>> >
>> > On Thu, May 21, 2020 at 2:12 PM Robin Moffatt <ro...@confluent.io>
>> wrote:
>> >
>> > > I don't think you're right to assert that this is "expected
>> behaviour":
>> > >
>> > > >  the tasks are divided in below pattern when they are first time
>> > > registered
>> > >
>> > > Kafka Connect task allocation is non-deterministic.
>> > >
>> > > I'm still not clear if you're solving for a theoretical problem or
>> > > an actual one. If this is an actual problem that you're encountering
>> > > and need a solution to, then since the task allocation is not
>> > > deterministic it sounds like you need to deploy separate worker
>> > > clusters based on the workload patterns that you are seeing and the
>> > > machine resources available.
>> > >
>> > >
>> > > --
>> > >
>> > > Robin Moffatt | Senior Developer Advocate | robin@confluent.io |
>> @rmoff
>> > >
>> > >
>> > > On Wed, 20 May 2020 at 21:29, Deepak Raghav <deepakraghav86@gmail.com
>> >
>> > > wrote:
>> > >
>> > > > Hi Robin
>> > > >
>> > > > I had gone through the link you provided; it is not helpful in my
>> > > > case. Apart from this, I am not getting why the tasks are divided
>> > > > in the below pattern when they are first time registered, which is
>> > > > the expected behavior. Is there any parameter we can pass in the
>> > > > worker property file that handles the task assignment strategy,
>> > > > like the range or round-robin assignors we have in consumer groups?
>> > > >
>> > > > Connector REST status API result after first registration:
>> > > >
>> > > > {
>> > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
>> > > >   "connector": {
>> > > >     "state": "RUNNING",
>> > > >     "worker_id": "10.0.0.5:*8080*"
>> > > >   },
>> > > >   "tasks": [
>> > > >     {
>> > > >       "id": 0,
>> > > >       "state": "RUNNING",
>> > > >       "worker_id": "10.0.0.4:*8078*"
>> > > >     },
>> > > >     {
>> > > >       "id": 1,
>> > > >       "state": "RUNNING",
>> > > >       "worker_id": "10.0.0.5:*8080*"
>> > > >     }
>> > > >   ],
>> > > >   "type": "sink"
>> > > > }
>> > > >
>> > > > and
>> > > >
>> > > > {
>> > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
>> > > >   "connector": {
>> > > >     "state": "RUNNING",
>> > > >     "worker_id": "10.0.0.4:*8078*"
>> > > >   },
>> > > >   "tasks": [
>> > > >     {
>> > > >       "id": 0,
>> > > >       "state": "RUNNING",
>> > > >       "worker_id": "10.0.0.4:*8078*"
>> > > >     },
>> > > >     {
>> > > >       "id": 1,
>> > > >       "state": "RUNNING",
>> > > >       "worker_id": "10.0.0.5:*8080*"
>> > > >     }
>> > > >   ],
>> > > >   "type": "sink"
>> > > > }
>> > > >
>> > > >
>> > > > But when I stop the second worker process, wait for the
>> > > > scheduled.rebalance.max.delay.ms time (i.e. 5 min) to pass, and
>> > > > start the process again, the result is different.
>> > > >
>> > > > {
>> > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
>> > > >   "connector": {
>> > > >     "state": "RUNNING",
>> > > >     "worker_id": "10.0.0.5:*8080*"
>> > > >   },
>> > > >   "tasks": [
>> > > >     {
>> > > >       "id": 0,
>> > > >       "state": "RUNNING",
>> > > >       "worker_id": "10.0.0.5:*8080*"
>> > > >     },
>> > > >     {
>> > > >       "id": 1,
>> > > >       "state": "RUNNING",
>> > > >       "worker_id": "10.0.0.5:*8080*"
>> > > >     }
>> > > >   ],
>> > > >   "type": "sink"
>> > > > }
>> > > >
>> > > > and
>> > > >
>> > > > {
>> > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
>> > > >   "connector": {
>> > > >     "state": "RUNNING",
>> > > >     "worker_id": "10.0.0.4:*8078*"
>> > > >   },
>> > > >   "tasks": [
>> > > >     {
>> > > >       "id": 0,
>> > > >       "state": "RUNNING",
>> > > >       "worker_id": "10.0.0.4:*8078*"
>> > > >     },
>> > > >     {
>> > > >       "id": 1,
>> > > >       "state": "RUNNING",
>> > > >       "worker_id": "10.0.0.4:*8078*"
>> > > >     }
>> > > >   ],
>> > > >   "type": "sink"
>> > > > }
>> > > >
>> > > > Regards and Thanks
>> > > > Deepak Raghav
>> > > >
>> > > >
>> > > >
>> > > > On Wed, May 20, 2020 at 9:29 PM Robin Moffatt <ro...@confluent.io>
>> > > wrote:
>> > > >
>> > > > > Thanks for the clarification. If this is an actual problem that
>> > you're
>> > > > > encountering and need a solution to then since the task
>> allocation is
>> > > not
>> > > > > deterministic it sounds like you need to deploy separate worker
>> > > clusters
>> > > > > based on the workload patterns that you are seeing and machine
>> > > resources
>> > > > > available.
>> > > > >
>> > > > >
>> > > > > --
>> > > > >
>> > > > > Robin Moffatt | Senior Developer Advocate | robin@confluent.io |
>> > > @rmoff
>> > > > >
>> > > > >
>> > > > > On Wed, 20 May 2020 at 16:39, Deepak Raghav <
>> > deepakraghav86@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Hi Robin
>> > > > > >
>> > > > > > Replying to your query, i.e.
>> > > > > >
>> > > > > > "One thing I'd ask at this point is though if it makes any
>> > > > > > difference where the tasks execute?"
>> > > > > >
>> > > > > > It actually makes a difference to us. We have 16 connectors
>> > > > > > and, as I stated in the task division earlier, the first 8
>> > > > > > connectors' tasks are assigned to the first worker process and
>> > > > > > the other connectors' tasks to the other worker process. Just
>> > > > > > to mention, these 16 connectors are sink connectors, and each
>> > > > > > sink connector consumes messages from a different topic. There
>> > > > > > may be a case when messages are coming only for the first 8
>> > > > > > connectors' topics, and because all the tasks of these
>> > > > > > connectors are assigned to the first worker, the load would be
>> > > > > > high on it while the other set of connectors on the other
>> > > > > > worker would be idle.
>> > > > > >
>> > > > > > Instead, if the tasks had been divided evenly, this case would
>> > > > > > have been avoided, because tasks of each connector would be
>> > > > > > present in both worker processes, like below:
>> > > > > >
>> > > > > > *W1*                       *W2*
>> > > > > >  C1T1                    C1T2
>> > > > > >  C2T1                    C2T2
>> > > > > >
>> > > > > > I hope that answers your question.
>> > > > > >
>> > > > > >
>> > > > > > Regards and Thanks
>> > > > > > Deepak Raghav
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > On Wed, May 20, 2020 at 4:42 PM Robin Moffatt <
>> robin@confluent.io>
>> > > > > wrote:
>> > > > > >
>> > > > > > > OK, I understand better now.
>> > > > > > >
>> > > > > > > You can read more about the guts of the rebalancing protocol
>> > > > > > > that Kafka Connect uses as of Apache Kafka 2.3 and onwards
>> > > > > > > here:
>> > > > > > >
>> > > > > > > https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/
>> > > > > > >
>> > > > > > > One thing I'd ask at this point is though if it makes any
>> > > difference
>> > > > > > where
>> > > > > > > the tasks execute? The point of a cluster is that Kafka
>> Connect
>> > > > manages
>> > > > > > the
>> > > > > > > workload allocation. If you need workload separation and
>> > > > > > > guaranteed execution locality I would suggest separate Kafka
>> > > Connect
>> > > > > > > distributed clusters.
>> > > > > > >
>> > > > > > >
>> > > > > > > --
>> > > > > > >
>> > > > > > > Robin Moffatt | Senior Developer Advocate |
>> robin@confluent.io |
>> > > > > @rmoff
>> > > > > > >
>> > > > > > >
>> > > > > > > On Wed, 20 May 2020 at 10:24, Deepak Raghav <
>> > > > deepakraghav86@gmail.com>
>> > > > > > > wrote:
>> > > > > > >
>> > > > > > > > Hi Robin
>> > > > > > > >
>> > > > > > > > Thanks for your reply.
>> > > > > > > >
>> > > > > > > > We are running the two workers on different IPs; the
>> > > > > > > > example I gave you was just an example. We are using Kafka
>> > > > > > > > version 2.3.1.
>> > > > > > > >
>> > > > > > > > Let me explain again with a simple example.
>> > > > > > > >
>> > > > > > > > Suppose we have two EC2 nodes, N1 and N2, with worker
>> > > > > > > > processes W1 and W2 running in distributed mode with the
>> > > > > > > > same group.id, i.e. in the same cluster, and two connectors
>> > > > > > > > with two tasks each, i.e.
>> > > > > > > >
>> > > > > > > > Node N1: W1 is running
>> > > > > > > > Node N2: W2 is running
>> > > > > > > >
>> > > > > > > > First connector (C1): task 1 with id C1T1 and task 2 with
>> > > > > > > > id C1T2
>> > > > > > > > Second connector (C2): task 1 with id C2T1 and task 2 with
>> > > > > > > > id C2T2
>> > > > > > > >
>> > > > > > > > Now suppose both the W1 and W2 worker processes are running
>> > > > > > > > and I register connectors C1 and C2 one after another, i.e.
>> > > > > > > > sequentially, on any of the worker processes. The task
>> > > > > > > > division between the worker nodes happens like below, which
>> > > > > > > > is expected:
>> > > > > > > >
>> > > > > > > > *W1*                       *W2*
>> > > > > > > > C1T1                    C1T2
>> > > > > > > > C2T1                    C2T2
>> > > > > > > >
>> > > > > > > > Now, suppose I stop one worker process, e.g. W2, and start
>> > > > > > > > it again after some time. The task division changes like
>> > > > > > > > below, i.e. the first connector's tasks move to W1 and the
>> > > > > > > > second connector's tasks move to W2:
>> > > > > > > >
>> > > > > > > > *W1*                       *W2*
>> > > > > > > > C1T1                    C2T1
>> > > > > > > > C1T2                    C2T2
>> > > > > > > >
>> > > > > > > > Please let me know if this is understandable.
>> > > > > > > >
>> > > > > > > > Note: In production, we are going to have 16 connectors
>> > > > > > > > with 10 tasks each and two worker nodes. With the above
>> > > > > > > > scenario, the first 8 connectors' tasks move to W1 and the
>> > > > > > > > next 8 connectors' tasks move to W2, which is not expected.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Regards and Thanks
>> > > > > > > > Deepak Raghav
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Wed, May 20, 2020 at 1:41 PM Robin Moffatt <
>> > > robin@confluent.io>
>> > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > So you're running two workers on the same machine
>> (10.0.0.4),
>> > > is
>> > > > > > > > > that correct? Normally you'd run one worker per machine
>> > unless
>> > > > > there
>> > > > > > > was
>> > > > > > > > a
>> > > > > > > > > particular reason otherwise.
>> > > > > > > > > What version of Apache Kafka are you using?
>> > > > > > > > > I'm not clear from your question if the distribution of
>> tasks
>> > > is
>> > > > > > > > > presenting a problem to you (if so please describe why),
>> or
>> > if
>> > > > > you're
>> > > > > > > > just
>> > > > > > > > > interested in the theory behind the rebalancing protocol?
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > --
>> > > > > > > > >
>> > > > > > > > > Robin Moffatt | Senior Developer Advocate |
>> > robin@confluent.io
>> > > |
>> > > > > > > @rmoff
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > On Wed, 20 May 2020 at 06:46, Deepak Raghav <
>> > > > > > deepakraghav86@gmail.com>
>> > > > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi
>> > > > > > > > > >
>> > > > > > > > > > Please, can anybody help me with this?
>> > > > > > > > > >
>> > > > > > > > > > Regards and Thanks
>> > > > > > > > > > Deepak Raghav
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > On Tue, May 19, 2020 at 1:37 PM Deepak Raghav <
>> > > > > > > > deepakraghav86@gmail.com>
>> > > > > > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Hi Team
>> > > > > > > > > > >
>> > > > > > > > > > We have two worker nodes in a cluster and 2 connectors
>> > > > > > > > > > with 10 tasks each.
>> > > > > > > > > >
>> > > > > > > > > > Now, suppose we have two Kafka Connect processes, W1
>> > > > > > > > > > (port 8080) and W2 (port 8078), already started in
>> > > > > > > > > > distributed mode, and we then register the connectors.
>> > > > > > > > > > The 10 tasks of each connector are divided equally
>> > > > > > > > > > between the two workers, i.e. the first task of
>> > > > > > > > > > connector A goes to worker W1 and the second task of
>> > > > > > > > > > connector A to W2; similarly, the first task of
>> > > > > > > > > > connector B goes to W1 and the second task of connector
>> > > > > > > > > > B to W2.
>> > > > > > > > > > >
>> > > > > > > > > > > e.g
>> > > > > > > > > > > *#First Connector : *
>> > > > > > > > > > > {
>> > > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
>> > > > > > > > > > >   "connector": {
>> > > > > > > > > > >     "state": "RUNNING",
>> > > > > > > > > > >     "worker_id": "10.0.0.4:*8080*"
>> > > > > > > > > > >   },
>> > > > > > > > > > >   "tasks": [
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 0,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:*8078*"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 1,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 2,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 3,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 4,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 5,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 6,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 7,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 8,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 9,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     }
>> > > > > > > > > > >   ],
>> > > > > > > > > > >   "type": "sink"
>> > > > > > > > > > > }
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > *#Sec connector*
>> > > > > > > > > > >
>> > > > > > > > > > > {
>> > > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
>> > > > > > > > > > >   "connector": {
>> > > > > > > > > > >     "state": "RUNNING",
>> > > > > > > > > > >     "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >   },
>> > > > > > > > > > >   "tasks": [
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 0,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 1,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 2,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 3,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 4,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 5,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 6,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 7,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 8,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 9,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     }
>> > > > > > > > > > >   ],
>> > > > > > > > > > >   "type": "sink"
>> > > > > > > > > > > }
>> > > > > > > > > > >
>> > > > > > > > > > > But I have seen a strange behavior: when I just shut
>> > > > > > > > > > > down the W2 worker node and start it again, the tasks
>> > > > > > > > > > > are divided in a different way, i.e. all the tasks of
>> > > > > > > > > > > connector A end up on the W1 node and all the tasks
>> > > > > > > > > > > of connector B on the W2 node.
>> > > > > > > > > > >
>> > > > > > > > > > > Can you please have a look at this?
>> > > > > > > > > > >
>> > > > > > > > > > > *#First Connector*
>> > > > > > > > > > >
>> > > > > > > > > > > {
>> > > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
>> > > > > > > > > > >   "connector": {
>> > > > > > > > > > >     "state": "RUNNING",
>> > > > > > > > > > >     "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >   },
>> > > > > > > > > > >   "tasks": [
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 0,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 1,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 2,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 3,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 4,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 5,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 6,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 7,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 8,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 9,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
>> > > > > > > > > > >     }
>> > > > > > > > > > >   ],
>> > > > > > > > > > >   "type": "sink"
>> > > > > > > > > > > }
>> > > > > > > > > > >
>> > > > > > > > > > > *#Second Connector *:
>> > > > > > > > > > >
>> > > > > > > > > > > {
>> > > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
>> > > > > > > > > > >   "connector": {
>> > > > > > > > > > >     "state": "RUNNING",
>> > > > > > > > > > >     "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >   },
>> > > > > > > > > > >   "tasks": [
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 0,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 1,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 2,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 3,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 4,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 5,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 6,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 7,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 8,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     },
>> > > > > > > > > > >     {
>> > > > > > > > > > >       "id": 9,
>> > > > > > > > > > >       "state": "RUNNING",
>> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
>> > > > > > > > > > >     }
>> > > > > > > > > > >   ],
>> > > > > > > > > > >   "type": "sink"
>> > > > > > > > > > > }
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > Regards and Thanks
>> > > > > > > > > > > Deepak Raghav
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>

Re: Kafka Connect Connector Tasks Uneven Division

Posted by Deepak Raghav <de...@gmail.com>.
Hi Robin

Thanks for your reply, and please accept my apology for the delayed response.

You suggested that we should run separate worker clusters based on our
workload pattern. But since, as you said, task allocation is
nondeterministic, the same thing can happen in the new cluster as well.

Please let me know if my understanding is correct.
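
To make sure we mean the same thing by "separate worker clusters": my
understanding is simply two sets of workers with different group.id values
and their own internal topics, roughly like the sketch below (the names and
values are only placeholders, not our real configuration):

# worker-cluster-a.properties (placeholder sketch)
bootstrap.servers=kafka:9092
group.id=connect-cluster-a
config.storage.topic=connect-cluster-a-configs
offset.storage.topic=connect-cluster-a-offsets
status.storage.topic=connect-cluster-a-status

# worker-cluster-b.properties (placeholder sketch)
bootstrap.servers=kafka:9092
group.id=connect-cluster-b
config.storage.topic=connect-cluster-b-configs
offset.storage.topic=connect-cluster-b-offsets
status.storage.topic=connect-cluster-b-status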

Regards and Thanks
Deepak Raghav



On Tue, May 26, 2020 at 8:20 PM Robin Moffatt <ro...@confluent.io> wrote:

> The KIP for the current rebalancing protocol is probably a good reference:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-415:+Incremental+Cooperative+Rebalancing+in+Kafka+Connect
>
>
> --
>
> Robin Moffatt | Senior Developer Advocate | robin@confluent.io | @rmoff
>
>
> On Tue, 26 May 2020 at 14:25, Deepak Raghav <de...@gmail.com>
> wrote:
>
> > Hi Robin
> >
> > Thanks for the clarification.
> >
> > As you suggested, that task allocation between the workers is
> > nondeterministic. I have shared the same information within my team
> but
> > there are some other parties, with whom I need to share this information
> as
> > explanation for the issue raised by them and I cannot show this mail as a
> > reference.
> >
> > It would be very great if you please share any link/discussion reference
> > regarding the same.
> >
> > Regards and Thanks
> > Deepak Raghav
> >
> >
> >
> > On Thu, May 21, 2020 at 2:12 PM Robin Moffatt <ro...@confluent.io>
> wrote:
> >
> > > I don't think you're right to assert that this is "expected behaviour":
> > >
> > > >  the tasks are divided in below pattern when they are first time
> > > registered
> > >
> > > Kafka Connect task allocation is non-deterministic.
> > >
> > > I'm still not clear if you're solving for a theoretical problem or an
> > > actual one. If this is an actual problem that you're encountering and
> > need
> > > a solution to then since the task allocation is not deterministic it
> > sounds
> > > like you need to deploy separate worker clusters based on the workload
> > > patterns that you are seeing and machine resources available.
> > >
> > >
> > > --
> > >
> > > Robin Moffatt | Senior Developer Advocate | robin@confluent.io |
> @rmoff
> > >
> > >
> > > On Wed, 20 May 2020 at 21:29, Deepak Raghav <de...@gmail.com>
> > > wrote:
> > >
> > > > Hi Robin
> > > >
> > > > I had gone though the link you provided, It is not helpful in my
> case.
> > > > Apart from this, *I am not getting why the tasks are divided in
> *below
> > > > pattern* when they are *first time registered*, which is expected
> > > behavior.
> > > > I*s there any parameter which we can pass in worker property file
> which
> > > > handle the task assignment strategy like we have range assigner or
> > round
> > > > robin in consumer-group ?
> > > >
> > > > connector rest status api result after first registration :
> > > >
> > > > {
> > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > >   "connector": {
> > > >     "state": "RUNNING",
> > > >     "worker_id": "10.0.0.5:*8080*"
> > > >   },
> > > >   "tasks": [
> > > >     {
> > > >       "id": 0,
> > > >       "state": "RUNNING",
> > > >       "worker_id": "10.0.0.4:*8078*"
> > > >     },
> > > >     {
> > > >       "id": 1,
> > > >       "state": "RUNNING",
> > > >       "worker_id": "10.0.0.5:*8080*"
> > > >     }
> > > >   ],
> > > >   "type": "sink"
> > > > }
> > > >
> > > > and
> > > >
> > > > {
> > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > >   "connector": {
> > > >     "state": "RUNNING",
> > > >     "worker_id": "10.0.0.4:*8078*"
> > > >   },
> > > >   "tasks": [
> > > >     {
> > > >       "id": 0,
> > > >       "state": "RUNNING",
> > > >       "worker_id": "10.0.0.4:*8078*"
> > > >     },
> > > >     {
> > > >       "id": 1,
> > > >       "state": "RUNNING",
> > > >       "worker_id": "10.0.0.5:*8080*"
> > > >     }
> > > >   ],
> > > >   "type": "sink"
> > > > }
> > > >
> > > >
> > > > But when I stop the second worker process and wait for
> > > > scheduled.rebalance.max.delay.ms time i.e 5 min to over, and start
> the
> > > > process again. Result is different.
> > > >
> > > > {
> > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > >   "connector": {
> > > >     "state": "RUNNING",
> > > >     "worker_id": "10.0.0.5:*8080*"
> > > >   },
> > > >   "tasks": [
> > > >     {
> > > >       "id": 0,
> > > >       "state": "RUNNING",
> > > >       "worker_id": "10.0.0.5:*8080*"
> > > >     },
> > > >     {
> > > >       "id": 1,
> > > >       "state": "RUNNING",
> > > >       "worker_id": "10.0.0.5:*8080*"
> > > >     }
> > > >   ],
> > > >   "type": "sink"
> > > > }
> > > >
> > > > and
> > > >
> > > > {
> > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > >   "connector": {
> > > >     "state": "RUNNING",
> > > >     "worker_id": "10.0.0.4:*8078*"
> > > >   },
> > > >   "tasks": [
> > > >     {
> > > >       "id": 0,
> > > >       "state": "RUNNING",
> > > >       "worker_id": "10.0.0.4:*8078*"
> > > >     },
> > > >     {
> > > >       "id": 1,
> > > >       "state": "RUNNING",
> > > >       "worker_id": "10.0.0.4:*8078*"
> > > >     }
> > > >   ],
> > > >   "type": "sink"
> > > > }
> > > >
> > > > Regards and Thanks
> > > > Deepak Raghav
> > > >
> > > >
> > > >
> > > > On Wed, May 20, 2020 at 9:29 PM Robin Moffatt <ro...@confluent.io>
> > > wrote:
> > > >
> > > > > Thanks for the clarification. If this is an actual problem that
> > you're
> > > > > encountering and need a solution to then since the task allocation
> is
> > > not
> > > > > deterministic it sounds like you need to deploy separate worker
> > > clusters
> > > > > based on the workload patterns that you are seeing and machine
> > > resources
> > > > > available.
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Robin Moffatt | Senior Developer Advocate | robin@confluent.io |
> > > @rmoff
> > > > >
> > > > >
> > > > > On Wed, 20 May 2020 at 16:39, Deepak Raghav <
> > deepakraghav86@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Robin
> > > > > >
> > > > > > Replying to your query i.e
> > > > > >
> > > > > > One thing I'd ask at this point is though if it makes any
> > difference
> > > > > where
> > > > > > the tasks execute?
> > > > > >
> > > > > > It actually makes difference to us, we have 16 connectors and as
> I
> > > > stated
> > > > > > tasks division earlier, first 8 connector' task are assigned to
> > first
> > > > > > worker process and another connector's task to another worker
> > process
> > > > and
> > > > > > just to mention that these 16 connectors are sink connectors.
> Each
> > > sink
> > > > > > connector consumes message from different topic.There may be a
> case
> > > > when
> > > > > > messages are coming only for first 8 connector's topic and
> because
> > > all
> > > > > the
> > > > > > tasks of these connectors are assigned to First Worker, load
> would
> > be
> > > > > high
> > > > > > on it and another set of connectors in another worker would be
> > idle.
> > > > > >
> > > > > > Instead, if the task would have been divided evenly then this
> case
> > > > would
> > > > > > have been avoided. Because tasks of each connector would be
> present
> > > in
> > > > > both
> > > > > > workers process like below :
> > > > > >
> > > > > > *W1*                       *W2*
> > > > > >  C1T1                    C1T2
> > > > > >  C2T2                    C2T2
> > > > > >
> > > > > > I hope, I gave your answer,
> > > > > >
> > > > > >
> > > > > > Regards and Thanks
> > > > > > Deepak Raghav
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, May 20, 2020 at 4:42 PM Robin Moffatt <
> robin@confluent.io>
> > > > > wrote:
> > > > > >
> > > > > > > OK, I understand better now.
> > > > > > >
> > > > > > > You can read more about the guts of the rebalancing protocol
> that
> > > > Kafka
> > > > > > > Connect uses as of Apache Kafka 2.3 an onwards here:
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/
> > > > > > >
> > > > > > > One thing I'd ask at this point is though if it makes any
> > > difference
> > > > > > where
> > > > > > > the tasks execute? The point of a cluster is that Kafka Connect
> > > > manages
> > > > > > the
> > > > > > > workload allocation. If you need workload separation and
> > > > > > > guaranteed execution locality I would suggest separate Kafka
> > > Connect
> > > > > > > distributed clusters.
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > >
> > > > > > > Robin Moffatt | Senior Developer Advocate | robin@confluent.io
> |
> > > > > @rmoff
> > > > > > >
> > > > > > >
> > > > > > > On Wed, 20 May 2020 at 10:24, Deepak Raghav <
> > > > deepakraghav86@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Robin
> > > > > > > >
> > > > > > > > Thanks for your reply.
> > > > > > > >
> > > > > > > > We are having two worker on different IP. The example which I
> > > gave
> > > > > you
> > > > > > it
> > > > > > > > was just a example. We are using kafka version 2.3.1.
> > > > > > > >
> > > > > > > > Let me tell you again with a simple example.
> > > > > > > >
> > > > > > > > Suppose, we have two EC2 node, N1 and N2 having worker
> process
> > W1
> > > > and
> > > > > > W2
> > > > > > > > running in distribute mode with groupId i.e in same cluster
> and
> > > two
> > > > > > > > connectors with having two task each i.e
> > > > > > > >
> > > > > > > > Node N1: W1 is running
> > > > > > > > Node N2 : W2 is running
> > > > > > > >
> > > > > > > > First Connector (C1) : Task1 with id : C1T1 and task 2 with
> id
> > :
> > > > C1T2
> > > > > > > > Second Connector (C2) : Task1 with id : C2T1 and task 2 with
> > id :
> > > > > C2T2
> > > > > > > >
> > > > > > > > Now Suppose If both W1 and W2 worker process are running
> and I
> > > > > > register
> > > > > > > > Connector C1 and C2 one after another i.e sequentially, on
> any
> > of
> > > > the
> > > > > > > > worker process, the tasks division between the worker
> > > > > > > > node are happening like below, which is expected.
> > > > > > > >
> > > > > > > > *W1*                       *W2*
> > > > > > > > C1T1                    C1T2
> > > > > > > > C2T2                    C2T2
> > > > > > > >
> > > > > > > > Now, suppose I stop one worker process e.g W2 and start after
> > > some
> > > > > > time,
> > > > > > > > the tasks division is changed like below i.e first
> connector's
> > > task
> > > > > > move
> > > > > > > to
> > > > > > > > W1 and second connector's task move to W2
> > > > > > > >
> > > > > > > > *W1*                       *W2*
> > > > > > > > C1T1                    C2T1
> > > > > > > > C1T2                    C2T2
> > > > > > > >
> > > > > > > >
> > > > > > > > Please let me know, If it is understandable or not.
> > > > > > > >
> > > > > > > > Note : Actually, In production, we are gonna have 16
> connectors
> > > > > having
> > > > > > 10
> > > > > > > > task each and two worker node. With above scenario, first 8
> > > > > > connectors's
> > > > > > > > task move to W1 and next 8 connector' task move to W2, Which
> is
> > > not
> > > > > > > > expected.
> > > > > > > >
> > > > > > > >
> > > > > > > > Regards and Thanks
> > > > > > > > Deepak Raghav
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, May 20, 2020 at 1:41 PM Robin Moffatt <
> > > robin@confluent.io>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > So you're running two workers on the same machine
> (10.0.0.4),
> > > is
> > > > > > > > > that correct? Normally you'd run one worker per machine
> > unless
> > > > > there
> > > > > > > was
> > > > > > > > a
> > > > > > > > > particular reason otherwise.
> > > > > > > > > What version of Apache Kafka are you using?
> > > > > > > > > I'm not clear from your question if the distribution of
> tasks
> > > is
> > > > > > > > > presenting a problem to you (if so please describe why), or
> > if
> > > > > you're
> > > > > > > > just
> > > > > > > > > interested in the theory behind the rebalancing protocol?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > >
> > > > > > > > > Robin Moffatt | Senior Developer Advocate |
> > robin@confluent.io
> > > |
> > > > > > > @rmoff
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, 20 May 2020 at 06:46, Deepak Raghav <
> > > > > > deepakraghav86@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi
> > > > > > > > > >
> > > > > > > > > > Please, can anybody help me with this?
> > > > > > > > > >
> > > > > > > > > > Regards and Thanks
> > > > > > > > > > Deepak Raghav
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Tue, May 19, 2020 at 1:37 PM Deepak Raghav <
> > > > > > > > deepakraghav86@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Team
> > > > > > > > > > >
> > > > > > > > > > > We have two worker node in a cluster and 2 connector
> with
> > > > > having
> > > > > > 10
> > > > > > > > > tasks
> > > > > > > > > > > each.
> > > > > > > > > > >
> > > > > > > > > > > Now, suppose if we have two kafka connect process
> W1(Port
> > > > 8080)
> > > > > > and
> > > > > > > > > > > W2(Port 8078) started already in distribute mode and
> then
> > > > > > register
> > > > > > > > the
> > > > > > > > > > > connectors, task of one connector i.e 10 tasks are
> > divided
> > > > > > equally
> > > > > > > > > > between
> > > > > > > > > > > two worker i.e first task of A connector to W1 worker
> > node
> > > > and
> > > > > > sec
> > > > > > > > task
> > > > > > > > > > of
> > > > > > > > > > > A connector to W2 worker node, similarly for first task
> > of
> > > B
> > > > > > > > connector,
> > > > > > > > > > > will go to W1 node and sec task of B connector go to W2
> > > node.
> > > > > > > > > > >
> > > > > > > > > > > e.g
> > > > > > > > > > > *#First Connector : *
> > > > > > > > > > > {
> > > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > > > > > > > > >   "connector": {
> > > > > > > > > > >     "state": "RUNNING",
> > > > > > > > > > >     "worker_id": "10.0.0.4:*8080*"
> > > > > > > > > > >   },
> > > > > > > > > > >   "tasks": [
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 0,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:*8078*"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 1,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 2,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 3,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 4,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 5,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 6,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 7,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 8,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 9,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     }
> > > > > > > > > > >   ],
> > > > > > > > > > >   "type": "sink"
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > *#Sec connector*
> > > > > > > > > > >
> > > > > > > > > > > {
> > > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > > > > > > > > >   "connector": {
> > > > > > > > > > >     "state": "RUNNING",
> > > > > > > > > > >     "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >   },
> > > > > > > > > > >   "tasks": [
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 0,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 1,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 2,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 3,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 4,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 5,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 6,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 7,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 8,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 9,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     }
> > > > > > > > > > >   ],
> > > > > > > > > > >   "type": "sink"
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > > But I have seen a strange behavior, when I just
> shutdown
> > W2
> > > > > > worker
> > > > > > > > node
> > > > > > > > > > > and start it again task are divided but in diff way i.e
> > all
> > > > the
> > > > > > > tasks
> > > > > > > > > of
> > > > > > > > > > A
> > > > > > > > > > > connector will get into W1 node and tasks of B
> Connector
> > > into
> > > > > W2
> > > > > > > > node.
> > > > > > > > > > >
> > > > > > > > > > > Can you please have a look for this.
> > > > > > > > > > >
> > > > > > > > > > > *#First Connector*
> > > > > > > > > > >
> > > > > > > > > > > {
> > > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > > > > > > > > >   "connector": {
> > > > > > > > > > >     "state": "RUNNING",
> > > > > > > > > > >     "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >   },
> > > > > > > > > > >   "tasks": [
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 0,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 1,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 2,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 3,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 4,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 5,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 6,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 7,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 8,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 9,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > > >     }
> > > > > > > > > > >   ],
> > > > > > > > > > >   "type": "sink"
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > > *#Second Connector *:
> > > > > > > > > > >
> > > > > > > > > > > {
> > > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > > > > > > > > >   "connector": {
> > > > > > > > > > >     "state": "RUNNING",
> > > > > > > > > > >     "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >   },
> > > > > > > > > > >   "tasks": [
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 0,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 1,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 2,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 3,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 4,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 5,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 6,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 7,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 8,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     },
> > > > > > > > > > >     {
> > > > > > > > > > >       "id": 9,
> > > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > > >     }
> > > > > > > > > > >   ],
> > > > > > > > > > >   "type": "sink"
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Regards and Thanks
> > > > > > > > > > > Deepak Raghav
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Kafka Connect Connector Tasks Uneven Division

Posted by Robin Moffatt <ro...@confluent.io>.
The KIP for the current rebalancing protocol is probably a good reference:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-415:+Incremental+Cooperative+Rebalancing+in+Kafka+Connect
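
If it helps when you explain this internally, the behaviour described in
that KIP is governed by a couple of worker settings; the values below are
the defaults as far as I recall, so please double-check them against your
own worker configuration:

# distributed worker properties (illustrative defaults)
connect.protocol=compatible              # enables incremental cooperative rebalancing
scheduled.rebalance.max.delay.ms=300000  # 5 minutes before "lost" tasks are reassigned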


-- 

Robin Moffatt | Senior Developer Advocate | robin@confluent.io | @rmoff


On Tue, 26 May 2020 at 14:25, Deepak Raghav <de...@gmail.com>
wrote:

> Hi Robin
>
> Thanks for the clarification.
>
> As you suggested, that task allocation between the workers is
> nondeterministic. I have shared the same information within my team but
> there are some other parties, with whom I need to share this information as
> explanation for the issue raised by them and I cannot show this mail as a
> reference.
>
> It would be very great if you please share any link/discussion reference
> regarding the same.
>
> Regards and Thanks
> Deepak Raghav
>
>
>
> On Thu, May 21, 2020 at 2:12 PM Robin Moffatt <ro...@confluent.io> wrote:
>
> > I don't think you're right to assert that this is "expected behaviour":
> >
> > >  the tasks are divided in below pattern when they are first time
> > registered
> >
> > Kafka Connect task allocation is non-deterministic.
> >
> > I'm still not clear if you're solving for a theoretical problem or an
> > actual one. If this is an actual problem that you're encountering and
> need
> > a solution to then since the task allocation is not deterministic it
> sounds
> > like you need to deploy separate worker clusters based on the workload
> > patterns that you are seeing and machine resources available.
> >
> >
> > --
> >
> > Robin Moffatt | Senior Developer Advocate | robin@confluent.io | @rmoff
> >
> >
> > On Wed, 20 May 2020 at 21:29, Deepak Raghav <de...@gmail.com>
> > wrote:
> >
> > > Hi Robin
> > >
> > > I had gone though the link you provided, It is not helpful in my case.
> > > Apart from this, *I am not getting why the tasks are divided in *below
> > > pattern* when they are *first time registered*, which is expected
> > behavior.
> > > I*s there any parameter which we can pass in worker property file which
> > > handle the task assignment strategy like we have range assigner or
> round
> > > robin in consumer-group ?
> > >
> > > connector rest status api result after first registration :
> > >
> > > {
> > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > >   "connector": {
> > >     "state": "RUNNING",
> > >     "worker_id": "10.0.0.5:*8080*"
> > >   },
> > >   "tasks": [
> > >     {
> > >       "id": 0,
> > >       "state": "RUNNING",
> > >       "worker_id": "10.0.0.4:*8078*"
> > >     },
> > >     {
> > >       "id": 1,
> > >       "state": "RUNNING",
> > >       "worker_id": "10.0.0.5:*8080*"
> > >     }
> > >   ],
> > >   "type": "sink"
> > > }
> > >
> > > and
> > >
> > > {
> > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > >   "connector": {
> > >     "state": "RUNNING",
> > >     "worker_id": "10.0.0.4:*8078*"
> > >   },
> > >   "tasks": [
> > >     {
> > >       "id": 0,
> > >       "state": "RUNNING",
> > >       "worker_id": "10.0.0.4:*8078*"
> > >     },
> > >     {
> > >       "id": 1,
> > >       "state": "RUNNING",
> > >       "worker_id": "10.0.0.5:*8080*"
> > >     }
> > >   ],
> > >   "type": "sink"
> > > }
> > >
> > >
> > > But when I stop the second worker process and wait for
> > > scheduled.rebalance.max.delay.ms time i.e 5 min to over, and start the
> > > process again. Result is different.
> > >
> > > {
> > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > >   "connector": {
> > >     "state": "RUNNING",
> > >     "worker_id": "10.0.0.5:*8080*"
> > >   },
> > >   "tasks": [
> > >     {
> > >       "id": 0,
> > >       "state": "RUNNING",
> > >       "worker_id": "10.0.0.5:*8080*"
> > >     },
> > >     {
> > >       "id": 1,
> > >       "state": "RUNNING",
> > >       "worker_id": "10.0.0.5:*8080*"
> > >     }
> > >   ],
> > >   "type": "sink"
> > > }
> > >
> > > and
> > >
> > > {
> > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > >   "connector": {
> > >     "state": "RUNNING",
> > >     "worker_id": "10.0.0.4:*8078*"
> > >   },
> > >   "tasks": [
> > >     {
> > >       "id": 0,
> > >       "state": "RUNNING",
> > >       "worker_id": "10.0.0.4:*8078*"
> > >     },
> > >     {
> > >       "id": 1,
> > >       "state": "RUNNING",
> > >       "worker_id": "10.0.0.4:*8078*"
> > >     }
> > >   ],
> > >   "type": "sink"
> > > }
> > >
> > > Regards and Thanks
> > > Deepak Raghav
> > >
> > >
> > >
> > > On Wed, May 20, 2020 at 9:29 PM Robin Moffatt <ro...@confluent.io>
> > wrote:
> > >
> > > > Thanks for the clarification. If this is an actual problem that
> you're
> > > > encountering and need a solution to then since the task allocation is
> > not
> > > > deterministic it sounds like you need to deploy separate worker
> > clusters
> > > > based on the workload patterns that you are seeing and machine
> > resources
> > > > available.
> > > >
> > > >
> > > > --
> > > >
> > > > Robin Moffatt | Senior Developer Advocate | robin@confluent.io |
> > @rmoff
> > > >
> > > >
> > > > On Wed, 20 May 2020 at 16:39, Deepak Raghav <
> deepakraghav86@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Robin
> > > > >
> > > > > Replying to your query i.e
> > > > >
> > > > > One thing I'd ask at this point is though if it makes any
> difference
> > > > where
> > > > > the tasks execute?
> > > > >
> > > > > It actually makes difference to us, we have 16 connectors and as I
> > > stated
> > > > > tasks division earlier, first 8 connector' task are assigned to
> first
> > > > > worker process and another connector's task to another worker
> process
> > > and
> > > > > just to mention that these 16 connectors are sink connectors. Each
> > sink
> > > > > connector consumes message from different topic.There may be a case
> > > when
> > > > > messages are coming only for first 8 connector's topic and because
> > all
> > > > the
> > > > > tasks of these connectors are assigned to First Worker, load would
> be
> > > > high
> > > > > on it and another set of connectors in another worker would be
> idle.
> > > > >
> > > > > Instead, if the task would have been divided evenly then this case
> > > would
> > > > > have been avoided. Because tasks of each connector would be present
> > in
> > > > both
> > > > > workers process like below :
> > > > >
> > > > > *W1*                       *W2*
> > > > >  C1T1                    C1T2
> > > > >  C2T2                    C2T2
> > > > >
> > > > > I hope, I gave your answer,
> > > > >
> > > > >
> > > > > Regards and Thanks
> > > > > Deepak Raghav
> > > > >
> > > > >
> > > > >
> > > > > On Wed, May 20, 2020 at 4:42 PM Robin Moffatt <ro...@confluent.io>
> > > > wrote:
> > > > >
> > > > > > OK, I understand better now.
> > > > > >
> > > > > > You can read more about the guts of the rebalancing protocol that
> > > Kafka
> > > > > > Connect uses as of Apache Kafka 2.3 an onwards here:
> > > > > >
> > > > >
> > > >
> > >
> >
> https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/
> > > > > >
> > > > > > One thing I'd ask at this point is though if it makes any
> > difference
> > > > > where
> > > > > > the tasks execute? The point of a cluster is that Kafka Connect
> > > manages
> > > > > the
> > > > > > workload allocation. If you need workload separation and
> > > > > > guaranteed execution locality I would suggest separate Kafka
> > Connect
> > > > > > distributed clusters.
> > > > > >
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Robin Moffatt | Senior Developer Advocate | robin@confluent.io |
> > > > @rmoff
> > > > > >
> > > > > >
> > > > > > On Wed, 20 May 2020 at 10:24, Deepak Raghav <
> > > deepakraghav86@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Robin
> > > > > > >
> > > > > > > Thanks for your reply.
> > > > > > >
> > > > > > > We are having two worker on different IP. The example which I
> > gave
> > > > you
> > > > > it
> > > > > > > was just a example. We are using kafka version 2.3.1.
> > > > > > >
> > > > > > > Let me tell you again with a simple example.
> > > > > > >
> > > > > > > Suppose, we have two EC2 node, N1 and N2 having worker process
> W1
> > > and
> > > > > W2
> > > > > > > running in distribute mode with groupId i.e in same cluster and
> > two
> > > > > > > connectors with having two task each i.e
> > > > > > >
> > > > > > > Node N1: W1 is running
> > > > > > > Node N2 : W2 is running
> > > > > > >
> > > > > > > First Connector (C1) : Task1 with id : C1T1 and task 2 with id
> :
> > > C1T2
> > > > > > > Second Connector (C2) : Task1 with id : C2T1 and task 2 with
> id :
> > > > C2T2
> > > > > > >
> > > > > > > Now Suppose If both W1 and W2 worker process are running  and I
> > > > > register
> > > > > > > Connector C1 and C2 one after another i.e sequentially, on any
> of
> > > the
> > > > > > > worker process, the tasks division between the worker
> > > > > > > node are happening like below, which is expected.
> > > > > > >
> > > > > > > *W1*                       *W2*
> > > > > > > C1T1                    C1T2
> > > > > > > C2T2                    C2T2
> > > > > > >
> > > > > > > Now, suppose I stop one worker process e.g W2 and start after
> > some
> > > > > time,
> > > > > > > the tasks division is changed like below i.e first connector's
> > task
> > > > > move
> > > > > > to
> > > > > > > W1 and second connector's task move to W2
> > > > > > >
> > > > > > > *W1*                       *W2*
> > > > > > > C1T1                    C2T1
> > > > > > > C1T2                    C2T2
> > > > > > >
> > > > > > >
> > > > > > > Please let me know, If it is understandable or not.
> > > > > > >
> > > > > > > Note : Actually, In production, we are gonna have 16 connectors
> > > > having
> > > > > 10
> > > > > > > task each and two worker node. With above scenario, first 8
> > > > > connectors's
> > > > > > > task move to W1 and next 8 connector' task move to W2, Which is
> > not
> > > > > > > expected.
> > > > > > >
> > > > > > >
> > > > > > > Regards and Thanks
> > > > > > > Deepak Raghav
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, May 20, 2020 at 1:41 PM Robin Moffatt <
> > robin@confluent.io>
> > > > > > wrote:
> > > > > > >
> > > > > > > > So you're running two workers on the same machine (10.0.0.4),
> > is
> > > > > > > > that correct? Normally you'd run one worker per machine
> unless
> > > > there
> > > > > > was
> > > > > > > a
> > > > > > > > particular reason otherwise.
> > > > > > > > What version of Apache Kafka are you using?
> > > > > > > > I'm not clear from your question if the distribution of tasks
> > is
> > > > > > > > presenting a problem to you (if so please describe why), or
> if
> > > > you're
> > > > > > > just
> > > > > > > > interested in the theory behind the rebalancing protocol?
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > >
> > > > > > > > Robin Moffatt | Senior Developer Advocate |
> robin@confluent.io
> > |
> > > > > > @rmoff
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, 20 May 2020 at 06:46, Deepak Raghav <
> > > > > deepakraghav86@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi
> > > > > > > > >
> > > > > > > > > Please, can anybody help me with this?
> > > > > > > > >
> > > > > > > > > Regards and Thanks
> > > > > > > > > Deepak Raghav
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Tue, May 19, 2020 at 1:37 PM Deepak Raghav <
> > > > > > > deepakraghav86@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Team
> > > > > > > > > >
> > > > > > > > > > We have two worker node in a cluster and 2 connector with
> > > > having
> > > > > 10
> > > > > > > > tasks
> > > > > > > > > > each.
> > > > > > > > > >
> > > > > > > > > > Now, suppose if we have two kafka connect process W1(Port
> > > 8080)
> > > > > and
> > > > > > > > > > W2(Port 8078) started already in distribute mode and then
> > > > > register
> > > > > > > the
> > > > > > > > > > connectors, task of one connector i.e 10 tasks are
> divided
> > > > > equally
> > > > > > > > > between
> > > > > > > > > > two worker i.e first task of A connector to W1 worker
> node
> > > and
> > > > > sec
> > > > > > > task
> > > > > > > > > of
> > > > > > > > > > A connector to W2 worker node, similarly for first task
> of
> > B
> > > > > > > connector,
> > > > > > > > > > will go to W1 node and sec task of B connector go to W2
> > node.
> > > > > > > > > >
> > > > > > > > > > e.g
> > > > > > > > > > *#First Connector : *
> > > > > > > > > > {
> > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > > > > > > > >   "connector": {
> > > > > > > > > >     "state": "RUNNING",
> > > > > > > > > >     "worker_id": "10.0.0.4:*8080*"
> > > > > > > > > >   },
> > > > > > > > > >   "tasks": [
> > > > > > > > > >     {
> > > > > > > > > >       "id": 0,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:*8078*"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 1,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 2,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 3,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 4,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 5,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 6,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 7,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 8,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 9,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > >     }
> > > > > > > > > >   ],
> > > > > > > > > >   "type": "sink"
> > > > > > > > > > }
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > *#Sec connector*
> > > > > > > > > >
> > > > > > > > > > {
> > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > > > > > > > >   "connector": {
> > > > > > > > > >     "state": "RUNNING",
> > > > > > > > > >     "worker_id": "10.0.0.4:8078"
> > > > > > > > > >   },
> > > > > > > > > >   "tasks": [
> > > > > > > > > >     {
> > > > > > > > > >       "id": 0,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 1,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 2,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 3,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 4,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 5,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 6,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 7,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 8,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 9,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > >     }
> > > > > > > > > >   ],
> > > > > > > > > >   "type": "sink"
> > > > > > > > > > }
> > > > > > > > > >
> > > > > > > > > > But I have seen a strange behavior, when I just shutdown
> W2
> > > > > worker
> > > > > > > node
> > > > > > > > > > and start it again task are divided but in diff way i.e
> all
> > > the
> > > > > > tasks
> > > > > > > > of
> > > > > > > > > A
> > > > > > > > > > connector will get into W1 node and tasks of B Connector
> > into
> > > > W2
> > > > > > > node.
> > > > > > > > > >
> > > > > > > > > > Can you please have a look for this.
> > > > > > > > > >
> > > > > > > > > > *#First Connector*
> > > > > > > > > >
> > > > > > > > > > {
> > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > > > > > > > >   "connector": {
> > > > > > > > > >     "state": "RUNNING",
> > > > > > > > > >     "worker_id": "10.0.0.4:8080"
> > > > > > > > > >   },
> > > > > > > > > >   "tasks": [
> > > > > > > > > >     {
> > > > > > > > > >       "id": 0,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 1,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 2,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 3,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 4,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 5,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 6,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 7,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 8,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 9,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > > >     }
> > > > > > > > > >   ],
> > > > > > > > > >   "type": "sink"
> > > > > > > > > > }
> > > > > > > > > >
> > > > > > > > > > *#Second Connector *:
> > > > > > > > > >
> > > > > > > > > > {
> > > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > > > > > > > >   "connector": {
> > > > > > > > > >     "state": "RUNNING",
> > > > > > > > > >     "worker_id": "10.0.0.4:8078"
> > > > > > > > > >   },
> > > > > > > > > >   "tasks": [
> > > > > > > > > >     {
> > > > > > > > > >       "id": 0,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 1,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 2,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 3,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 4,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 5,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 6,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 7,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 8,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > >     },
> > > > > > > > > >     {
> > > > > > > > > >       "id": 9,
> > > > > > > > > >       "state": "RUNNING",
> > > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > > >     }
> > > > > > > > > >   ],
> > > > > > > > > >   "type": "sink"
> > > > > > > > > > }
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Regards and Thanks
> > > > > > > > > > Deepak Raghav
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Kafka Connect Connector Tasks Uneven Division

Posted by Deepak Raghav <de...@gmail.com>.
Hi Robin

Thanks for the clarification.

As you pointed out, task allocation between the workers is nondeterministic.
I have shared this information within my team, but there are other parties
with whom I need to share it as an explanation for the issue they raised,
and I cannot show this mail thread as a reference.

It would be great if you could share a link or discussion reference on the
same topic.
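
In the meantime, to show them the current allocation concretely, the
task-to-worker mapping can be pulled from the same REST status endpoint that
produced the output earlier in this thread. A rough sketch in Python (the
host and port are placeholders for one of our workers):

import json
import urllib.request

BASE = "http://localhost:8080"  # placeholder: any worker's REST endpoint

def get(path):
    # GET against the Connect REST API and decode the JSON response
    with urllib.request.urlopen(BASE + path) as resp:
        return json.loads(resp.read())

tasks_per_worker = {}
for name in get("/connectors"):                      # list of connector names
    status = get("/connectors/" + name + "/status")  # same payload as quoted above
    for task in status["tasks"]:
        worker = task["worker_id"]
        tasks_per_worker[worker] = tasks_per_worker.get(worker, 0) + 1
        print(name, "task", task["id"], "->", worker)

print("tasks per worker:", tasks_per_worker)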

Regards and Thanks
Deepak Raghav



On Thu, May 21, 2020 at 2:12 PM Robin Moffatt <ro...@confluent.io> wrote:

> I don't think you're right to assert that this is "expected behaviour":
>
> >  the tasks are divided in below pattern when they are first time
> registered
>
> Kafka Connect task allocation is non-deterministic.
>
> I'm still not clear if you're solving for a theoretical problem or an
> actual one. If this is an actual problem that you're encountering and need
> a solution to then since the task allocation is not deterministic it sounds
> like you need to deploy separate worker clusters based on the workload
> patterns that you are seeing and machine resources available.
>
>
> --
>
> Robin Moffatt | Senior Developer Advocate | robin@confluent.io | @rmoff
>
>
> On Wed, 20 May 2020 at 21:29, Deepak Raghav <de...@gmail.com>
> wrote:
>
> > Hi Robin
> >
> > I had gone though the link you provided, It is not helpful in my case.
> > Apart from this, *I am not getting why the tasks are divided in *below
> > pattern* when they are *first time registered*, which is expected
> behavior.
> > I*s there any parameter which we can pass in worker property file which
> > handle the task assignment strategy like we have range assigner or round
> > robin in consumer-group ?
> >
> > connector rest status api result after first registration :
> >
> > {
> >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> >   "connector": {
> >     "state": "RUNNING",
> >     "worker_id": "10.0.0.5:*8080*"
> >   },
> >   "tasks": [
> >     {
> >       "id": 0,
> >       "state": "RUNNING",
> >       "worker_id": "10.0.0.4:*8078*"
> >     },
> >     {
> >       "id": 1,
> >       "state": "RUNNING",
> >       "worker_id": "10.0.0.5:*8080*"
> >     }
> >   ],
> >   "type": "sink"
> > }
> >
> > and
> >
> > {
> >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> >   "connector": {
> >     "state": "RUNNING",
> >     "worker_id": "10.0.0.4:*8078*"
> >   },
> >   "tasks": [
> >     {
> >       "id": 0,
> >       "state": "RUNNING",
> >       "worker_id": "10.0.0.4:*8078*"
> >     },
> >     {
> >       "id": 1,
> >       "state": "RUNNING",
> >       "worker_id": "10.0.0.5:*8080*"
> >     }
> >   ],
> >   "type": "sink"
> > }
> >
> >
> > But when I stop the second worker process and wait for
> > scheduled.rebalance.max.delay.ms time i.e 5 min to over, and start the
> > process again. Result is different.
> >
> > {
> >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> >   "connector": {
> >     "state": "RUNNING",
> >     "worker_id": "10.0.0.5:*8080*"
> >   },
> >   "tasks": [
> >     {
> >       "id": 0,
> >       "state": "RUNNING",
> >       "worker_id": "10.0.0.5:*8080*"
> >     },
> >     {
> >       "id": 1,
> >       "state": "RUNNING",
> >       "worker_id": "10.0.0.5:*8080*"
> >     }
> >   ],
> >   "type": "sink"
> > }
> >
> > and
> >
> > {
> >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> >   "connector": {
> >     "state": "RUNNING",
> >     "worker_id": "10.0.0.4:*8078*"
> >   },
> >   "tasks": [
> >     {
> >       "id": 0,
> >       "state": "RUNNING",
> >       "worker_id": "10.0.0.4:*8078*"
> >     },
> >     {
> >       "id": 1,
> >       "state": "RUNNING",
> >       "worker_id": "10.0.0.4:*8078*"
> >     }
> >   ],
> >   "type": "sink"
> > }
> >
> > Regards and Thanks
> > Deepak Raghav
> >
> >
> >
> > On Wed, May 20, 2020 at 9:29 PM Robin Moffatt <ro...@confluent.io>
> wrote:
> >
> > > Thanks for the clarification. If this is an actual problem that you're
> > > encountering and need a solution to then since the task allocation is
> not
> > > deterministic it sounds like you need to deploy separate worker
> clusters
> > > based on the workload patterns that you are seeing and machine
> resources
> > > available.
> > >
> > >
> > > --
> > >
> > > Robin Moffatt | Senior Developer Advocate | robin@confluent.io |
> @rmoff
> > >
> > >
> > > On Wed, 20 May 2020 at 16:39, Deepak Raghav <de...@gmail.com>
> > > wrote:
> > >
> > > > Hi Robin
> > > >
> > > > Replying to your query i.e
> > > >
> > > > One thing I'd ask at this point is though if it makes any difference
> > > where
> > > > the tasks execute?
> > > >
> > > > It actually makes difference to us, we have 16 connectors and as I
> > stated
> > > > tasks division earlier, first 8 connector' task are assigned to first
> > > > worker process and another connector's task to another worker process
> > and
> > > > just to mention that these 16 connectors are sink connectors. Each
> sink
> > > > connector consumes message from different topic.There may be a case
> > when
> > > > messages are coming only for first 8 connector's topic and because
> all
> > > the
> > > > tasks of these connectors are assigned to First Worker, load would be
> > > high
> > > > on it and another set of connectors in another worker would be idle.
> > > >
> > > > Instead, if the task would have been divided evenly then this case
> > would
> > > > have been avoided. Because tasks of each connector would be present
> in
> > > both
> > > > workers process like below :
> > > >
> > > > *W1*                       *W2*
> > > >  C1T1                    C1T2
> > > >  C2T2                    C2T2
> > > >
> > > > I hope, I gave your answer,
> > > >
> > > >
> > > > Regards and Thanks
> > > > Deepak Raghav
> > > >
> > > >
> > > >
> > > > On Wed, May 20, 2020 at 4:42 PM Robin Moffatt <ro...@confluent.io>
> > > wrote:
> > > >
> > > > > OK, I understand better now.
> > > > >
> > > > > You can read more about the guts of the rebalancing protocol that
> > Kafka
> > > > > Connect uses as of Apache Kafka 2.3 an onwards here:
> > > > >
> > > >
> > >
> >
> https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/
> > > > >
> > > > > One thing I'd ask at this point is though if it makes any
> difference
> > > > where
> > > > > the tasks execute? The point of a cluster is that Kafka Connect
> > manages
> > > > the
> > > > > workload allocation. If you need workload separation and
> > > > > guaranteed execution locality I would suggest separate Kafka
> Connect
> > > > > distributed clusters.
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Robin Moffatt | Senior Developer Advocate | robin@confluent.io |
> > > @rmoff
> > > > >
> > > > >
> > > > > On Wed, 20 May 2020 at 10:24, Deepak Raghav <
> > deepakraghav86@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Robin
> > > > > >
> > > > > > Thanks for your reply.
> > > > > >
> > > > > > We are having two worker on different IP. The example which I
> gave
> > > you
> > > > it
> > > > > > was just a example. We are using kafka version 2.3.1.
> > > > > >
> > > > > > Let me tell you again with a simple example.
> > > > > >
> > > > > > Suppose, we have two EC2 node, N1 and N2 having worker process W1
> > and
> > > > W2
> > > > > > running in distribute mode with groupId i.e in same cluster and
> two
> > > > > > connectors with having two task each i.e
> > > > > >
> > > > > > Node N1: W1 is running
> > > > > > Node N2 : W2 is running
> > > > > >
> > > > > > First Connector (C1) : Task1 with id : C1T1 and task 2 with id :
> > C1T2
> > > > > > Second Connector (C2) : Task1 with id : C2T1 and task 2 with id :
> > > C2T2
> > > > > >
> > > > > > Now Suppose If both W1 and W2 worker process are running  and I
> > > > register
> > > > > > Connector C1 and C2 one after another i.e sequentially, on any of
> > the
> > > > > > worker process, the tasks division between the worker
> > > > > > node are happening like below, which is expected.
> > > > > >
> > > > > > *W1*                       *W2*
> > > > > > C1T1                    C1T2
> > > > > > C2T2                    C2T2
> > > > > >
> > > > > > Now, suppose I stop one worker process e.g W2 and start after
> some
> > > > time,
> > > > > > the tasks division is changed like below i.e first connector's
> task
> > > > move
> > > > > to
> > > > > > W1 and second connector's task move to W2
> > > > > >
> > > > > > *W1*                       *W2*
> > > > > > C1T1                    C2T1
> > > > > > C1T2                    C2T2
> > > > > >
> > > > > >
> > > > > > Please let me know, If it is understandable or not.
> > > > > >
> > > > > > Note : Actually, In production, we are gonna have 16 connectors
> > > having
> > > > 10
> > > > > > task each and two worker node. With above scenario, first 8
> > > > connectors's
> > > > > > task move to W1 and next 8 connector' task move to W2, Which is
> not
> > > > > > expected.
> > > > > >
> > > > > >
> > > > > > Regards and Thanks
> > > > > > Deepak Raghav
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, May 20, 2020 at 1:41 PM Robin Moffatt <
> robin@confluent.io>
> > > > > wrote:
> > > > > >
> > > > > > > So you're running two workers on the same machine (10.0.0.4),
> is
> > > > > > > that correct? Normally you'd run one worker per machine unless
> > > there
> > > > > was
> > > > > > a
> > > > > > > particular reason otherwise.
> > > > > > > What version of Apache Kafka are you using?
> > > > > > > I'm not clear from your question if the distribution of tasks
> is
> > > > > > > presenting a problem to you (if so please describe why), or if
> > > you're
> > > > > > just
> > > > > > > interested in the theory behind the rebalancing protocol?
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > >
> > > > > > > Robin Moffatt | Senior Developer Advocate | robin@confluent.io
> |
> > > > > @rmoff
> > > > > > >
> > > > > > >
> > > > > > > On Wed, 20 May 2020 at 06:46, Deepak Raghav <
> > > > deepakraghav86@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi
> > > > > > > >
> > > > > > > > Please, can anybody help me with this?
> > > > > > > >
> > > > > > > > Regards and Thanks
> > > > > > > > Deepak Raghav
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, May 19, 2020 at 1:37 PM Deepak Raghav <
> > > > > > deepakraghav86@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Team
> > > > > > > > >
> > > > > > > > > We have two worker node in a cluster and 2 connector with
> > > having
> > > > 10
> > > > > > > tasks
> > > > > > > > > each.
> > > > > > > > >
> > > > > > > > > Now, suppose if we have two kafka connect process W1(Port
> > 8080)
> > > > and
> > > > > > > > > W2(Port 8078) started already in distribute mode and then
> > > > register
> > > > > > the
> > > > > > > > > connectors, task of one connector i.e 10 tasks are divided
> > > > equally
> > > > > > > > between
> > > > > > > > > two worker i.e first task of A connector to W1 worker node
> > and
> > > > sec
> > > > > > task
> > > > > > > > of
> > > > > > > > > A connector to W2 worker node, similarly for first task of
> B
> > > > > > connector,
> > > > > > > > > will go to W1 node and sec task of B connector go to W2
> node.
> > > > > > > > >
> > > > > > > > > e.g
> > > > > > > > > *#First Connector : *
> > > > > > > > > {
> > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > > > > > > >   "connector": {
> > > > > > > > >     "state": "RUNNING",
> > > > > > > > >     "worker_id": "10.0.0.4:*8080*"
> > > > > > > > >   },
> > > > > > > > >   "tasks": [
> > > > > > > > >     {
> > > > > > > > >       "id": 0,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:*8078*"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 1,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 2,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 3,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 4,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 5,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 6,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 7,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 8,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 9,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     }
> > > > > > > > >   ],
> > > > > > > > >   "type": "sink"
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > *#Sec connector*
> > > > > > > > >
> > > > > > > > > {
> > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > > > > > > >   "connector": {
> > > > > > > > >     "state": "RUNNING",
> > > > > > > > >     "worker_id": "10.0.0.4:8078"
> > > > > > > > >   },
> > > > > > > > >   "tasks": [
> > > > > > > > >     {
> > > > > > > > >       "id": 0,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 1,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 2,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 3,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 4,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 5,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 6,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 7,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 8,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 9,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     }
> > > > > > > > >   ],
> > > > > > > > >   "type": "sink"
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > But I have seen a strange behavior, when I just shutdown W2
> > > > worker
> > > > > > node
> > > > > > > > > and start it again task are divided but in diff way i.e all
> > the
> > > > > tasks
> > > > > > > of
> > > > > > > > A
> > > > > > > > > connector will get into W1 node and tasks of B Connector
> into
> > > W2
> > > > > > node.
> > > > > > > > >
> > > > > > > > > Can you please have a look for this.
> > > > > > > > >
> > > > > > > > > *#First Connector*
> > > > > > > > >
> > > > > > > > > {
> > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > > > > > > >   "connector": {
> > > > > > > > >     "state": "RUNNING",
> > > > > > > > >     "worker_id": "10.0.0.4:8080"
> > > > > > > > >   },
> > > > > > > > >   "tasks": [
> > > > > > > > >     {
> > > > > > > > >       "id": 0,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 1,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 2,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 3,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 4,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 5,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 6,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 7,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 8,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 9,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     }
> > > > > > > > >   ],
> > > > > > > > >   "type": "sink"
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > *#Second Connector *:
> > > > > > > > >
> > > > > > > > > {
> > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > > > > > > >   "connector": {
> > > > > > > > >     "state": "RUNNING",
> > > > > > > > >     "worker_id": "10.0.0.4:8078"
> > > > > > > > >   },
> > > > > > > > >   "tasks": [
> > > > > > > > >     {
> > > > > > > > >       "id": 0,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 1,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 2,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 3,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 4,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 5,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 6,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 7,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 8,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 9,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     }
> > > > > > > > >   ],
> > > > > > > > >   "type": "sink"
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Regards and Thanks
> > > > > > > > > Deepak Raghav
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Kafka Connect Connector Tasks Uneven Division

Posted by Robin Moffatt <ro...@confluent.io>.
I don't think you're right to assert that this is "expected behaviour":

>  the tasks are divided in below pattern when they are first time
> registered

Kafka Connect task allocation is non-deterministic.

I'm still not clear whether you're solving a theoretical problem or an
actual one. If it is an actual problem that you're encountering and need
a solution to, then since the task allocation is not deterministic it
sounds like you need to deploy separate worker clusters, based on the
workload patterns that you are seeing and the machine resources available.
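
If it helps to see that allocation concretely, a minimal sketch along
these lines (assuming Python 3 on a machine that can reach a worker's
REST port; the URL below is illustrative) tallies how many tasks each
worker currently owns, using the same GET /connectors and
GET /connectors/<name>/status endpoints whose output you pasted:

import json
import urllib.request

# Illustrative worker address; point this at any worker in the cluster.
CONNECT_URL = "http://10.0.0.4:8080"

def get_json(path):
    # The standard Connect REST endpoints return JSON bodies.
    with urllib.request.urlopen(CONNECT_URL + path) as resp:
        return json.load(resp)

# Tally tasks per worker_id across all connectors, regardless of state.
tasks_per_worker = {}
for connector in get_json("/connectors"):
    status = get_json("/connectors/" + connector + "/status")
    for task in status["tasks"]:
        worker = task["worker_id"]
        tasks_per_worker[worker] = tasks_per_worker.get(worker, 0) + 1

for worker, count in sorted(tasks_per_worker.items()):
    print(worker, count)

Running it before and after bouncing a worker makes the reassignment
visible at a glance, whatever order the assignor happens to pick.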


-- 

Robin Moffatt | Senior Developer Advocate | robin@confluent.io | @rmoff


On Wed, 20 May 2020 at 21:29, Deepak Raghav <de...@gmail.com>
wrote:

> Hi Robin
>
> I had gone though the link you provided, It is not helpful in my case.
> Apart from this, *I am not getting why the tasks are divided in *below
> pattern* when they are *first time registered*, which is expected behavior.
> I*s there any parameter which we can pass in worker property file which
> handle the task assignment strategy like we have range assigner or round
> robin in consumer-group ?
>
> connector rest status api result after first registration :
>
> {
>   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
>   "connector": {
>     "state": "RUNNING",
>     "worker_id": "10.0.0.5:*8080*"
>   },
>   "tasks": [
>     {
>       "id": 0,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:*8078*"
>     },
>     {
>       "id": 1,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.5:*8080*"
>     }
>   ],
>   "type": "sink"
> }
>
> and
>
> {
>   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
>   "connector": {
>     "state": "RUNNING",
>     "worker_id": "10.0.0.4:*8078*"
>   },
>   "tasks": [
>     {
>       "id": 0,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:*8078*"
>     },
>     {
>       "id": 1,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.5:*8080*"
>     }
>   ],
>   "type": "sink"
> }
>
>
> But when I stop the second worker process and wait for
> scheduled.rebalance.max.delay.ms time i.e 5 min to over, and start the
> process again. Result is different.
>
> {
>   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
>   "connector": {
>     "state": "RUNNING",
>     "worker_id": "10.0.0.5:*8080*"
>   },
>   "tasks": [
>     {
>       "id": 0,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.5:*8080*"
>     },
>     {
>       "id": 1,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.5:*8080*"
>     }
>   ],
>   "type": "sink"
> }
>
> and
>
> {
>   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
>   "connector": {
>     "state": "RUNNING",
>     "worker_id": "10.0.0.4:*8078*"
>   },
>   "tasks": [
>     {
>       "id": 0,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:*8078*"
>     },
>     {
>       "id": 1,
>       "state": "RUNNING",
>       "worker_id": "10.0.0.4:*8078*"
>     }
>   ],
>   "type": "sink"
> }
>
> Regards and Thanks
> Deepak Raghav
>
>
>
> On Wed, May 20, 2020 at 9:29 PM Robin Moffatt <ro...@confluent.io> wrote:
>
> > Thanks for the clarification. If this is an actual problem that you're
> > encountering and need a solution to then since the task allocation is not
> > deterministic it sounds like you need to deploy separate worker clusters
> > based on the workload patterns that you are seeing and machine resources
> > available.
> >
> >
> > --
> >
> > Robin Moffatt | Senior Developer Advocate | robin@confluent.io | @rmoff
> >
> >
> > On Wed, 20 May 2020 at 16:39, Deepak Raghav <de...@gmail.com>
> > wrote:
> >
> > > Hi Robin
> > >
> > > Replying to your query i.e
> > >
> > > One thing I'd ask at this point is though if it makes any difference
> > where
> > > the tasks execute?
> > >
> > > It actually makes difference to us, we have 16 connectors and as I
> stated
> > > tasks division earlier, first 8 connector' task are assigned to first
> > > worker process and another connector's task to another worker process
> and
> > > just to mention that these 16 connectors are sink connectors. Each sink
> > > connector consumes message from different topic.There may be a case
> when
> > > messages are coming only for first 8 connector's topic and because all
> > the
> > > tasks of these connectors are assigned to First Worker, load would be
> > high
> > > on it and another set of connectors in another worker would be idle.
> > >
> > > Instead, if the task would have been divided evenly then this case
> would
> > > have been avoided. Because tasks of each connector would be present in
> > both
> > > workers process like below :
> > >
> > > *W1*                       *W2*
> > >  C1T1                    C1T2
> > >  C2T2                    C2T2
> > >
> > > I hope, I gave your answer,
> > >
> > >
> > > Regards and Thanks
> > > Deepak Raghav
> > >
> > >
> > >
> > > On Wed, May 20, 2020 at 4:42 PM Robin Moffatt <ro...@confluent.io>
> > wrote:
> > >
> > > > OK, I understand better now.
> > > >
> > > > You can read more about the guts of the rebalancing protocol that
> Kafka
> > > > Connect uses as of Apache Kafka 2.3 an onwards here:
> > > >
> > >
> >
> https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/
> > > >
> > > > One thing I'd ask at this point is though if it makes any difference
> > > where
> > > > the tasks execute? The point of a cluster is that Kafka Connect
> manages
> > > the
> > > > workload allocation. If you need workload separation and
> > > > guaranteed execution locality I would suggest separate Kafka Connect
> > > > distributed clusters.
> > > >
> > > >
> > > > --
> > > >
> > > > Robin Moffatt | Senior Developer Advocate | robin@confluent.io |
> > @rmoff
> > > >
> > > >
> > > > On Wed, 20 May 2020 at 10:24, Deepak Raghav <
> deepakraghav86@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Robin
> > > > >
> > > > > Thanks for your reply.
> > > > >
> > > > > We are having two worker on different IP. The example which I gave
> > you
> > > it
> > > > > was just a example. We are using kafka version 2.3.1.
> > > > >
> > > > > Let me tell you again with a simple example.
> > > > >
> > > > > Suppose, we have two EC2 node, N1 and N2 having worker process W1
> and
> > > W2
> > > > > running in distribute mode with groupId i.e in same cluster and two
> > > > > connectors with having two task each i.e
> > > > >
> > > > > Node N1: W1 is running
> > > > > Node N2 : W2 is running
> > > > >
> > > > > First Connector (C1) : Task1 with id : C1T1 and task 2 with id :
> C1T2
> > > > > Second Connector (C2) : Task1 with id : C2T1 and task 2 with id :
> > C2T2
> > > > >
> > > > > Now Suppose If both W1 and W2 worker process are running  and I
> > > register
> > > > > Connector C1 and C2 one after another i.e sequentially, on any of
> the
> > > > > worker process, the tasks division between the worker
> > > > > node are happening like below, which is expected.
> > > > >
> > > > > *W1*                       *W2*
> > > > > C1T1                    C1T2
> > > > > C2T2                    C2T2
> > > > >
> > > > > Now, suppose I stop one worker process e.g W2 and start after some
> > > time,
> > > > > the tasks division is changed like below i.e first connector's task
> > > move
> > > > to
> > > > > W1 and second connector's task move to W2
> > > > >
> > > > > *W1*                       *W2*
> > > > > C1T1                    C2T1
> > > > > C1T2                    C2T2
> > > > >
> > > > >
> > > > > Please let me know, If it is understandable or not.
> > > > >
> > > > > Note : Actually, In production, we are gonna have 16 connectors
> > having
> > > 10
> > > > > task each and two worker node. With above scenario, first 8
> > > connectors's
> > > > > task move to W1 and next 8 connector' task move to W2, Which is not
> > > > > expected.
> > > > >
> > > > >
> > > > > Regards and Thanks
> > > > > Deepak Raghav
> > > > >
> > > > >
> > > > >
> > > > > On Wed, May 20, 2020 at 1:41 PM Robin Moffatt <ro...@confluent.io>
> > > > wrote:
> > > > >
> > > > > > So you're running two workers on the same machine (10.0.0.4), is
> > > > > > that correct? Normally you'd run one worker per machine unless
> > there
> > > > was
> > > > > a
> > > > > > particular reason otherwise.
> > > > > > What version of Apache Kafka are you using?
> > > > > > I'm not clear from your question if the distribution of tasks is
> > > > > > presenting a problem to you (if so please describe why), or if
> > you're
> > > > > just
> > > > > > interested in the theory behind the rebalancing protocol?
> > > > > >
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Robin Moffatt | Senior Developer Advocate | robin@confluent.io |
> > > > @rmoff
> > > > > >
> > > > > >
> > > > > > On Wed, 20 May 2020 at 06:46, Deepak Raghav <
> > > deepakraghav86@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi
> > > > > > >
> > > > > > > Please, can anybody help me with this?
> > > > > > >
> > > > > > > Regards and Thanks
> > > > > > > Deepak Raghav
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Tue, May 19, 2020 at 1:37 PM Deepak Raghav <
> > > > > deepakraghav86@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Team
> > > > > > > >
> > > > > > > > We have two worker node in a cluster and 2 connector with
> > having
> > > 10
> > > > > > tasks
> > > > > > > > each.
> > > > > > > >
> > > > > > > > Now, suppose if we have two kafka connect process W1(Port
> 8080)
> > > and
> > > > > > > > W2(Port 8078) started already in distribute mode and then
> > > register
> > > > > the
> > > > > > > > connectors, task of one connector i.e 10 tasks are divided
> > > equally
> > > > > > > between
> > > > > > > > two worker i.e first task of A connector to W1 worker node
> and
> > > sec
> > > > > task
> > > > > > > of
> > > > > > > > A connector to W2 worker node, similarly for first task of B
> > > > > connector,
> > > > > > > > will go to W1 node and sec task of B connector go to W2 node.
> > > > > > > >
> > > > > > > > e.g
> > > > > > > > *#First Connector : *
> > > > > > > > {
> > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > > > > > >   "connector": {
> > > > > > > >     "state": "RUNNING",
> > > > > > > >     "worker_id": "10.0.0.4:*8080*"
> > > > > > > >   },
> > > > > > > >   "tasks": [
> > > > > > > >     {
> > > > > > > >       "id": 0,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:*8078*"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 1,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 2,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 3,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 4,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 5,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 6,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 7,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 8,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 9,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     }
> > > > > > > >   ],
> > > > > > > >   "type": "sink"
> > > > > > > > }
> > > > > > > >
> > > > > > > >
> > > > > > > > *#Sec connector*
> > > > > > > >
> > > > > > > > {
> > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > > > > > >   "connector": {
> > > > > > > >     "state": "RUNNING",
> > > > > > > >     "worker_id": "10.0.0.4:8078"
> > > > > > > >   },
> > > > > > > >   "tasks": [
> > > > > > > >     {
> > > > > > > >       "id": 0,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 1,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 2,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 3,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 4,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 5,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 6,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 7,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 8,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 9,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     }
> > > > > > > >   ],
> > > > > > > >   "type": "sink"
> > > > > > > > }
> > > > > > > >
> > > > > > > > But I have seen a strange behavior, when I just shutdown W2
> > > worker
> > > > > node
> > > > > > > > and start it again task are divided but in diff way i.e all
> the
> > > > tasks
> > > > > > of
> > > > > > > A
> > > > > > > > connector will get into W1 node and tasks of B Connector into
> > W2
> > > > > node.
> > > > > > > >
> > > > > > > > Can you please have a look for this.
> > > > > > > >
> > > > > > > > *#First Connector*
> > > > > > > >
> > > > > > > > {
> > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > > > > > >   "connector": {
> > > > > > > >     "state": "RUNNING",
> > > > > > > >     "worker_id": "10.0.0.4:8080"
> > > > > > > >   },
> > > > > > > >   "tasks": [
> > > > > > > >     {
> > > > > > > >       "id": 0,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 1,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 2,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 3,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 4,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 5,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 6,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 7,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 8,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 9,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > >     }
> > > > > > > >   ],
> > > > > > > >   "type": "sink"
> > > > > > > > }
> > > > > > > >
> > > > > > > > *#Second Connector *:
> > > > > > > >
> > > > > > > > {
> > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > > > > > >   "connector": {
> > > > > > > >     "state": "RUNNING",
> > > > > > > >     "worker_id": "10.0.0.4:8078"
> > > > > > > >   },
> > > > > > > >   "tasks": [
> > > > > > > >     {
> > > > > > > >       "id": 0,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 1,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 2,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 3,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 4,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 5,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 6,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 7,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 8,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     },
> > > > > > > >     {
> > > > > > > >       "id": 9,
> > > > > > > >       "state": "RUNNING",
> > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > >     }
> > > > > > > >   ],
> > > > > > > >   "type": "sink"
> > > > > > > > }
> > > > > > > >
> > > > > > > >
> > > > > > > > Regards and Thanks
> > > > > > > > Deepak Raghav
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Kafka Connect Connector Tasks Uneven Division

Posted by Deepak Raghav <de...@gmail.com>.
Hi Robin

I had gone through the link you provided; it is not helpful in my case.
Apart from this, I am not getting why the tasks are divided in the *below
pattern* when they are *first time registered*, which is the expected
behavior. Is there any parameter we can pass in the worker property file
to control the task assignment strategy, like the range or round-robin
assignors in a consumer group?

Connector REST status API result after the first registration:

{
  "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.0.0.5:*8080*"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.0.0.4:*8078*"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "10.0.0.5:*8080*"
    }
  ],
  "type": "sink"
}

and

{
  "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.0.0.4:*8078*"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.0.0.4:*8078*"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "10.0.0.5:*8080*"
    }
  ],
  "type": "sink"
}


But when I stop the second worker process, wait for the
scheduled.rebalance.max.delay.ms interval (i.e. 5 minutes) to elapse, and
start the process again, the result is different.

{
  "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.0.0.5:*8080*"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.0.0.5:*8080*"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "10.0.0.5:*8080*"
    }
  ],
  "type": "sink"
}

and

{
  "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.0.0.4:*8078*"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.0.0.4:*8078*"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "10.0.0.4:*8078*"
    }
  ],
  "type": "sink"
}
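
As far as I can tell there is no worker-level equivalent of the
consumer's partition.assignment.strategy; the only rebalance-related
knobs I can see for a 2.3.x worker property file look like this (values
are illustrative, 300000 ms being the default delay I mentioned above):

# Rebalance protocol used by the Connect worker group.
# 2.3 defaults to "compatible" (incremental cooperative rebalancing);
# "eager" falls back to the pre-2.3 stop-the-world behaviour.
connect.protocol=compatible

# How long tasks from a departed worker stay unassigned before they are
# redistributed to the remaining workers.
scheduled.rebalance.max.delay.ms=300000

Neither setting controls which worker a given task lands on, only when
and how the group rebalances.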

Regards and Thanks
Deepak Raghav



On Wed, May 20, 2020 at 9:29 PM Robin Moffatt <ro...@confluent.io> wrote:

> Thanks for the clarification. If this is an actual problem that you're
> encountering and need a solution to then since the task allocation is not
> deterministic it sounds like you need to deploy separate worker clusters
> based on the workload patterns that you are seeing and machine resources
> available.
>
>
> --
>
> Robin Moffatt | Senior Developer Advocate | robin@confluent.io | @rmoff
>
>
> On Wed, 20 May 2020 at 16:39, Deepak Raghav <de...@gmail.com>
> wrote:
>
> > Hi Robin
> >
> > Replying to your query i.e
> >
> > One thing I'd ask at this point is though if it makes any difference
> where
> > the tasks execute?
> >
> > It actually makes difference to us, we have 16 connectors and as I stated
> > tasks division earlier, first 8 connector' task are assigned to first
> > worker process and another connector's task to another worker process and
> > just to mention that these 16 connectors are sink connectors. Each sink
> > connector consumes message from different topic.There may be a case when
> > messages are coming only for first 8 connector's topic and because all
> the
> > tasks of these connectors are assigned to First Worker, load would be
> high
> > on it and another set of connectors in another worker would be idle.
> >
> > Instead, if the task would have been divided evenly then this case would
> > have been avoided. Because tasks of each connector would be present in
> both
> > workers process like below :
> >
> > *W1*                       *W2*
> >  C1T1                    C1T2
> >  C2T2                    C2T2
> >
> > I hope, I gave your answer,
> >
> >
> > Regards and Thanks
> > Deepak Raghav
> >
> >
> >
> > On Wed, May 20, 2020 at 4:42 PM Robin Moffatt <ro...@confluent.io>
> wrote:
> >
> > > OK, I understand better now.
> > >
> > > You can read more about the guts of the rebalancing protocol that Kafka
> > > Connect uses as of Apache Kafka 2.3 an onwards here:
> > >
> >
> https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/
> > >
> > > One thing I'd ask at this point is though if it makes any difference
> > where
> > > the tasks execute? The point of a cluster is that Kafka Connect manages
> > the
> > > workload allocation. If you need workload separation and
> > > guaranteed execution locality I would suggest separate Kafka Connect
> > > distributed clusters.
> > >
> > >
> > > --
> > >
> > > Robin Moffatt | Senior Developer Advocate | robin@confluent.io |
> @rmoff
> > >
> > >
> > > On Wed, 20 May 2020 at 10:24, Deepak Raghav <de...@gmail.com>
> > > wrote:
> > >
> > > > Hi Robin
> > > >
> > > > Thanks for your reply.
> > > >
> > > > We are having two worker on different IP. The example which I gave
> you
> > it
> > > > was just a example. We are using kafka version 2.3.1.
> > > >
> > > > Let me tell you again with a simple example.
> > > >
> > > > Suppose, we have two EC2 node, N1 and N2 having worker process W1 and
> > W2
> > > > running in distribute mode with groupId i.e in same cluster and two
> > > > connectors with having two task each i.e
> > > >
> > > > Node N1: W1 is running
> > > > Node N2 : W2 is running
> > > >
> > > > First Connector (C1) : Task1 with id : C1T1 and task 2 with id : C1T2
> > > > Second Connector (C2) : Task1 with id : C2T1 and task 2 with id :
> C2T2
> > > >
> > > > Now Suppose If both W1 and W2 worker process are running  and I
> > register
> > > > Connector C1 and C2 one after another i.e sequentially, on any of the
> > > > worker process, the tasks division between the worker
> > > > node are happening like below, which is expected.
> > > >
> > > > *W1*                       *W2*
> > > > C1T1                    C1T2
> > > > C2T2                    C2T2
> > > >
> > > > Now, suppose I stop one worker process e.g W2 and start after some
> > time,
> > > > the tasks division is changed like below i.e first connector's task
> > move
> > > to
> > > > W1 and second connector's task move to W2
> > > >
> > > > *W1*                       *W2*
> > > > C1T1                    C2T1
> > > > C1T2                    C2T2
> > > >
> > > >
> > > > Please let me know, If it is understandable or not.
> > > >
> > > > Note : Actually, In production, we are gonna have 16 connectors
> having
> > 10
> > > > task each and two worker node. With above scenario, first 8
> > connectors's
> > > > task move to W1 and next 8 connector' task move to W2, Which is not
> > > > expected.
> > > >
> > > >
> > > > Regards and Thanks
> > > > Deepak Raghav
> > > >
> > > >
> > > >
> > > > On Wed, May 20, 2020 at 1:41 PM Robin Moffatt <ro...@confluent.io>
> > > wrote:
> > > >
> > > > > So you're running two workers on the same machine (10.0.0.4), is
> > > > > that correct? Normally you'd run one worker per machine unless
> there
> > > was
> > > > a
> > > > > particular reason otherwise.
> > > > > What version of Apache Kafka are you using?
> > > > > I'm not clear from your question if the distribution of tasks is
> > > > > presenting a problem to you (if so please describe why), or if
> you're
> > > > just
> > > > > interested in the theory behind the rebalancing protocol?
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Robin Moffatt | Senior Developer Advocate | robin@confluent.io |
> > > @rmoff
> > > > >
> > > > >
> > > > > On Wed, 20 May 2020 at 06:46, Deepak Raghav <
> > deepakraghav86@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi
> > > > > >
> > > > > > Please, can anybody help me with this?
> > > > > >
> > > > > > Regards and Thanks
> > > > > > Deepak Raghav
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, May 19, 2020 at 1:37 PM Deepak Raghav <
> > > > deepakraghav86@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Team
> > > > > > >
> > > > > > > We have two worker node in a cluster and 2 connector with
> having
> > 10
> > > > > tasks
> > > > > > > each.
> > > > > > >
> > > > > > > Now, suppose if we have two kafka connect process W1(Port 8080)
> > and
> > > > > > > W2(Port 8078) started already in distribute mode and then
> > register
> > > > the
> > > > > > > connectors, task of one connector i.e 10 tasks are divided
> > equally
> > > > > > between
> > > > > > > two worker i.e first task of A connector to W1 worker node and
> > sec
> > > > task
> > > > > > of
> > > > > > > A connector to W2 worker node, similarly for first task of B
> > > > connector,
> > > > > > > will go to W1 node and sec task of B connector go to W2 node.
> > > > > > >
> > > > > > > e.g
> > > > > > > *#First Connector : *
> > > > > > > {
> > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > > > > >   "connector": {
> > > > > > >     "state": "RUNNING",
> > > > > > >     "worker_id": "10.0.0.4:*8080*"
> > > > > > >   },
> > > > > > >   "tasks": [
> > > > > > >     {
> > > > > > >       "id": 0,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:*8078*"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 1,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 2,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 3,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 4,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 5,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 6,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 7,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 8,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 9,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     }
> > > > > > >   ],
> > > > > > >   "type": "sink"
> > > > > > > }
> > > > > > >
> > > > > > >
> > > > > > > *#Sec connector*
> > > > > > >
> > > > > > > {
> > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > > > > >   "connector": {
> > > > > > >     "state": "RUNNING",
> > > > > > >     "worker_id": "10.0.0.4:8078"
> > > > > > >   },
> > > > > > >   "tasks": [
> > > > > > >     {
> > > > > > >       "id": 0,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 1,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 2,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 3,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 4,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 5,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 6,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 7,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 8,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 9,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     }
> > > > > > >   ],
> > > > > > >   "type": "sink"
> > > > > > > }
> > > > > > >
> > > > > > > But I have seen a strange behavior, when I just shutdown W2
> > worker
> > > > node
> > > > > > > and start it again task are divided but in diff way i.e all the
> > > tasks
> > > > > of
> > > > > > A
> > > > > > > connector will get into W1 node and tasks of B Connector into
> W2
> > > > node.
> > > > > > >
> > > > > > > Can you please have a look for this.
> > > > > > >
> > > > > > > *#First Connector*
> > > > > > >
> > > > > > > {
> > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > > > > >   "connector": {
> > > > > > >     "state": "RUNNING",
> > > > > > >     "worker_id": "10.0.0.4:8080"
> > > > > > >   },
> > > > > > >   "tasks": [
> > > > > > >     {
> > > > > > >       "id": 0,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 1,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 2,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 3,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 4,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 5,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 6,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 7,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 8,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 9,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     }
> > > > > > >   ],
> > > > > > >   "type": "sink"
> > > > > > > }
> > > > > > >
> > > > > > > *#Second Connector *:
> > > > > > >
> > > > > > > {
> > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > > > > >   "connector": {
> > > > > > >     "state": "RUNNING",
> > > > > > >     "worker_id": "10.0.0.4:8078"
> > > > > > >   },
> > > > > > >   "tasks": [
> > > > > > >     {
> > > > > > >       "id": 0,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 1,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 2,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 3,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 4,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 5,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 6,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 7,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 8,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 9,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     }
> > > > > > >   ],
> > > > > > >   "type": "sink"
> > > > > > > }
> > > > > > >
> > > > > > >
> > > > > > > Regards and Thanks
> > > > > > > Deepak Raghav
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Kafka Connect Connector Tasks Uneven Division

Posted by Robin Moffatt <ro...@confluent.io>.
Thanks for the clarification. If this is an actual problem that you're
encountering and need a solution to, then since the task allocation is
not deterministic it sounds like you need to deploy separate worker
clusters, based on the workload patterns that you are seeing and the
machine resources available.
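
For completeness, a sketch of what "separate worker clusters" means in
terms of worker property files (all names here are illustrative): each
cluster needs its own group.id and its own set of internal topics, and
you register each group of connectors against the REST endpoint of the
cluster that should run it.

# worker-a.properties  (cluster A)
bootstrap.servers=kafka:9092
group.id=connect-cluster-a
config.storage.topic=connect-a-configs
offset.storage.topic=connect-a-offsets
status.storage.topic=connect-a-status

# worker-b.properties  (cluster B)
bootstrap.servers=kafka:9092
group.id=connect-cluster-b
config.storage.topic=connect-b-configs
offset.storage.topic=connect-b-offsets
status.storage.topic=connect-b-status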


-- 

Robin Moffatt | Senior Developer Advocate | robin@confluent.io | @rmoff


On Wed, 20 May 2020 at 16:39, Deepak Raghav <de...@gmail.com>
wrote:

> Hi Robin
>
> Replying to your query i.e
>
> One thing I'd ask at this point is though if it makes any difference where
> the tasks execute?
>
> It actually makes difference to us, we have 16 connectors and as I stated
> tasks division earlier, first 8 connector' task are assigned to first
> worker process and another connector's task to another worker process and
> just to mention that these 16 connectors are sink connectors. Each sink
> connector consumes message from different topic.There may be a case when
> messages are coming only for first 8 connector's topic and because all the
> tasks of these connectors are assigned to First Worker, load would be high
> on it and another set of connectors in another worker would be idle.
>
> Instead, if the task would have been divided evenly then this case would
> have been avoided. Because tasks of each connector would be present in both
> workers process like below :
>
> *W1*                       *W2*
>  C1T1                    C1T2
>  C2T2                    C2T2
>
> I hope, I gave your answer,
>
>
> Regards and Thanks
> Deepak Raghav
>
>
>
> On Wed, May 20, 2020 at 4:42 PM Robin Moffatt <ro...@confluent.io> wrote:
>
> > OK, I understand better now.
> >
> > You can read more about the guts of the rebalancing protocol that Kafka
> > Connect uses as of Apache Kafka 2.3 an onwards here:
> >
> https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/
> >
> > One thing I'd ask at this point is though if it makes any difference
> where
> > the tasks execute? The point of a cluster is that Kafka Connect manages
> the
> > workload allocation. If you need workload separation and
> > guaranteed execution locality I would suggest separate Kafka Connect
> > distributed clusters.
> >
> >
> > --
> >
> > Robin Moffatt | Senior Developer Advocate | robin@confluent.io | @rmoff
> >
> >
> > On Wed, 20 May 2020 at 10:24, Deepak Raghav <de...@gmail.com>
> > wrote:
> >
> > > Hi Robin
> > >
> > > Thanks for your reply.
> > >
> > > We are having two worker on different IP. The example which I gave you
> it
> > > was just a example. We are using kafka version 2.3.1.
> > >
> > > Let me tell you again with a simple example.
> > >
> > > Suppose, we have two EC2 node, N1 and N2 having worker process W1 and
> W2
> > > running in distribute mode with groupId i.e in same cluster and two
> > > connectors with having two task each i.e
> > >
> > > Node N1: W1 is running
> > > Node N2 : W2 is running
> > >
> > > First Connector (C1) : Task1 with id : C1T1 and task 2 with id : C1T2
> > > Second Connector (C2) : Task1 with id : C2T1 and task 2 with id : C2T2
> > >
> > > Now Suppose If both W1 and W2 worker process are running  and I
> register
> > > Connector C1 and C2 one after another i.e sequentially, on any of the
> > > worker process, the tasks division between the worker
> > > node are happening like below, which is expected.
> > >
> > > *W1*                       *W2*
> > > C1T1                    C1T2
> > > C2T2                    C2T2
> > >
> > > Now, suppose I stop one worker process e.g W2 and start after some
> time,
> > > the tasks division is changed like below i.e first connector's task
> move
> > to
> > > W1 and second connector's task move to W2
> > >
> > > *W1*                       *W2*
> > > C1T1                    C2T1
> > > C1T2                    C2T2
> > >
> > >
> > > Please let me know, If it is understandable or not.
> > >
> > > Note : Actually, In production, we are gonna have 16 connectors having
> 10
> > > task each and two worker node. With above scenario, first 8
> connectors's
> > > task move to W1 and next 8 connector' task move to W2, Which is not
> > > expected.
> > >
> > >
> > > Regards and Thanks
> > > Deepak Raghav
> > >
> > >
> > >
> > > On Wed, May 20, 2020 at 1:41 PM Robin Moffatt <ro...@confluent.io>
> > wrote:
> > >
> > > > So you're running two workers on the same machine (10.0.0.4), is
> > > > that correct? Normally you'd run one worker per machine unless there
> > was
> > > a
> > > > particular reason otherwise.
> > > > What version of Apache Kafka are you using?
> > > > I'm not clear from your question if the distribution of tasks is
> > > > presenting a problem to you (if so please describe why), or if you're
> > > just
> > > > interested in the theory behind the rebalancing protocol?
> > > >
> > > >
> > > > --
> > > >
> > > > Robin Moffatt | Senior Developer Advocate | robin@confluent.io |
> > @rmoff
> > > >
> > > >

Re: Kafka Connect Connector Tasks Uneven Division

Posted by Deepak Raghav <de...@gmail.com>.
Hi Robin

Replying to your query, i.e.:

"One thing I'd ask at this point, though, is whether it makes any
difference where the tasks execute?"

It actually makes a difference to us. We have 16 connectors and, as I
described in the task division earlier, the tasks of the first 8 connectors
are all assigned to the first worker process and the tasks of the other 8
connectors to the second worker process. Just to mention, these 16
connectors are all sink connectors, and each sink connector consumes
messages from a different topic. There may be a case where messages arrive
only for the first 8 connectors' topics; because all the tasks of those
connectors are assigned to the first worker, the load on it would be high
while the set of connectors on the other worker would sit idle.

Instead, if the tasks were divided evenly, this situation would be avoided,
because tasks of each connector would be present on both worker processes,
like below:

*W1*                       *W2*
 C1T1                    C1T2
 C2T1                    C2T2
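
To quantify that skew, here is a minimal sketch (an editorial addition, not
part of the original mail) that asks the Connect REST API which worker each
task of each connector landed on. The worker URL is a placeholder; the
endpoints used are the standard /connectors and /connectors/{name}/status
ones whose output is shown elsewhere in this thread.

import json
import urllib.request
from collections import Counter

# Placeholder: REST endpoint of any one worker in the Connect cluster.
CONNECT_URL = "http://10.0.0.4:8080"

def connector_names():
    # GET /connectors returns the list of registered connector names.
    with urllib.request.urlopen(f"{CONNECT_URL}/connectors") as resp:
        return json.load(resp)

def tasks_per_worker():
    counts = Counter()
    for name in connector_names():
        # GET /connectors/{name}/status reports a worker_id for every task.
        url = f"{CONNECT_URL}/connectors/{name}/status"
        with urllib.request.urlopen(url) as resp:
            status = json.load(resp)
        for task in status["tasks"]:
            counts[(name, task["worker_id"])] += 1
    return counts

if __name__ == "__main__":
    for (connector, worker), n in sorted(tasks_per_worker().items()):
        print(f"{connector:45s} {worker:20s} {n} task(s)")

If every row for a given connector shows the same worker_id, you are
looking at the grouped assignment described above.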

I hope that answers your question.


Regards and Thanks
Deepak Raghav




Re: Kafka Connect Connector Tasks Uneven Division

Posted by Robin Moffatt <ro...@confluent.io>.
OK, I understand better now.

You can read more about the guts of the rebalancing protocol that Kafka
Connect uses as of Apache Kafka 2.3 and onwards here:
https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/

One thing I'd ask at this point, though, is whether it makes any difference
where the tasks execute. The point of a cluster is that Kafka Connect
manages the workload allocation. If you need workload separation and
guaranteed execution locality, I would suggest separate Kafka Connect
distributed clusters.
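
As a concrete illustration of that point, here is a minimal sketch (an
editorial addition, with placeholder names and config) that registers a
hypothetical sink connector by PUT-ing its config to whichever worker you
can reach. Which worker receives the request does not decide where the
connector or its tasks will run; the group rebalance does.

import json
import urllib.request

# Placeholder: either worker's REST endpoint will do; the config is stored
# in Kafka and the cluster as a whole decides task placement.
CONNECT_URL = "http://10.0.0.4:8080"

NAME = "example-sink"  # hypothetical connector name
CONFIG = {
    # Hypothetical sink configuration, for illustration only.
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "topics": "example-topic",
    "file": "/tmp/example-sink.txt",
    "tasks.max": "10",
}

request = urllib.request.Request(
    f"{CONNECT_URL}/connectors/{NAME}/config",
    data=json.dumps(CONFIG).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="PUT",  # PUT /connectors/{name}/config creates or updates it
)
with urllib.request.urlopen(request) as resp:
    print(resp.status, json.load(resp))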


-- 

Robin Moffatt | Senior Developer Advocate | robin@confluent.io | @rmoff



Re: Kafka Connect Connector Tasks Uneven Division

Posted by Deepak Raghav <de...@gmail.com>.
Hi Robin

Thanks for your reply.

The two workers are running on different IPs; the example I gave was just
an illustration. We are using Kafka version 2.3.1.

Let me explain again with a simple example.

Suppose we have two EC2 nodes, N1 and N2, running worker processes W1 and
W2 in distributed mode with the same group.id (i.e. in the same cluster),
and two connectors with two tasks each:

Node N1: W1 is running
Node N2: W2 is running

First connector (C1): task 1 with id C1T1, task 2 with id C1T2
Second connector (C2): task 1 with id C2T1, task 2 with id C2T2

Now suppose both the W1 and W2 worker processes are running and I register
connectors C1 and C2 one after another (i.e. sequentially) against either
of the worker processes. The task division between the worker nodes then
comes out like below, which is expected.

*W1*                       *W2*
C1T1                    C1T2
C2T1                    C2T2

Now, suppose I stop one worker process, e.g. W2, and start it again after
some time. The task division changes as below, i.e. the first connector's
tasks move to W1 and the second connector's tasks move to W2:

*W1*                       *W2*
C1T1                    C2T1
C1T2                    C2T2
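
To make the two layouts concrete, here is a small toy sketch (an editorial
illustration only, not the actual Connect assignor) showing how
interleaving individual tasks across workers yields the first table, while
handing out whole connectors yields the second.

from itertools import cycle

# Toy model: two workers, two connectors with two tasks each.
workers = ["W1", "W2"]
tasks = ["C1T1", "C1T2", "C2T1", "C2T2"]

def interleave_tasks(tasks, workers):
    # Deal individual tasks round-robin across workers (the expected layout).
    assignment = {w: [] for w in workers}
    for task, worker in zip(tasks, cycle(workers)):
        assignment[worker].append(task)
    return assignment

def group_by_connector(tasks, workers):
    # Hand each worker a whole connector (the layout seen after the restart).
    assignment = {w: [] for w in workers}
    for i, worker in enumerate(workers):
        assignment[worker] = [t for t in tasks if t.startswith(f"C{i + 1}")]
    return assignment

print(interleave_tasks(tasks, workers))
# {'W1': ['C1T1', 'C2T1'], 'W2': ['C1T2', 'C2T2']}
print(group_by_connector(tasks, workers))
# {'W1': ['C1T1', 'C1T2'], 'W2': ['C2T1', 'C2T2']}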


Please let me know if this is understandable.

Note: in production we are going to have 16 connectors with 10 tasks each
and two worker nodes. With the above scenario, the tasks of the first 8
connectors move to W1 and the tasks of the next 8 connectors move to W2,
which is not what we expect.


Regards and Thanks
Deepak Raghav




Re: Kafka Connect Connector Tasks Uneven Division

Posted by Robin Moffatt <ro...@confluent.io>.
So you're running two workers on the same machine (10.0.0.4), is
that correct? Normally you'd run one worker per machine unless there was a
particular reason otherwise.
What version of Apache Kafka are you using?
I'm not clear from your question if the distribution of tasks is
presenting a problem to you (if so please describe why), or if you're just
interested in the theory behind the rebalancing protocol?


-- 

Robin Moffatt | Senior Developer Advocate | robin@confluent.io | @rmoff

