Posted to users@kafka.apache.org by Hironori Ogibayashi <og...@gmail.com> on 2016/07/12 02:43:18 UTC

Re: Blocked in KafkaConsumer.commitOffsets

Hi

After the modification, my program ran for 3 days without problems. The maximum
checkpoint duration was 6 seconds (before the modification, it took several
minutes), so I think the issue is fixed.

Regards,
Hironori


2016-06-15 22:51 GMT+09:00 Robert Metzger <rm...@apache.org>:
> Hi,
>
> I've already looked at this issue on the Flink list and recommended that
> Hironori post here. It seems that the consumer is not returning from the
> poll() call; that is why the commitOffsets() method cannot enter the
> synchronized block.
> The KafkaConsumer is logging the following statements:
>
> 2016-06-10 20:29:53,677 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator 2147482645 dead.
> 2016-06-10 20:29:53,678 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator 2147482645 dead.
> 2016-06-10 20:29:53,679 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator 2147482645 dead.
> ....
> 2016-06-10 20:56:53,982 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator 2147482645 dead.
>
>
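A minimal, self-contained sketch of the contention described above, using nothing beyond the JDK. A plain Object stands in for the shared KafkaConsumer, a latch stands in for a poll() that does not return, and the class and thread names (MonitorContentionDemo, poll-thread, commit-thread) are hypothetical. This is not Flink's code; it only reproduces the pattern in which one thread holds the consumer's monitor for the whole poll while another thread waits to enter a synchronized block on the same object.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class MonitorContentionDemo {

    /** Returns the state of the committing thread observed while the "poll" holds the monitor. */
    static Thread.State observeCommitterState() {
        final Object consumer = new Object();             // stand-in for the shared KafkaConsumer
        final CountDownLatch pollStarted = new CountDownLatch(1);
        final CountDownLatch pollMayReturn = new CountDownLatch(1);

        // Polling thread: keeps the monitor for as long as its "poll" runs.
        Thread pollThread = new Thread(() -> {
            synchronized (consumer) {
                pollStarted.countDown();
                try {
                    pollMayReturn.await();                 // simulates a poll() that does not return
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "poll-thread");

        // Committing thread: needs the same monitor, like a synchronized commitOffsets().
        Thread commitThread = new Thread(() -> {
            synchronized (consumer) {
                // the offset commit would run here
            }
        }, "commit-thread");

        try {
            pollThread.start();
            pollStarted.await();                           // the monitor is now held
            commitThread.start();
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            Thread.State observed = commitThread.getState();
            while (observed != Thread.State.BLOCKED && deadline - System.nanoTime() > 0) {
                Thread.sleep(10);                          // give the committer time to reach the monitor
                observed = commitThread.getState();
            }
            return observed;                               // jstack shows this as "BLOCKED (on object monitor)"
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pollMayReturn.countDown();                     // let the "poll" return so both threads finish
        }
    }

    public static void main(String[] args) {
        System.out.println("commit-thread state: " + observeCommitterState());
    }
}
```

Running main() should report BLOCKED for the committing thread, which matches the state of the "Async calls on Source" thread in the dump.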
> I guess that the poll() call is not returning within the given timeout
> while trying to reconnect to the brokers?
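The synchronized block in the first dump waits without any limit. A minimal sketch of one alternative, assuming the code that guards the shared client can be changed: use a java.util.concurrent.locks.ReentrantLock instead of the object monitor, so the committing side can give up after a timeout with tryLock. BoundedCommit, withClientLocked, tryCommit and demo are hypothetical names, and this is not how FlinkKafkaConsumer09 is written. It does not make poll() return any sooner; it only keeps the checkpoint-notification thread from hanging behind it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical guard around a shared client that is not thread-safe (such as a consumer).
public class BoundedCommit {

    private final ReentrantLock clientLock = new ReentrantLock();

    /** Runs the poll body while holding the lock, as a polling thread would. */
    public void withClientLocked(Runnable pollBody) {
        clientLock.lock();
        try {
            pollBody.run();
        } finally {
            clientLock.unlock();
        }
    }

    /** Runs the commit only if the lock is acquired within timeoutMillis; returns false otherwise. */
    public boolean tryCommit(Runnable commitBody, long timeoutMillis) throws InterruptedException {
        if (!clientLock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false;                       // lock still held by a long poll: skip, caller may retry
        }
        try {
            commitBody.run();
            return true;
        } finally {
            clientLock.unlock();
        }
    }

    /** Returns {result while a stuck poll holds the lock, result after the poll has returned}. */
    static boolean[] demo() {
        BoundedCommit guard = new BoundedCommit();
        CountDownLatch holding = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread poller = new Thread(() -> guard.withClientLocked(() -> {
            holding.countDown();
            try {
                release.await();                // simulates a poll() that does not return
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }), "poll-thread");
        try {
            poller.start();
            holding.await();
            boolean whileStuck = guard.tryCommit(() -> { }, 100);
            release.countDown();
            poller.join();
            boolean afterPoll = guard.tryCommit(() -> { }, 100);
            return new boolean[] {whileStuck, afterPoll};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();                // never leave the poller parked
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("while stuck: " + r[0] + ", after poll: " + r[1]);
    }
}
```

Because KafkaConsumer is not safe for multi-threaded access, every call still has to go through the same lock. tryCommit returns false when the lock is not acquired in time and leaves it to the caller to retry, for example on the next checkpoint. The Kafka client also provides KafkaConsumer.wakeup(), which may be called from another thread and makes a blocking poll() throw WakeupException.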
>
>
> On Wed, Jun 15, 2016 at 2:41 PM, Hironori Ogibayashi <og...@gmail.com> wrote:
>
>> Hello,
>>
>> I am running a stream processing job with Kafka and Flink.
>> Flink reads records from Kafka.
>>
>> My software versions are:
>> - Kafka broker: 0.9.0.2.4 (HDP 2.4.0.0 version)
>> - Kafka client library: 0.9.0.1
>> - Flink: 1.0.3
>>
>> Now I have a problem: the Flink job sometimes gets blocked and the
>> consumer lag keeps increasing.
>> I took a thread dump while this was happening.
>>
>> This is the blocked thread. It looks like it is blocked in
>> FlinkKafkaConsumer09.commitOffsets(), waiting to lock the KafkaConsumer.
>>
>> ---
>> "Async calls on Source: Custom Source -> Flat Map (2/3)" daemon prio=10 tid=0x00007f2b14010800 nid=0x1b89a waiting for monitor entry [0x00007f2b3ddfc000]
>>    java.lang.Thread.State: BLOCKED (on object monitor)
>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.commitOffsets(FlinkKafkaConsumer09.java:392)
>>         - waiting to lock <0x0000000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer)
>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:169)
>>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:179)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:596)
>>         - locked <0x0000000659111cc8> (a java.lang.Object)
>>         at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:945)
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:745)
>> ---
>>
>> And lock 0x0000000659111b58 is held by the following thread.
>>
>> ---
>> "Thread-9" daemon prio=10 tid=0x00007f2b2440d000 nid=0x1b838 runnable [0x00007f2b3dbfa000]
>>    java.lang.Thread.State: RUNNABLE
>>         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
>>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
>>         - locked <0x0000000659457dc8> (a sun.nio.ch.Util$2)
>>         - locked <0x0000000659457db8> (a java.util.Collections$UnmodifiableSet)
>>         - locked <0x0000000659457108> (a sun.nio.ch.EPollSelectorImpl)
>>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
>>         at org.apache.kafka.common.network.Selector.select(Selector.java:425)
>>         at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
>>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
>>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
>>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:449)
>>         - locked <0x0000000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer)
>> ---
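Reading the two dumps by hand means matching the address in "waiting to lock <0x0000000659111b58>" with the thread that reports "locked <0x0000000659111b58>". A minimal sketch of doing the same from inside the JVM with the standard java.lang.management API (ThreadMXBean.dumpAllThreads, ThreadInfo.getLockName and ThreadInfo.getLockOwnerName). The class, method and thread names (LockOwnerReport, holder, waiter) are hypothetical, and the demo creates its own contention on a plain Object rather than on Kafka classes.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class LockOwnerReport {

    /** One line per BLOCKED thread: who waits, on which monitor, and which thread owns it. */
    static List<String> blockedThreadReport() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> lines = new ArrayList<>();
        // true, true: include locked monitors and synchronizers, like the "- locked <...>" lines
        for (ThreadInfo info : mx.dumpAllThreads(true, true)) {
            if (info.getThreadState() == Thread.State.BLOCKED) {
                lines.add("\"" + info.getThreadName() + "\" waiting to lock " + info.getLockName()
                        + " held by \"" + info.getLockOwnerName() + "\"");
            }
        }
        return lines;
    }

    /** Creates one holder/waiter pair on a plain Object and returns the report taken while it is stuck. */
    static List<String> demoReport() {
        final Object monitor = new Object();        // stand-in for the contended KafkaConsumer
        final CountDownLatch held = new CountDownLatch(1);
        final CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            synchronized (monitor) {
                held.countDown();
                try {
                    release.await();                 // keeps the monitor, like the stuck poll()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "holder");
        Thread waiter = new Thread(() -> {
            synchronized (monitor) {
                // would run once the holder lets go
            }
        }, "waiter");
        try {
            holder.start();
            held.await();
            waiter.start();
            while (waiter.getState() != Thread.State.BLOCKED) {
                Thread.sleep(10);                    // wait until the waiter is parked on the monitor
            }
            return blockedThreadReport();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();                     // let both threads finish
        }
    }

    public static void main(String[] args) {
        demoReport().forEach(System.out::println);
    }
}
```

Note that getLockName() identifies the monitor by class name and identity hash code (for example java.lang.Object@...), not by the address that jstack prints.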
>>
>> I am wondering why Flink's Kafka consumer is blocked. Any advice
>> would be appreciated.
>>
>> Thanks,
>> Hironori Ogibayashi
>>