Posted to users@kafka.apache.org by Marcos Juarez <mj...@gmail.com> on 2016/06/14 20:40:30 UTC

Re: Kafka Connect tasks consumers issue

Liquan,

We're constantly hitting this problem in our prod cluster.  Do you have a
JIRA issue that relates to this, and when will this bugfix be backported to
the 0.9.x branch?  We're not planning on upgrading to 0.10 for a while,
since the assumption was that the 0.9.x line would be more stable.

Thanks,

Marcos Juarez

On Mon, May 16, 2016 at 12:28 PM, Liquan Pei <li...@gmail.com> wrote:

> Hi Matteo,
>
> There was a bug in the 0.9.1 such that task.close() could be invoked from
> both the Worker thread and the Herder thread. This creates a race condition
> in which consumer.close() is called from multiple threads at the same time.
> Because the consumer is designed to be used from a single thread, a
> ConcurrentModificationException is thrown when consumer.close() is invoked
> concurrently.
>
> This bug is fixed in the upcoming 0.10.0 release, where consumer.close() is
> only invoked from the SinkTask thread.
>
> Thanks,
> Liquan
>
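> As an illustration of the single-threaded contract Liquan describes above
> (a minimal sketch, not code from this thread, with placeholder broker,
> group, and topic names): a consumer owned by one thread throws
> ConcurrentModificationException if another thread calls into it, and
> wakeup() is the one call that is safe to make from outside.
>
> import java.util.Arrays;
> import java.util.Properties;
>
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.errors.WakeupException;
>
> public class SingleThreadedConsumerDemo {
>     public static void main(String[] args) throws InterruptedException {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
>         props.put("group.id", "demo-group");              // placeholder group
>         props.put("key.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>
>         final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
>         consumer.subscribe(Arrays.asList("demo-topic")); // placeholder topic
>
>         Thread poller = new Thread(new Runnable() {
>             public void run() {
>                 try {
>                     while (true) {
>                         // The consumer may only be used from this thread.
>                         consumer.poll(1000);
>                     }
>                 } catch (WakeupException e) {
>                     // Expected on shutdown: wakeup() is the only thread-safe call.
>                 } finally {
>                     consumer.close(); // close on the owning thread
>                 }
>             }
>         });
>         poller.start();
>
>         Thread.sleep(5000);
>         // Calling consumer.close() from this thread instead could race with
>         // poll() and throw ConcurrentModificationException, which is the
>         // failure described above.
>         consumer.wakeup();
>         poller.join();
>     }
> }
>
> Shutting the consumer down via wakeup() and closing it on its owning thread
> avoids exactly the race described above.
>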
> On Sun, May 15, 2016 at 11:57 PM, Matteo Luzzi <ma...@gmail.com>
> wrote:
>
> > Any other thoughts on this?
> > Thanks,
> > Matteo
> >
> > 2016-05-12 13:09 GMT+02:00 Matteo Luzzi <ma...@gmail.com>:
> >
> > > I also found this suspicious log snippet that might be relevant. The
> > > task executed by thread 134 is the one that won't receive messages:
> > >
> > > INFO Attempt to heart beat failed since the group is rebalancing, try to
> > > re-join group.
> > > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:633)
> > > [2016-05-12 10:27:09,623] INFO
> > > [kafka-connect-topic-0, kafka-connect-topic-1, kafka-connect-topic-2,
> > > kafka-connect-topic-3, kafka-connect-topic-4, kafka-connect-topic-5]
> > > topic-partitions are revoked from this thread task 134
> > > (com.connect.elasticsearch_kafka_connector.task.ElasticsearchTask:99)
> > > [2016-05-12 10:27:09,623] INFO
> > > org.apache.kafka.connect.runtime.WorkerSinkTask@5c29784c Committing
> > > offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:187)
> > > [2016-05-12 10:27:09,634] INFO [kafka-connect-topic-0] topic-partitions
> > > are assigned from this thread task 134
> > > (com.connect.elasticsearch_kafka_connector.task.ElasticsearchTask:92)
> > >
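> > > For context, log lines like the ones above typically come from the 0.9
> > > SinkTask rebalance callbacks (onPartitionsAssigned/onPartitionsRevoked,
> > > replaced by open()/close() in later releases). The real ElasticsearchTask
> > > code is not shown in this thread, so the following is only an illustrative
> > > sketch of that kind of logging:
> > >
> > > import java.util.Collection;
> > >
> > > import org.apache.kafka.common.TopicPartition;
> > > import org.apache.kafka.connect.sink.SinkTask;
> > > import org.slf4j.Logger;
> > > import org.slf4j.LoggerFactory;
> > >
> > > public abstract class PartitionLoggingSinkTask extends SinkTask {
> > >     private static final Logger log =
> > >             LoggerFactory.getLogger(PartitionLoggingSinkTask.class);
> > >
> > >     @Override
> > >     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
> > >         // Logged on every rebalance; if a task logs an assignment here but
> > >         // never sees records in put(), its consumer is stuck rather than
> > >         // unassigned.
> > >         log.info("{} topic-partitions are assigned to thread {}",
> > >                 partitions, Thread.currentThread().getId());
> > >     }
> > >
> > >     @Override
> > >     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
> > >         log.info("{} topic-partitions are revoked from thread {}",
> > >                 partitions, Thread.currentThread().getId());
> > >     }
> > > }
> > >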
> > > 2016-05-12 8:58 GMT+02:00 Matteo Luzzi <ma...@gmail.com>:
> > >
> > >> Hi Liquan,
> > >>
> > >> I run the two workers inside Docker containers, with one connector that
> > >> has 6 tasks reading from a topic with 6 partitions. Then I kill one of
> > >> the two containers with docker kill or docker restart. When the
> > >> container is up again a rebalance happens, and sometimes a few tasks
> > >> don't consume messages anymore even though the onPartitionsAssigned
> > >> callback says that they are handling a partition of the topic. Let me
> > >> know if you need other information.
> > >> I use Kafka 0.9.0.
> > >>
> > >> Thanks for the help,
> > >> Matteo
> > >>
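> > >> One way to see whether the partitions of the "stuck" tasks are still
> > >> making progress is to read the committed offsets of the sink's consumer
> > >> group with a separate standalone consumer. A rough sketch, assuming the
> > >> usual connect-<connector-name> group naming and with placeholder broker,
> > >> group, and topic names:
> > >>
> > >> import java.util.Properties;
> > >>
> > >> import org.apache.kafka.clients.consumer.KafkaConsumer;
> > >> import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> > >> import org.apache.kafka.common.PartitionInfo;
> > >> import org.apache.kafka.common.TopicPartition;
> > >>
> > >> public class CommittedOffsetChecker {
> > >>     public static void main(String[] args) {
> > >>         Properties props = new Properties();
> > >>         props.put("bootstrap.servers", "localhost:9092");      // placeholder
> > >>         props.put("group.id", "connect-kafka-sink-connector"); // assumed naming
> > >>         props.put("key.deserializer",
> > >>                 "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> > >>         props.put("value.deserializer",
> > >>                 "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> > >>
> > >>         try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
> > >>             // Only fetches metadata and committed offsets; it never joins the
> > >>             // group, so it does not trigger a rebalance of the connector's
> > >>             // own consumers.
> > >>             for (PartitionInfo info : consumer.partitionsFor("kafka-connect-topic")) {
> > >>                 TopicPartition tp = new TopicPartition(info.topic(), info.partition());
> > >>                 OffsetAndMetadata committed = consumer.committed(tp);
> > >>                 System.out.println(tp + " committed="
> > >>                         + (committed == null ? "none" : committed.offset()));
> > >>             }
> > >>         }
> > >>     }
> > >> }
> > >>
> > >> Running it a few times while producers keep writing shows which
> > >> partitions' committed offsets stop advancing.
> > >>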
> > >> 2016-05-11 22:57 GMT+02:00 Liquan Pei <li...@gmail.com>:
> > >>
> > >>> Hi Matteo,
> > >>>
> > >>> I don't completely follow the steps. Can you share the exact command
> > >>> to reproduce the issue? What kind of commands did you use to restart
> > >>> the connector? Which version of Kafka are you using?
> > >>>
> > >>> Thanks,
> > >>> Liquan
> > >>>
> > >>> On Wed, May 11, 2016 at 4:40 AM, Matteo Luzzi <matteo.luzzi@gmail.com>
> > >>> wrote:
> > >>>
> > >>> > Hi again, I was able to reproduce the bug in the same scenario (two
> > >>> > workers on separate machines) just by deleting the connector through
> > >>> > the REST API and then restarting it again.
> > >>> > I also got this error on one of the workers:
> > >>> > [2016-05-11 11:29:47,034] INFO 172.17.42.1 - - [11/May/2016:11:29:45 +0000]
> > >>> > "DELETE /connectors/kafka-sink-connector HTTP/1.1" 204 -  1171
> > >>> > (org.apache.kafka.connect.runtime.rest.RestServer:60)
> > >>> > [2016-05-11 11:29:52,034] INFO Forcing shutdown of thread
> > >>> > WorkerSinkTask-kafka-sink-connector-1
> > >>> > (org.apache.kafka.connect.util.ShutdownableThread:141)
> > >>> > [2016-05-11 11:29:52,050] ERROR Graceful stop of task
> > >>> > org.apache.kafka.connect.runtime.WorkerSinkTask@7761a73c failed.
> > >>> > (org.apache.kafka.connect.runtime.Worker:312)
> > >>> > [2016-05-11 11:29:52,051] ERROR Uncaught exception in herder work thread,
> > >>> > exiting:
> > >>> > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:166)
> > >>> > java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> > >>> > multi-threaded access
> > >>> >   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1282)
> > >>> >   at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1213)
> > >>> >   at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:128)
> > >>> >   at org.apache.kafka.connect.runtime.Worker.stopTask(Worker.java:313)
> > >>> >   at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.onRevoked(DistributedHerder.java:898)
> > >>> >   at org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.onJoinPrepare(WorkerCoordinator.java:238)
> > >>> >   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:209)
> > >>> >   at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.ensureActive(WorkerGroupMember.java:132)
> > >>> >   at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:182)
> > >>> >   at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
> > >>> >   at java.lang.Thread.run(Thread.java:745)
> > >>> >
> > >>> > On the subsequent restart, 2 out of 6 tasks were not receiving
> > >>> > messages anymore.
> > >>> >
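> > >>> > The "not safe for multi-threaded access" message comes from a
> > >>> > lightweight ownership check inside the consumer rather than from real
> > >>> > locking; a simplified sketch of that style of guard (illustrative
> > >>> > only, not the actual KafkaConsumer source):
> > >>> >
> > >>> > import java.util.ConcurrentModificationException;
> > >>> > import java.util.concurrent.atomic.AtomicLong;
> > >>> >
> > >>> > public class SingleThreadAccessGuard {
> > >>> >     private static final long NO_OWNER = -1L;
> > >>> >     private final AtomicLong ownerThreadId = new AtomicLong(NO_OWNER);
> > >>> >     private int refCount = 0; // only touched while ownership is held
> > >>> >
> > >>> >     // Entered at the start of every public consumer method
> > >>> >     // (poll, commit, close, ...).
> > >>> >     public void acquire() {
> > >>> >         long me = Thread.currentThread().getId();
> > >>> >         if (ownerThreadId.get() != me
> > >>> >                 && !ownerThreadId.compareAndSet(NO_OWNER, me)) {
> > >>> >             throw new ConcurrentModificationException(
> > >>> >                     "KafkaConsumer is not safe for multi-threaded access");
> > >>> >         }
> > >>> >         refCount++;
> > >>> >     }
> > >>> >
> > >>> >     // Called when the method returns; ownership is dropped on the
> > >>> >     // outermost exit.
> > >>> >     public void release() {
> > >>> >         if (--refCount == 0) {
> > >>> >             ownerThreadId.set(NO_OWNER);
> > >>> >         }
> > >>> >     }
> > >>> > }
> > >>> >
> > >>> > In the trace above, the herder thread reaches consumer.close() while
> > >>> > the sink task's own thread still owns the consumer, so a guard like
> > >>> > this fires.
> > >>> >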
> > >>> > 2016-05-11 11:41 GMT+02:00 Matteo Luzzi <ma...@gmail.com>:
> > >>> >
> > >>> > > Hi Liquan,
> > >>> > > Thanks for the fast response.
> > >>> > > I'm able to reproduce the error by having two workers running on
> > >>> > > two different machines. If I restart one of the two workers, the
> > >>> > > failover logic correctly detects the failure and shuts down the
> > >>> > > tasks on the healthy worker for rebalancing. When the failed worker
> > >>> > > is up again, the tasks are distributed correctly between the two
> > >>> > > workers, but some tasks don't get new messages anymore. How can I
> > >>> > > check that all the input topic partitions are actually correctly
> > >>> > > reassigned?
> > >>> > >
> > >>> > > Matteo
> > >>> > >
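> > >>> > > One rough way to answer that question from inside a task is to log
> > >>> > > the assignment the framework reports, assuming your Connect version
> > >>> > > exposes SinkTaskContext.assignment() (the context field is injected
> > >>> > > by the framework). A diagnostic sketch with an illustrative class
> > >>> > > name:
> > >>> > >
> > >>> > > import java.util.Map;
> > >>> > >
> > >>> > > import org.apache.kafka.connect.sink.SinkTask;
> > >>> > > import org.slf4j.Logger;
> > >>> > > import org.slf4j.LoggerFactory;
> > >>> > >
> > >>> > > public abstract class AssignmentLoggingSinkTask extends SinkTask {
> > >>> > >     private static final Logger log =
> > >>> > >             LoggerFactory.getLogger(AssignmentLoggingSinkTask.class);
> > >>> > >     private volatile boolean running;
> > >>> > >     private Thread reporter;
> > >>> > >
> > >>> > >     @Override
> > >>> > >     public void start(Map<String, String> props) {
> > >>> > >         running = true;
> > >>> > >         reporter = new Thread(new Runnable() {
> > >>> > >             public void run() {
> > >>> > >                 while (running) {
> > >>> > >                     // assignment() reports the partitions this task's
> > >>> > >                     // consumer currently owns.
> > >>> > >                     log.info("Current assignment: {}", context.assignment());
> > >>> > >                     try {
> > >>> > >                         Thread.sleep(30000);
> > >>> > >                     } catch (InterruptedException e) {
> > >>> > >                         return;
> > >>> > >                     }
> > >>> > >                 }
> > >>> > >             }
> > >>> > >         }, "assignment-reporter");
> > >>> > >         reporter.setDaemon(true);
> > >>> > >         reporter.start();
> > >>> > >     }
> > >>> > >
> > >>> > >     @Override
> > >>> > >     public void stop() {
> > >>> > >         running = false;
> > >>> > >         if (reporter != null) {
> > >>> > >             reporter.interrupt();
> > >>> > >         }
> > >>> > >     }
> > >>> > > }
> > >>> > >
> > >>> > > If a partition shows up here but its committed offset never moves,
> > >>> > > the consumer for that task is assigned but stuck rather than
> > >>> > > unassigned.
> > >>> > >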
> > >>> > > 2016-05-11 10:44 GMT+02:00 Liquan Pei <li...@gmail.com>:
> > >>> > >
> > >>> > >> Hi Matteo,
> > >>> > >>
> > >>> > >> Glad to hear that you are building a connector. To better
> > >>> > >> understand the issue, can you provide the exact steps to
> > >>> > >> reproduce it? One thing I am confused about is that when one
> > >>> > >> worker is shut down, you don't need to restart the connector
> > >>> > >> through the REST API; the failover logic should handle the
> > >>> > >> connector and task shutdown and startup.
> > >>> > >>
> > >>> > >> The offset storage topic is used for storing offsets for source
> > >>> > >> connectors. For sink connectors, the offsets are simply Kafka
> > >>> > >> consumer offsets and will be stored in the __consumer_offsets
> > >>> > >> topic.
> > >>> > >>
> > >>> > >> Thanks,
> > >>> > >> Liquan
> > >>> > >>
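> > >>> > >> To make that distinction concrete: a sink task never writes to
> > >>> > >> offset.storage.topic itself; the offsets it receives in flush()
> > >>> > >> are ordinary consumer offsets that the framework commits for the
> > >>> > >> sink's consumer group, which is why they land in __consumer_offsets.
> > >>> > >> A minimal sketch (illustrative class name):
> > >>> > >>
> > >>> > >> import java.util.Map;
> > >>> > >>
> > >>> > >> import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> > >>> > >> import org.apache.kafka.common.TopicPartition;
> > >>> > >> import org.apache.kafka.connect.sink.SinkTask;
> > >>> > >> import org.slf4j.Logger;
> > >>> > >> import org.slf4j.LoggerFactory;
> > >>> > >>
> > >>> > >> public abstract class OffsetLoggingSinkTask extends SinkTask {
> > >>> > >>     private static final Logger log =
> > >>> > >>             LoggerFactory.getLogger(OffsetLoggingSinkTask.class);
> > >>> > >>
> > >>> > >>     @Override
> > >>> > >>     public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
> > >>> > >>         // Plain consumer offsets; after this call returns the framework
> > >>> > >>         // commits them for the sink's consumer group, so they end up in
> > >>> > >>         // __consumer_offsets rather than in offset.storage.topic.
> > >>> > >>         for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
> > >>> > >>             log.info("Flushing {} up to offset {}",
> > >>> > >>                     entry.getKey(), entry.getValue().offset());
> > >>> > >>         }
> > >>> > >>     }
> > >>> > >> }
> > >>> > >>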
> > >>> > >> On Wed, May 11, 2016 at 1:31 AM, Matteo Luzzi <matteo.luzzi@gmail.com>
> > >>> > >> wrote:
> > >>> > >>
> > >>> > >> > Hi,
> > >>> > >> > I'm working on a custom implementation of a sink connector for
> > >>> > >> > the Kafka Connect framework. I'm testing the connector for
> > >>> > >> > fault tolerance by killing the worker process and restarting
> > >>> > >> > the connector through the REST API, and occasionally I notice
> > >>> > >> > that some tasks no longer receive messages from the internal
> > >>> > >> > consumers. I don't get any errors in the log and the tasks
> > >>> > >> > seem to be initialised correctly, but some of them just don't
> > >>> > >> > process messages anymore. Normally when I restart the connector
> > >>> > >> > again, the tasks read all the messages skipped before. I'm
> > >>> > >> > running Kafka Connect in distributed mode.
> > >>> > >> >
> > >>> > >> > Could it be a problem with the cleanup function invoked when
> > >>> > >> > closing the connector, causing a leak of consumer connections
> > >>> > >> > to the broker? Any ideas?
> > >>> > >> >
> > >>> > >> > Also, from the documentation I read that the connector saves
> > >>> > >> > the task offsets in a special topic in Kafka (the one specified
> > >>> > >> > via offset.storage.topic), but it is empty even though the
> > >>> > >> > connector processes messages. Is that normal?
> > >>> > >> >
> > >>> > >> > Thanks,
> > >>> > >> > Matteo
> > >>> > >> >
> > >>> > >>
> > >>> > >>
> > >>> > >>
> > >>> > >> --
> > >>> > >> Liquan Pei
> > >>> > >> Software Engineer, Confluent Inc
> > >>> > >>
> > >>> > >
> > >>> > >
> > >>> > >
> > >>> > > --
> > >>> > > Matteo Remo Luzzi
> > >>> > >
> > >>> >
> > >>> >
> > >>> >
> > >>> > --
> > >>> > Matteo Remo Luzzi
> > >>> >
> > >>>
> > >>>
> > >>>
> > >>> --
> > >>> Liquan Pei
> > >>> Software Engineer, Confluent Inc
> > >>>
> > >>
> > >>
> > >>
> > >> --
> > >> Matteo Remo Luzzi
> > >>
> > >
> > >
> > >
> > > --
> > > Matteo Remo Luzzi
> > >
> >
> >
> >
> > --
> > Matteo Remo Luzzi
> >
>
>
>
> --
> Liquan Pei
> Software Engineer, Confluent Inc
>