Posted to issues@flink.apache.org by "Dong Lin (Jira)" <ji...@apache.org> on 2022/11/24 15:16:00 UTC

[jira] [Assigned] (FLINK-30168) PyFlink Deserialization Error with Object Array

     [ https://issues.apache.org/jira/browse/FLINK-30168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin reassigned FLINK-30168:
--------------------------------

    Assignee: Xingbo Huang

> PyFlink Deserialization Error with Object Array
> -----------------------------------------------
>
>                 Key: FLINK-30168
>                 URL: https://issues.apache.org/jira/browse/FLINK-30168
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.16.0, 1.15.2
>            Reporter: Yunfeng Zhou
>            Assignee: Xingbo Huang
>            Priority: Major
>
> When attempting to collect object-array records from a DataStream in PyFlink, an exception like the following is thrown:
> {code:python}
> data = 0, field_type = DenseVectorTypeInfo
>
>     def pickled_bytes_to_python_converter(data, field_type):
>         if isinstance(field_type, RowTypeInfo):
>             row_kind = RowKind(int.from_bytes(data[0], 'little'))
>             data = zip(list(data[1:]), field_type.get_field_types())
>             fields = []
>             for d, d_type in data:
>                 fields.append(pickled_bytes_to_python_converter(d, d_type))
>             row = Row.of_kind(row_kind, *fields)
>             return row
>         else:
> >           data = pickle.loads(data)
> E           TypeError: a bytes-like object is required, not 'int'
> {code}
> I found that this error occurs because PyFlink handles object arrays differently on the Java side and the Python side.
>  
> On the Java side (org.apache.flink.api.common.python.PythonBridgeUtils.getPickledBytesFromJavaObject):
> {code:java}
> ...
> else if (dataType instanceof BasicArrayTypeInfo || dataType instanceof PrimitiveArrayTypeInfo) {
>     // recursively pickle each array element
> } ...
> else {
>     // ObjectArrayTypeInfo falls through to here: the whole array is
>     // serialized by its TypeSerializer and pickled as a single blob
>     TypeSerializer serializer = dataType.createSerializer(null);
>     ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
>     DataOutputViewStreamWrapper baosWrapper = new DataOutputViewStreamWrapper(baos);
>     serializer.serialize(obj, baosWrapper);
>     return pickler.dumps(baos.toByteArray());
> }
> {code}
>  
> On the Python side (pyflink.datastream.utils.pickled_bytes_to_python_converter):
> {code:python}
> ...
> elif isinstance(field_type,
>                 (BasicArrayTypeInfo, PrimitiveArrayTypeInfo, ObjectArrayTypeInfo)):
>     # the received value is treated as a list of per-element pickled bytes
>     element_type = field_type._element_type
>     elements = []
>     for element_bytes in data:
>         elements.append(pickled_bytes_to_python_converter(element_bytes, element_type))
>     return elements
> {code}
>  
> Thus a possible fix for this bug is to align PyFlink's behavior between the Java side and the Python side.
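The mismatch described above can be sketched in plain Python (this is a minimal standalone sketch, not PyFlink code): the Java branch for ObjectArrayTypeInfo pickles one serialized blob, while the Python converter iterates over the received value as if it were a list of per-element pickled byte strings. Iterating over a bytes object in Python 3 yields ints, which is why `pickle.loads` ends up receiving an int (matching `data = 0` in the traceback):

```python
import pickle

# What the Java branch for ObjectArrayTypeInfo effectively sends: one pickled
# blob containing the TypeSerializer output for the whole array (sketched here
# as an opaque byte string).
blob = pickle.dumps(b"\x00\x01\x02")

# The Python converter unpickles the blob, then iterates over the result as if
# it were a list of per-element pickled byte strings.
data = pickle.loads(blob)

# Iterating over a bytes object yields ints, not bytes objects:
first = next(iter(data))
assert isinstance(first, int)  # first == 0, matching "data = 0" in the traceback

# So the per-element recursion passes an int to pickle.loads and fails:
try:
    pickle.loads(first)
except TypeError as e:
    print(e)  # a bytes-like object is required, not 'int'
```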



--
This message was sent by Atlassian Jira
(v8.20.10#820010)