Posted to jira@kafka.apache.org by "Matthias J. Sax (Jira)" <ji...@apache.org> on 2021/12/06 18:11:00 UTC

[jira] [Commented] (KAFKA-13476) Streams crashes when non Base64 Offset Metadata is found

    [ https://issues.apache.org/jira/browse/KAFKA-13476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17454154#comment-17454154 ] 

Matthias J. Sax commented on KAFKA-13476:
-----------------------------------------

[~RBosch81] Thanks for reporting this issue. What I am wondering: where could the corrupted metadata come from? Only Kafka Streams should commit offsets for the group it uses (i.e., the `application.id`)?

> Streams crashes when non Base64 Offset Metadata is found
> --------------------------------------------------------
>
>                 Key: KAFKA-13476
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13476
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0
>            Reporter: Richard Bosch
>            Priority: Minor
>   Original Estimate: 0.5h
>  Remaining Estimate: 0.5h
>
> Kafka Streams applications use the metadata stored with the offsets committed by previously running instances to extract timestamps.
> But when the metadata field contains other data, the Base64 decoder throws an exception, causing the Streams application to fail.
> A new offset commit is then required to stop this failure.
> I've included the part of the log produced when we started a Kafka Streams app after setting the offsets using a third-party tool (a sketch of such a commit follows the log excerpt below). This tool adds some tracing metadata so developers and operators can see who performed the custom offset commit.
>  
> {noformat}
> 2021-11-16 12:56:36.020  INFO 25 --- [-StreamThread-2] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=example-app-3, groupId=axual-demo-example-example-app] Unsubscribed all topics or patterns and assigned partitions
> java.lang.IllegalArgumentException: Illegal base64 character 7b
> 	at java.base/java.util.Base64$Decoder.decode(Unknown Source) ~[na:na]
> 	at org.apache.kafka.streams.processor.internals.StreamTask.decodeTimestamp(StreamTask.java:1039) ~[kafka-streams-2.7.0.jar:na]
> 	at org.apache.kafka.streams.processor.internals.StreamTask.initializeTaskTime(StreamTask.java:837) ~[kafka-streams-2.7.0.jar:na]
> 	at org.apache.kafka.streams.processor.internals.StreamTask.initializeMetadata(StreamTask.java:818) ~[kafka-streams-2.7.0.jar:na]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:728) ~[kafka-streams-2.7.0.jar:na]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) ~[kafka-streams-2.7.0.jar:na]
> 2021-11-16 12:56:36.127 ERROR 25 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [streams-example-app-1] All stream threads have died. The instance will be in error state and should be closed.
> {noformat}
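> 
> For illustration, here is a minimal sketch of how a third-party tool could end up writing non-Base64 metadata for the Streams consumer group via the plain consumer API. The broker address, topic, partition, offset, and group name below are made up for the example:
> {code:java}
> import java.util.Collections;
> import java.util.Properties;
> 
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
> 
> public class ThirdPartyOffsetCommit {
>     public static void main(final String[] args) {
>         final Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed broker
>         props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-app");              // the Streams application.id
>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
> 
>         try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
>             // The metadata is free-form JSON here, not the Base64 payload Streams expects,
>             // which is exactly what triggers the decode failure on the next Streams start.
>             consumer.commitSync(Collections.singletonMap(
>                 new TopicPartition("example-topic", 0),
>                 new OffsetAndMetadata(42L, "{\"committedBy\":\"ops-tool\"}")));
>         }
>     }
> }
> {code}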
> I recommend adding a try/catch block around the Base64 decode in the StreamTask.decodeTimestamp method and returning the unknown-timestamp value when decoding fails (see the sketch at the end of this description).
> This is purely for resilience when bad data is encountered.
> After the Streams application performs a new offset commit the error should not occur again, limiting the chance of frequently recurring warnings in the logs.
> I've already made the changes and added a test for this issue, as I would like to contribute to Kafka.
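> 
> A minimal sketch of the suggested guard, assuming the decode happens roughly as in StreamTask.decodeTimestamp and that the "unknown" sentinel is -1 (as RecordQueue.UNKNOWN is in the 2.7 code base). Class and helper names here are illustrative, not the exact patch:
> {code:java}
> import java.nio.ByteBuffer;
> import java.util.Base64;
> 
> public class TimestampDecodeSketch {
> 
>     // Sentinel mirroring the "unknown timestamp" value (assumed to be -1).
>     private static final long UNKNOWN = -1L;
> 
>     /**
>      * Decodes the timestamp that Kafka Streams encodes into the offset metadata.
>      * Instead of letting an IllegalArgumentException from the Base64 decoder crash
>      * the stream thread, fall back to UNKNOWN when the metadata was written by a
>      * third-party tool and is not valid Base64.
>      */
>     public static long decodeTimestamp(final String encoded) {
>         if (encoded == null || encoded.isEmpty()) {
>             return UNKNOWN;
>         }
>         final byte[] bytes;
>         try {
>             bytes = Base64.getDecoder().decode(encoded);
>         } catch (final IllegalArgumentException swallowed) {
>             // Non-Base64 metadata, e.g. tracing info added by an external offset tool.
>             return UNKNOWN;
>         }
>         // The real implementation also checks a version byte before reading the
>         // timestamp; this sketch only shows the resilience guard.
>         final ByteBuffer buffer = ByteBuffer.wrap(bytes);
>         buffer.get(); // skip the (assumed) version byte
>         return buffer.getLong();
>     }
> 
>     public static void main(final String[] args) {
>         // Metadata written by a third-party tool is not Base64 -> no crash, UNKNOWN instead.
>         System.out.println(decodeTimestamp("{\"committedBy\":\"ops-tool\"}"));
>     }
> }
> {code}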



--
This message was sent by Atlassian Jira
(v8.20.1#820001)