You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Matthias J. Sax (Jira)" <ji...@apache.org> on 2021/12/06 18:11:00 UTC
[jira] [Commented] (KAFKA-13476) Streams crashes when non Base64 Offset Metadata is found
[ https://issues.apache.org/jira/browse/KAFKA-13476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17454154#comment-17454154 ]
Matthias J. Sax commented on KAFKA-13476:
-----------------------------------------
[~RBosch81] Thanks for reporting this issue. What I am wondering: where could the corrupted metadata come from? Only Kafka Streams should commit offsets for the use group (ie, `application.id`)?
> Streams crashes when non Base64 Offset Metadata is found
> --------------------------------------------------------
>
> Key: KAFKA-13476
> URL: https://issues.apache.org/jira/browse/KAFKA-13476
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.4.0
> Reporter: Richard Bosch
> Priority: Minor
> Original Estimate: 0.5h
> Remaining Estimate: 0.5h
>
> Kafka Streams applications use the metadata stored with the committed offsets from previous running instances to extract timestamps.
> But when the metadata field contains other data the Base64 decoder will throw an exception causing the Streams application to fail.
> A new Offset commit is then required to stop this failure.
> I've included the part of the log when we started a Kafka Streams app after setting the offsets using a third party tool. This tool adds some tracing metadata so developers and operators could debug who performed this custom offset commit.
>
> {noformat}
> 2021-11-16 12:56:36.020 INFO 25 --- [-StreamThread-2] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=example-app-3, groupId=axual-demo-example-example-app] Unsubscribed all topics or patterns and assigned partitions
> at java.base/java.util.Base64$Decoder.decode(Unknown Source) ~[na:na]
> at org.apache.kafka.streams.processor.internals.StreamTask.decodeTimestamp(StreamTask.java:1039) ~[kafka-streams-2.7.0.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) ~[kafka-streams-2.7.0.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamTask.initializeTaskTime(StreamTask.java:837) ~[kafka-streams-2.7.0.jar:na]
> java.lang.IllegalArgumentException: Illegal base64 character 7b
> at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:728) ~[kafka-streams-2.7.0.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamTask.initializeMetadata(StreamTask.java:818) ~[kafka-streams-2.7.0.jar:na]
> 2021-11-16 12:56:36.127 ERROR 25 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [streams-example-app-1] All stream threads have died. The instance will be in error state and should be closed.
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) ~[kafka-streams-2.7.0.jar:na]
> java.lang.IllegalArgumentException: Illegal base64 character 7b
> {noformat}
> I recommend adding a Try Catch block around the Base64 decode in the StreamTask.decodeTimestamp method and return the Unknown value when this occurs.
> This is pure for resilience when bad data is encountered.
> After the Streams application performs a new offset commit the error should not occur again, limiting the change of frequently occurring warnings in the logs
> I've already made the changes and added a test for this issue, as I would like to contribute to Kafka.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)