Posted to users@kafka.apache.org by Stephen Durfey <sj...@gmail.com> on 2017/07/20 15:01:26 UTC

Kafka Connect distributed mode rebalance

I'm seeing some behavior with the DistributedHerder that I am trying to
understand. I'm setting up a cluster of Kafka Connect nodes and have a
relatively large number of connectors to submit to it (392 connectors
right now, soon to become over 1100). For deployment I am using Chef,
which PUTs the connector configs at deployment time so I can create or
update any connectors.
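
For reference, the deployment boils down to a PUT against the Connect
REST API's /connectors/{name}/config endpoint, which creates the
connector if it doesn't exist and updates it otherwise. A minimal sketch
of that request (in Java) is below; the worker host, connector name, and
config values are placeholders rather than my actual setup:

import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class ConnectorDeployer {

    // Placeholder worker URL; any worker in the cluster can take the request.
    private static final String WORKER_URL = "http://connect-worker:8083";

    // PUT the config: Connect creates the connector if it is new and
    // updates it otherwise, so the same call works for create and update.
    static int putConnectorConfig(String name, String configJson) throws IOException {
        URL url = new URL(WORKER_URL + "/connectors/" + name + "/config");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(configJson.getBytes(StandardCharsets.UTF_8));
        }
        return conn.getResponseCode();
    }

    public static void main(String[] args) throws IOException {
        // Illustrative JDBC source config in bulk mode; values are placeholders.
        String config = "{"
                + "\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\","
                + "\"connection.url\": \"jdbc:sqlserver://db-host;databaseName=example\","
                + "\"mode\": \"bulk\","
                + "\"table.whitelist\": \"example_table\","
                + "\"topic.prefix\": \"raw-\","
                + "\"tasks.max\": \"1\""
                + "}";
        System.out.println(putConnectorConfig("example-jdbc-source", config));
    }
}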

Every time I PUT a new connector config to the worker, it appears to
initiate an assignment rebalance. I believe this only happens when
submitting a new connector. It causes all existing, running connectors
to stop and restart, and my logs end up flooded with exceptions from the
JDBC source tasks about closed SQL connections, and wakeup exceptions in
my sink tasks when committing offsets. This causes issues beyond having
to wait for a rebalance, because restarting the JDBC connectors makes
them re-pull all of their data, since they are using bulk mode.
Everything eventually settles down and all the connectors finish
successfully, but each PUT takes progressively longer waiting for a
rebalance to finish.

If I simply restart the worker nodes and let them instantiate only the
connectors that have already been successfully submitted, everything
starts up fine. So, this is only an issue when submitting new connectors
over the REST endpoint.

So, I'm trying to understand why submitting a new connector causes the
rebalancing, and also whether there is a better way to deploy the
connector configs in distributed mode.

Thanks,

Stephen

Re: Kafka Connect distributed mode rebalance

Posted by Stephen Durfey <sj...@gmail.com>.
Ewen, thanks for the reply. I will admit that in its current state my
connector usage is not optimal. Today it is a combination of JDBC source
connectors and a custom sink connector.

The original rollout happened before single message transforms (SMTs)
were available (2.0.1 was our first deployment, and we couldn't roll out
3.x yet). The data being pulled from SQL Server needed to be written to
a set of topics in a specific format so our downstream processing could
pick it up, process it normally, and deliver it to our downstream
customers. Because of this, a custom sink connector was needed to pick
up the data from the topic written to by the worker, transform it, and
write it back out to Kafka on a different set of topics. So, for each
new data source we loaded, we needed two connectors instead of one.
Writing an SMT will solve this issue and is something being worked on.
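
As a rough sketch of the direction, an SMT is just an implementation of
Connect's Transformation interface, along the lines of the Java below.
The topic-routing logic and the "target.topic" config key are only
stand-ins for whatever the real transform ends up doing:

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Illustrative only: reroutes each record to a configured topic. The real
// transform would also reshape the value into the downstream format.
public class RerouteTransform<R extends ConnectRecord<R>> implements Transformation<R> {

    private String targetTopic;

    @Override
    public void configure(Map<String, ?> configs) {
        targetTopic = (String) configs.get("target.topic"); // made-up config key
    }

    @Override
    public R apply(R record) {
        // Copy the record, changing only the destination topic.
        return record.newRecord(targetTopic, record.kafkaPartition(),
                record.keySchema(), record.key(),
                record.valueSchema(), record.value(),
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef().define("target.topic", ConfigDef.Type.STRING,
                ConfigDef.Importance.HIGH, "Topic to route records to");
    }

    @Override
    public void close() {
        // Nothing to clean up in this sketch.
    }
}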

The other reason, as I believe you correctly pointed out, is potential
issues with the JDBC connector. Today there is one connector per table
per source (sad face). There are two reasons for this: 1) some tables
needed custom select statements because we only need a few columns from
a much larger table, and 2) I believe there were data type translation
issues between SQL Server types and what the JDBC connector was
translating them to. That was 8 or so months ago, so my memory is a bit
hazy on the exact issue with the types. I can do some testing to
identify the issues and work on submitting a pull request to address the
translation problem.
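
To make that shape concrete, each table ends up with its own connector
config along these lines (table, column, and topic names are made up,
and the keys are the JDBC source connector options as I understand
them):

import java.util.HashMap;
import java.util.Map;

public class PerTableConfigs {

    // Config for a single table's connector. With a custom "query" the
    // connector runs exactly that statement, and "bulk" mode re-reads the
    // whole result set on every poll -- which is why restarts re-pull
    // everything.
    static Map<String, String> jdbcConfigForTable(String table, String selectSql) {
        Map<String, String> config = new HashMap<>();
        config.put("connector.class", "io.confluent.connect.jdbc.JdbcSourceConnector");
        config.put("connection.url", "jdbc:sqlserver://db-host;databaseName=example"); // placeholder
        config.put("mode", "bulk");
        config.put("query", selectSql);
        config.put("topic.prefix", "raw-" + table); // with "query", this is the whole topic name
        config.put("tasks.max", "1");
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> cfg = jdbcConfigForTable("orders",
                "SELECT order_id, customer_id, total FROM orders");
        cfg.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}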

As for the work to change the task rebalancing behavior, I'd be
interested in hearing your thoughts on how to address it. It's a feature
change I would be interested in working on, since as I start migrating
to a Mesos/Docker-based deployment in the future I'll want/need
everything to run in distributed mode instead of standalone.

- Stephen



Re: Kafka Connect distributed mode rebalance

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
Btw, if you can share, I would be curious what connectors you're using
and why you need so many. I'd be interested in whether a modification to
the connector could also simplify things for you.

-Ewen


Re: Kafka Connect distributed mode rebalance

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
Stephen,

Cool, that is a *lot* of connectors!

Regarding rebalances, the reason this happens is that Kafka Connect is
trying to keep the total work of the cluster balanced across the
workers. If you add or remove connectors, or the number of workers
changes, then we need to go through another round of deciding where that
work is done. This is accomplished by having the workers coordinate
through Kafka's group coordination protocol by performing a rebalance.
It is very similar to how consumer rebalances work -- the members all
"rejoin" the group, one of them figures out how to assign the work, and
then everyone gets their assignments and restarts work.

The way this works today is global -- everyone has to stop work, commit
offsets, then start the process where work is assigned, and finally restart
work. That's why you're seeing everything stop, then restart.
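
To give a feel for the assignment step, here's a deliberately simplified
sketch. The real logic in the worker coordinator also deals with
connector vs. task work, config state, and failures; this just
round-robins names across workers to show why every worker's assignment
can change whenever the membership or the set of connectors changes:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AssignmentSketch {

    // Round-robin the full set of connectors/tasks over the current workers.
    // Redistributing the whole set is what makes the rebalance global.
    static Map<String, List<String>> assign(List<String> workers, List<String> work) {
        Map<String, List<String>> assignment = new HashMap<>();
        workers.forEach(w -> assignment.put(w, new ArrayList<>()));
        for (int i = 0; i < work.size(); i++) {
            assignment.get(workers.get(i % workers.size())).add(work.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> workers = Arrays.asList("worker-1", "worker-2", "worker-3");
        List<String> work = Arrays.asList("jdbc-orders", "jdbc-orders-task-0",
                "jdbc-customers", "jdbc-customers-task-0",
                "custom-sink", "custom-sink-task-0");
        System.out.println(assign(workers, work));
    }
}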

We know this will eventually become a scalability limit. We've talked
about other approaches that avoid having to stop everything. There's not
currently a JIRA with more details & ideas, but
https://issues.apache.org/jira/browse/KAFKA-5505 is filed for the general
issue. We haven't committed to any specific approach, but I've thought
through this a bit and have some ideas around how we could make the process
more incremental such that we don't have to stop *everything* during a
single rebalance process, instead accepting the cost of some subsequent
rebalances in order to make each iteration faster/cheaper.

I'm not sure yet when we'll get these updates in. One other thing to
consider is whether it is possible to use fewer connectors at a time.
One of our goals was to encourage broad copying by default; fewer
connectors/tasks doesn't necessarily solve your problem, but depending
on the connectors you're using it might reduce the time spent
stopping/starting tasks during the rebalance and alleviate your problem.

-Ewen
