You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Flink Jira Bot (Jira)" <ji...@apache.org> on 2022/04/08 22:39:00 UTC

[jira] [Updated] (FLINK-18049) The Flink kafka consumer job will be interrupted if the upstream kafka producer change the AVRO schema

     [ https://issues.apache.org/jira/browse/FLINK-18049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Flink Jira Bot updated FLINK-18049:
-----------------------------------
    Labels: auto-deprioritized-critical auto-deprioritized-major pull-request-available stale-minor  (was: auto-deprioritized-critical auto-deprioritized-major pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issues has been marked as Minor but is unassigned and neither itself nor its Sub-Tasks have been updated for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is still Minor, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized.


> The Flink kafka consumer job will be interrupted if the upstream kafka producer change the AVRO schema
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18049
>                 URL: https://issues.apache.org/jira/browse/FLINK-18049
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: Zheng Hu
>            Priority: Minor
>              Labels: auto-deprioritized-critical, auto-deprioritized-major, pull-request-available, stale-minor
>
> We have encountered a critical case from online services.  we have the data pipeline:  (producer) -> (kafka) -> (flink consumer job), and all those records are encoded in AVRO format.  Once the producer changed the AVRO schema , says adding an extra column to the existing schema and writing few data into the Kafka. 
> Then the downstream flink job crashed with the following stacktrace: 
> {code}
> ==WARNING==  allocating large array--thread_id[0x00007fccd9c16800]--thread_name[Source: Custom Source (1/1)]--array_size[1590681120 bytes]--array_length[1590681103 elememts]
> os_prio=0 tid=0x00007fccd9c16800 nid=0x226c0 runnable 
>   at org.shaded.apache.avro.util.Utf8.setByteLength(Utf8.java:78)
>   at org.shaded.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:261)
>   at org.shaded.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:272)
>   at org.shaded.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:214)
>   at org.shaded.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:412)
>   at org.shaded.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
>   at org.shaded.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>   at org.shaded.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>   at org.shaded.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>   at org.shaded.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>   at org.shaded.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>   at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:167)
>   at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:78)
>   at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
>   at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:192)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:771)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:120)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:74)
>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:129)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:398)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:736)
>   at java.lang.Thread.run(Thread.java:834)
> {code} 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)