Posted to jira@kafka.apache.org by "Prashant Waykar (Jira)" <ji...@apache.org> on 2020/05/06 06:13:00 UTC

[jira] [Comment Edited] (KAFKA-8612) Broker removes consumers from CG, Streams app gets stuck

    [ https://issues.apache.org/jira/browse/KAFKA-8612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100479#comment-17100479 ] 

Prashant Waykar edited comment on KAFKA-8612 at 5/6/20, 6:12 AM:
-----------------------------------------------------------------

I am seeing the same exception:
{noformat}
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing offsets {(topic)=OffsetAndMetadata{offset=4858803, leaderEpoch=null, metadata=''}}
{noformat}
I have a single partition and a single consumer consuming from this topic. In the server logs, I see:
{code:java}
[2020-05-02 19:06:24,560] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-05-02 19:16:24,560] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-05-02 19:27:24,221] WARN Attempting to send response via channel for which there is no open connection, connection id 127.0.0.1:9092-127.0.0.1:57496-467 (kafka.network.Processor)
[2020-05-02 19:27:24,221] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 59662 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-05-02 19:36:24,560] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-05-02 19:46:24,560] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-05-02 19:56:24,560] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
{code}
In the client logs, I see:
{code:java}
2020-05-02 19:27:24.124 UTC [Service_Listener, , , TxId: ] ERROR c.v.v.h.m.a.Listener- Error Retrieving event:
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing offsets {ServiceJ-0=OffsetAndMetadata{offset=103917, leaderEpoch=null, metadata=''}}
{code}
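The 60000ms matches the consumer's default API timeout, which bounds blocking calls such as commitSync(). As a possible mitigation, the timeout can be raised via `default.api.timeout.ms` - a minimal sketch only, assuming a plain Java consumer with illustrative values; it does not address whatever is dropping the connection:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Minimal sketch: raise the default API timeout that bounds blocking calls
// such as commitSync(). All values here are illustrative.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 120000); // default is 60000

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
{code}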
[~guozhang] Can you please take a look at this issue?

 


was (Author: waykarp):
I am seeing the same exception
{noformat}
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing offsets {(topic)=OffsetAndMetadata{offset=4858803, leaderEpoch=null, metadata=''}}
{noformat}

> Broker removes consumers from CG, Streams app gets stuck
> --------------------------------------------------------
>
>                 Key: KAFKA-8612
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8612
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, streams
>    Affects Versions: 2.1.1
>            Reporter: Di Campo
>            Priority: Major
>              Labels: broker, streams, timeout
>         Attachments: full-thread-dump-kafka-streams-stuck.log
>
>
> Cluster of 5 brokers, `Kafka 2.1.1`. m5.large (2 CPU, 8GB RAM) instances. 
> Kafka Streams application (`stream-processor`): a cluster of 3 instances, 2 threads each, on client version `2.1.0`.
> A store consumer group (ClickHouse Kafka Engine from `ClickHouse 19.5.3.8`), with several tables, each consuming from a different topic.
> The `stream-processor` consumes from a source topic and runs a topology over 26 topics (64 partitions each) with 5 state stores: 1 session store and 4 key-value stores.
> Infra runs on Docker on AWS ECS.
> Consuming at a rate of 300-1000 events per second. Each event generates an avg of ~20 extra messages.
> The application has an uncaughtExceptionHandler set (a sketch of the wiring follows below).
> Timestamps are kept in the log excerpts for easier analysis.
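> The handler is registered on the KafkaStreams instance roughly as follows. This is a minimal sketch; the handler body, application id, and bootstrap address are illustrative, not our exact code:
> {code:java}
> import java.time.Duration;
> import java.util.Properties;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> 
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processor-0.0.1");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
> 
> StreamsBuilder builder = new StreamsBuilder();
> // ... topology definition elided ...
> 
> KafkaStreams streams = new KafkaStreams(builder.build(), props);
> streams.setUncaughtExceptionHandler((thread, throwable) -> {
>     System.err.println("Stream thread " + thread.getName() + " died: " + throwable);
>     // Close from a separate thread: a dying StreamThread cannot join itself.
>     new Thread(() -> streams.close(Duration.ofSeconds(30))).start();
> });
> streams.start();
> {code}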
> `stream-processor` tasks at some point fail to produce to any partition due to timeouts:
>     
> {noformat}
> [2019-06-28 10:04:21,113] ERROR task [1_48] Error sending record (...) to topic (...) due to org.apache.kafka.common.errors.TimeoutException: Expiring 44 record(s) for (...)-48:120002 ms has passed since batch creation; No more records will be sent and no more offsets will be recorded for this task.
> {noformat}
> and "Offset commit failed" errors, in all partitions:
> {noformat}
>     [2019-06-28 10:04:27,705] ERROR [Consumer clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-1-consumer, groupId=stream-processor-0.0.1] Offset commit failed on partition events-raw-63 at offset 4858803: The request timed out. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> {noformat}
> _At this point we begin seeing error messages in one of the brokers (see below, Broker logs section)._
> More error messages are shown on the `stream-processor`: 
> {noformat}
>     org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing offsets {(topic)=OffsetAndMetadata{offset=4858803, leaderEpoch=null, metadata=''}}
> {noformat}
> then hundreds of messages of the following types (one per topic-partition), interleaved: 
> {noformat}
>     [2019-06-28 10:05:23,608] WARN [Producer clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-3-producer] Got error produce response with correlation id 39946 on topic-partition (topic)-63, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
> {noformat}
> {noformat}
>     [2019-06-28 10:05:23,609] WARN [Producer clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-3-producer] Received invalid metadata error in produce request on partition (topic)1-59 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
> {noformat}
> And then: 
> {noformat}
>     [2019-06-28 10:05:47,986] ERROR stream-thread [stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-4] Failed to commit stream task 1_57 due to the following error: (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks)
>     2019-06-28 10:05:47 org.apache.kafka.streams.errors.StreamsException: task [1_57] Abort sending since an error caught with a previous record (...) to topic (...) due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
>     2019-06-28 10:05:47 You can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error.
>     2019-06-28 10:05:47 at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:133)
> {noformat}
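> For Streams, the producer parameters that the error message suggests can be passed through with the producer prefix, roughly like this (a minimal sketch; the values are illustrative, not a verified fix):
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.streams.StreamsConfig;
> 
> // Forward retries / retry.backoff.ms overrides to the internal Streams
> // producer. Values are illustrative only.
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processor-0.0.1");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
> props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 10);
> props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 1000);
> {code}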
> ...and eventually we get to the error messages: 
> {noformat}
>     [2019-06-28 10:05:51,198] ERROR [Producer clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-3-producer] Uncaught error in kafka producer I/O thread: (org.apache.kafka.clients.producer.internals.Sender)
>     2019-06-28 10:05:51 java.util.ConcurrentModificationException
>     2019-06-28 10:05:51 at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
> {noformat}
> {noformat}
>     [2019-06-28 10:07:18,735] ERROR task [1_63] Failed to flush state store orderStore: (org.apache.kafka.streams.processor.internals.ProcessorStateManager) org.apache.kafka.streams.errors.StreamsException: task [1_63] Abort sending since an error caught with a previous record (...) timestamp 1561664080389) to topic (...) due to org.apache.kafka.common.errors.TimeoutException: Expiring 44 record(s) for pageview-sessions-0.0.1-63:120007 ms has passed since batch creation
> {noformat}
> ...and eventually, after many messages like the above, the KafkaStreams instance is closed and the task dies; our own log line marks the moment it finally dies: 
> {noformat}
>     [2019-06-28 10:08:23,334] ERROR Streams sent to close. 
> {noformat}
> ----------------
> *One* (not all) of the *brokers* shows several messages like this:
>     
> {code:java}
> [2019-06-28 10:04:42,192] WARN Attempting to send response via channel for which there is no open connection, connection id 172.17.0.3:9092-(client-IP):47760-24314 (kafka.network.Processor)
>    ...
>     [2019-06-28 10:07:38,128] WARN Attempting to send response via channel for which there is no open connection, connection id 172.17.0.3:9092-(client-IP):49038-24810 (kafka.network.Processor)
> {code}
> and several messages like this, also from the same broker: 
> {noformat}
> [2019-06-28 10:06:51,235] INFO [GroupCoordinator 3]: Member stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-4-consumer-f0c7d7b0-7f3b-465b-bf68-e55df2d783ed in group stream-processor-0.0.1 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
> {noformat}
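> Members are removed like this when the coordinator misses heartbeats for `session.timeout.ms`, or when the client does not call poll() within `max.poll.interval.ms`. A minimal sketch of loosening both on the Streams consumer (values are illustrative only; this masks slow processing rather than fixing the disconnects):
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.streams.StreamsConfig;
> 
> // Illustrative values; forwarded to the internal Streams consumer.
> Properties props = new Properties();
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 30000);
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600000);
> {code}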
> At other points in time, there are also membership errors for the ClickHouse consumer groups, on the same broker:
> {noformat}
> [2019-06-28 10:10:31,243] INFO [GroupCoordinator 3]: Member ClickHouse 19.5.3.8-c095f8ec-efc8-4b3a-93c5-6cd2aa9ee0ef in group (chgroup2) has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
> [2019-06-28 10:10:30,752] INFO [GroupCoordinator 3]: Member stream-processor-0.0.1-af495b81-7572-4dc0-a5c2-952916d8e41d-StreamThread-2-consumer-f6166b97-ef38-4d21-9f39-c47bf13d794b in group stream-processor-0.0.1 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
> [2019-06-28 10:10:29,495] INFO [GroupCoordinator 3]: Member ClickHouse 19.5.3.8-127af48f-b50c-4af1-a3a3-4ebd0ebeeeab in group (chgroup1) has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
> [2019-06-28 10:10:29,495] INFO [GroupCoordinator 3]: Member ClickHouse 19.5.3.8-8f98f7b7-eb41-41c0-a37a-f42e54218a47 in group (chgroup1) has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
> {noformat}
> ----------------
> After this, stream-processor tasks are restarted as they die. 
> They then start, but remain idle and do nothing. They are stuck, yet show nothing in the logs. 
> The consumer groups (checked via the kafka-consumer-groups tool; see the example command below) show lag, but the tasks are neither consuming nor dying - they stay idle indefinitely.
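> For reference, the lag check was along these lines (group name as in our deployment):
> {noformat}
> bin/kafka-consumer-groups.sh --bootstrap-server broker:9092 \
>   --describe --group stream-processor-0.0.1
> {noformat}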
> After a full restart of the whole service (not rolling: 3 instances, down to 0, then up to 3 again), tasks can join the consumer groups and start processing as normal.
> Some time later, the whole cycle begins again, until the application finally gets stuck.
> Sometimes, as tasks are restarted automatically one by one by ECS, they process a few records and then a lot of messages like this appear, and they die a few minutes later. 
> {noformat}
> [2019-06-28 15:42:02,830] WARN [Producer clientId=stream-processor-0.0.1-74ae7baf-ba97-4472-b8b8-63264331a7e7-StreamThread-4-producer] Received invalid metadata error in produce request on partition (topic)-41 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
> {noformat}
> while the brokers show messages of this kind: 
> {noformat}
> [2019-06-28 14:47:06,003] INFO [GroupCoordinator 3]: Member stream-processor-0.0.1-8b44587c-ae29-498a-839e-40d3e8e25064-StreamThread-2-consumer-0d5b9463-93c1-4b13-b121-21600fd2bf28 in group stream-processor-0.0.1 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
> {noformat}
> The stack trace, taken via kill -QUIT while the instances are stuck, shows threads like this: 
> {noformat}
> "stream-processor-0.0.1-2b95a3b1-fd4d-4ef5-ad31-8914145e7b7f-StreamThread-4" #33 prio=5 os_prio=0 tid=0x00007fa761a25000 nid=0x2b runnable [0x00007fa707a5e000]
> java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> - locked <0x00000006f1113120> (a sun.nio.ch.Util$3)
> - locked <0x00000006f1113110> (a java.util.Collections$UnmodifiableSet)
> - locked <0x00000006f1112f68> (a sun.nio.ch.EPollSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at org.apache.kafka.common.network.Selector.select(Selector.java:752)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:451)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
> {noformat}
> (see rest of trace attached). 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)