Posted to users@kafka.apache.org by Scott Reynolds <sr...@twilio.com> on 2016/04/15 18:56:58 UTC

Kafka Connect misconfiguration. Need some help

List,

We are struggling with Kafka Connect settings. The processes start up,
handle a bunch of messages, and flush. Then the group coordinator slowly
removes them from the group.

This has to be an interplay between Connect's flush interval and the call
to poll for each of these tasks. Here are my current settings that I think
are relevant.

Any insights someone could share with us?

# on shutdown wait this long for the tasks to finish their flush.
task.shutdown.graceful.timeout.ms=600000

# Flush records to s3 every 1/2 hour
offset.flush.interval.ms=1800000

# Max time to wait for flushing to finish. Wait at *most* this long every
# offset.flush.interval.ms.
offset.flush.timeout.ms=600000

# Take your time on session timeouts. We do a lot of work. These control
# the length of time a lock on a TopicPartition can be held
# by the coordinator broker.
session.timeout.ms=180000
request.timeout.ms=190000
consumer.session.timeout.ms=180000
consumer.request.timeout.ms=190000

Re: Kafka Connect misconfiguration. Need some help

Posted by Scott Reynolds <sr...@twilio.com>.
OK, still really struggling with this. We have sped up the flush quite a
bit, but it is still failing. We see that all three group members are
assigned partitions and tasks. Then tasks start dropping off.


The log line that I think indicates what is wrong is this:

> [Thread-9] INFO Marking the coordinator 1932295911 dead. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

I strongly believe this is not the SinkTask consumer (the SinkTask sets the
thread name to the task name) but the WorkGroupCoordinator. Can someone
help me understand how the WorkGroupCoordinator works? It seems that an
election happens and one of the Connect hosts is chosen as the leader. I
believe this log message indicates that the elected group coordinator did
not respond to the heartbeat in time.

Searching through the source code, I cannot figure out how the election
happens or where the heartbeat response is generated. Does anyone have any
guidance on where to look or how to debug? I am grasping at straws at this
point.

Re: Kafka Connect misconfiguration. Need some help

Posted by Scott Reynolds <sr...@twilio.com>.
Awesome, that is what I thought. The answer seems simple: speed up flush :-D,
which we should be able to do.

Re: Kafka Connect misconfiguration. Need some help

Posted by Liquan Pei <li...@gmail.com>.
Hi Scott,

It seems that your flush takes longer than consumer.session.timeout.ms.
The consumers used in SinkTasks for a SinkConnector are in the same
consumer group. If your flush method takes longer than
consumer.session.timeout.ms, the consumer for a SinkTask may be kicked out
by the coordinator.
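
A rough sanity check of the settings posted in this thread can be sketched
as below. It is an illustration only: the class and method names are
hypothetical and not part of Kafka Connect, and it assumes that
offset.flush.timeout.ms can be read as an upper bound on how long a flush may
run, as the comment in the posted config describes.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper for illustration; not part of Kafka Connect.
class FlushBudgetCheck {

    // Returns true when the configured flush timeout is longer than the
    // consumer session timeout, i.e. a slow flush could outlive the session.
    static boolean flushCanOutliveSession(Properties workerProps) {
        long flushTimeoutMs =
            Long.parseLong(workerProps.getProperty("offset.flush.timeout.ms"));
        long sessionTimeoutMs =
            Long.parseLong(workerProps.getProperty("consumer.session.timeout.ms"));
        return flushTimeoutMs > sessionTimeoutMs;
    }

    // Parses properties-file text into a Properties object.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Values copied from the settings posted in this thread.
        Properties props = parse(
            "offset.flush.timeout.ms=600000\n"
          + "consumer.session.timeout.ms=180000\n");
        System.out.println(
            "flush can outlive session: " + flushCanOutliveSession(props));
    }
}
```

With the posted values (600000 ms against 180000 ms) the check returns true,
which matches the symptom of tasks being dropped from the group.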

In this case, you may want to increase consumer.session.timeout.ms or
add a timeout mechanism to your flush implementation that returns control
to the framework so that it can send heartbeats to the coordinator.
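
One way such a timeout mechanism might look is sketched below. This is a
minimal sketch under stated assumptions, not the Connect API: BoundedFlusher
and its methods are hypothetical names, the class does not extend SinkTask,
and the writer callback stands in for whatever upload the connector performs
(for example, one S3 write per buffered chunk). The clock is injected only to
make the behavior easy to test.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical helper; it does not use or extend any Kafka Connect class.
class BoundedFlusher<T> {
    private final Deque<T> pending = new ArrayDeque<>();
    private final Consumer<T> writer;    // performs one write per buffered item
    private final LongSupplier clockMs;  // current time in milliseconds

    BoundedFlusher(Consumer<T> writer, LongSupplier clockMs) {
        this.writer = writer;
        this.clockMs = clockMs;
    }

    // Buffers an item for a later flush.
    void add(T item) {
        pending.addLast(item);
    }

    int pendingCount() {
        return pending.size();
    }

    // Writes pending items until none remain or budgetMs has elapsed.
    // Returns true when everything was written. A false result means the
    // caller should return control to the framework and try again later.
    boolean flushFor(long budgetMs) {
        long deadline = clockMs.getAsLong() + budgetMs;
        while (!pending.isEmpty()) {
            if (clockMs.getAsLong() >= deadline) {
                return false;
            }
            writer.accept(pending.peekFirst());
            pending.removeFirst(); // dequeue only after the write returned
        }
        return true;
    }
}
```

The deadline is checked between items, so a single slow write can still
overrun the budget; the budget would need to sit well below
consumer.session.timeout.ms. How a partial flush should interact with offset
commits depends on the Connect version and is not covered by this sketch.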

Thanks,
Liquan


-- 
Liquan Pei
Software Engineer, Confluent Inc