Posted to issues@flink.apache.org by "Hengtai Nie (Jira)" <ji...@apache.org> on 2022/03/15 06:18:00 UTC

[jira] [Updated] (FLINK-26643) Exception occurs when use TypeInformationKeyValueSerializationSchema to deserialize data from kafka

     [ https://issues.apache.org/jira/browse/FLINK-26643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hengtai Nie updated FLINK-26643:
--------------------------------
    Summary: Exception occurs when use TypeInformationKeyValueSerializationSchema to deserialize data from kafka  (was: Exception occues when use TypeInformationKeyValueSerializationSchema to deserialize data from kafka)

> Exception occurs when use TypeInformationKeyValueSerializationSchema to deserialize data from kafka
> ---------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26643
>                 URL: https://issues.apache.org/jira/browse/FLINK-26643
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.13.2, 1.14.3
>            Reporter: Hengtai Nie
>            Priority: Major
>
> Recently, I used TypeInformationKeyValueSerializationSchema to deserialize data from Kafka, and it threw java.io.EOFException.
>  
> Usage in code:
> setDeserializer(KafkaRecordDeserializationSchema.of(new TypeInformationKeyValueSerializationSchema(String.class, String.class, new ExecutionConfig())));
>  
> Data in Kafka is like:
> key(string):2 22 6F F0 A8 8B 69 E 61 22 A5 E7 B5 50
> value(json string):{
> "timestamp":"2022-3-11 17:9",
> "deviceNumber":"22 6F F0 A8 8B 69 E 61 22 A5 E7 B5 50 ",
> "eventId":"830500"
> }
>  
> After troubleshooting, I think the following API is the root cause:
> org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte()
> {code:java}
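> public int readUnsignedByte() throws IOException {
>     // A byte is available only while position is strictly before end.
>     if (this.position < this.end) {
>         // Mask with 0xff so the signed byte is returned as 0..255.
>         return (this.buffer[this.position++] & 0xff);
>     } else {
>         // Reached when this.position == this.end.
>         throw new EOFException();
>     }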
> }{code}
>  
> Obviously, it is wrong to throw an exception when this.position == this.end.
> That condition only means that reading the unsigned bytes has finished, so nothing more needs to be done.
>  
>  
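The boundary case described above can be reproduced outside Flink. The following is a minimal sketch that uses the JDK's java.io.DataInputStream in place of Flink's DataInputDeserializer, on the assumption that both behave the same way once the read position reaches the end of the buffer. The class name ReadUnsignedByteSketch and the helper countReadableBytes are hypothetical and exist only for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

public class ReadUnsignedByteSketch {

    /**
     * Reads unsigned bytes until the stream reports its end and returns
     * how many bytes were read before the EOFException was raised.
     */
    static int countReadableBytes(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int count = 0;
        try {
            while (true) {
                in.readUnsignedByte(); // succeeds while bytes remain
                count++;
            }
        } catch (EOFException endOfInput) {
            // Raised by the read that is attempted after the last byte was consumed.
            return count;
        }
    }

    public static void main(String[] args) throws IOException {
        // The first three key bytes from the report, used purely as sample input.
        byte[] data = {(byte) 0x22, (byte) 0x6F, (byte) 0xF0};
        System.out.println("bytes read before EOFException: " + countReadableBytes(data));
    }
}
```

With the three sample bytes, the helper returns 3: every byte in the buffer is readable, and the exception comes from the fourth call, which asks for a byte past the end.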



--
This message was sent by Atlassian Jira
(v8.20.1#820001)