Posted to dev@kafka.apache.org by "Mohan Parthasarathy (JIRA)" <ji...@apache.org> on 2019/04/11 18:01:00 UTC

[jira] [Resolved] (KAFKA-8214) Handling RecordTooLargeException in the main thread

     [ https://issues.apache.org/jira/browse/KAFKA-8214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mohan Parthasarathy resolved KAFKA-8214.
----------------------------------------
    Resolution: Duplicate

> Handling RecordTooLargeException in the main thread
> ---------------------------------------------------
>
>                 Key: KAFKA-8214
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8214
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.2
>            Reporter: Mohan Parthasarathy
>            Priority: Major
>
> How can we handle this exception in the main application? When a task hits this exception, it does not commit the offset, so it keeps hitting the same record and ends up in a loop. This happens during the aggregation step. We already have a message size limit of 15 MB on the topic.
> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=2_6, processor=KSTREAM-SOURCE-0000000016, topic=r-detection-KSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition, partition=6, offset=2049
>         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:367)                                                                                                   
>         at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)                                                                               
>         at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)                                                                                                 
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)                                                                                               
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)                                                                                               
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
>  
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [2_6] Abort sending since an error caught with a previous record (key fe80::a112:a206:bc15:8e86&fe80::743c:160:c0be:9e66&0 value [B@20dced9e timestamp 1554238297629) to topic xxxx-detection-KSTREAM-AGGREGATE-STATE-STORE-0000000012-changelog due to org.apache.kafka.common.errors.RecordTooLargeException: The message is 15728866 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.                                                          
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:133)                                                                         
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:50)                                                                               
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:192)                                                                          
>         at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:915)                                                                                                         
>         at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:841)                                                                                                           
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)                                                                                    
>         at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)                                                                                        
>         at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:66)                                                                  
>         at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)                                                                  
>         at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:100)                                                                        
>         at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)                                                                                 
>         at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)                                                                                    
>         at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)                                                                                                         
>         at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:234)                                                                                                         
>         at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)                                                                                                  
>         at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)                                                                                                         
>         at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:233)                                                                               
>         at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:226)                                                                                       
>         at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38)                                                                                        
>         at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:155)                                                                                       
>         at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:94)                                                                
>         at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)                                                                                             
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)                                                                               
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
>         at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
>         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:351)
>         ... 5 more
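
For readers landing on this resolved issue, here is a minimal sketch of the arithmetic behind the error and of the two configuration knobs usually involved. It is an illustration under stated assumptions, not the fix from the duplicate ticket. Assumptions: the "15 MB" in the report means 15 * 1024 * 1024 bytes, and the producer's max.request.size was set to that same value (the report does not say). The key "producer.max.request.size" is the producer-prefixed form of max.request.size as Kafka Streams accepts it. The key "default.production.exception.handler" exists only from Kafka Streams 1.1.0 onward, so it is not available in the affected 1.0.2 release. The handler class name and the 16 MB value are hypothetical placeholders.

```java
import java.util.Properties;

public class RecordSizeSketch {
    // Assumption: "15 MB" in the report means 15 * 1024 * 1024 bytes.
    static final long ASSUMED_LIMIT_BYTES = 15L * 1024 * 1024;

    // Serialized size quoted in the RecordTooLargeException message.
    static final long REPORTED_RECORD_BYTES = 15_728_866L;

    // Number of bytes by which a record exceeds a limit (0 if it fits).
    static long overshoot(long recordBytes, long limitBytes) {
        return Math.max(0L, recordBytes - limitBytes);
    }

    // Builds the Streams property overrides as plain string keys.
    // handlerClassName is a hypothetical class that would implement
    // ProductionExceptionHandler (Kafka Streams 1.1.0 or later).
    static Properties streamsOverrides(int maxRequestBytes, String handlerClassName) {
        Properties props = new Properties();
        // Raises the embedded producer's request size limit.
        props.put("producer.max.request.size", Integer.toString(maxRequestBytes));
        // Lets the application decide whether to fail or continue on send errors.
        props.put("default.production.exception.handler", handlerClassName);
        return props;
    }

    public static void main(String[] args) {
        System.out.println("Overshoot in bytes: "
                + overshoot(REPORTED_RECORD_BYTES, ASSUMED_LIMIT_BYTES));

        // 16 MB is an arbitrary illustrative value, not a recommendation.
        Properties props = streamsOverrides(16 * 1024 * 1024,
                "com.example.SkipOversizedRecordsHandler");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Under the 15 MB assumption, the rejected changelog record is 226 bytes over the limit. Raising the producer limit only helps if the broker and topic limits (max.message.bytes on the changelog topic) allow records of that size as well; otherwise the send is rejected further downstream.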



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)