You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Hengtai Nie (Jira)" <ji...@apache.org> on 2022/03/15 06:14:00 UTC

[jira] [Created] (FLINK-26643) Exception occues when use TypeInformationKeyValueSerializationSchema to deserialize data from kafka

Hengtai Nie created FLINK-26643:
-----------------------------------

             Summary: Exception occues when use TypeInformationKeyValueSerializationSchema to deserialize data from kafka
                 Key: FLINK-26643
                 URL: https://issues.apache.org/jira/browse/FLINK-26643
             Project: Flink
          Issue Type: Bug
          Components: API / Core
    Affects Versions: 1.14.3, 1.13.2
            Reporter: Hengtai Nie


Recently , I use TypeInformationKeyValueSerializationSchema  to deserialize data from kafka, it throws java.io.EOFException .

 

Useage in code: 

setDeserializer(KafkaRecordDeserializationSchema.of(new TypeInformationKeyValueSerializationSchema (String.class, String.class, new ExecutionConfig())));

 

Data in Kafka is like:

key(string):2 22 6F F0 A8 8B 69 E 61 22 A5 E7 B5 50

value(json string):{

"timestamp":"2022-3-11 17:9",

"deviceNumber":"22 6F F0 A8 8B 69 E 61 22 A5 E7 B5 50 ",

"eventId":"830500"

}

 

After  trouble shooting, I think follwing API is the root cause:

org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte()
{code:java}
public int readUnsignedByte () throws IOException {
    if (this.position < this.end) {
        return (this.buffer[this.position++] & 0xff);
    } else {
        throw new EOFException();
    }
}{code}
 

Obviously, it is wrong to throw an exception when (this.positon == this.end). 

It just means finishing to read unsigned byte when  (this.positon == this.end), nothing need to do.

 

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)