Posted to issues@flink.apache.org by "Yunfeng Zhou (Jira)" <ji...@apache.org> on 2022/11/23 11:53:00 UTC

[jira] [Created] (FLINK-30168) PyFlink Deserialization Error with Object Array

Yunfeng Zhou created FLINK-30168:
------------------------------------

             Summary: PyFlink Deserialization Error with Object Array
                 Key: FLINK-30168
                 URL: https://issues.apache.org/jira/browse/FLINK-30168
             Project: Flink
          Issue Type: Improvement
          Components: API / Python
    Affects Versions: 1.15.2, 1.16.0
            Reporter: Yunfeng Zhou


When trying to collect object-array records from a DataStream in PyFlink, an exception like the following is thrown:
{code:python}
data = 0, field_type = DenseVectorTypeInfo

    def pickled_bytes_to_python_converter(data, field_type):
        if isinstance(field_type, RowTypeInfo):
            row_kind = RowKind(int.from_bytes(data[0], 'little'))
            data = zip(list(data[1:]), field_type.get_field_types())
            fields = []
            for d, d_type in data:
                fields.append(pickled_bytes_to_python_converter(d, d_type))
            row = Row.of_kind(row_kind, *fields)
            return row
        else:
>           data = pickle.loads(data)
E           TypeError: a bytes-like object is required, not 'int'
{code}
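The TypeError at the bottom of the trace comes from passing an int to pickle.loads, which only accepts bytes-like input. A minimal standalone reproduction of just that failure (plain Python, no PyFlink required):

{code:python}
import pickle

try:
    pickle.loads(0)  # an int instead of a bytes-like object
except TypeError as e:
    print(e)  # a bytes-like object is required, not 'int'
{code}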
This error occurs because PyFlink handles object arrays differently on the Java side and the Python side.

 

On the Java side (org.apache.flink.api.common.python.PythonBridgeUtils.getPickledBytesFromJavaObject):
{code:java}
...
else if (dataType instanceof BasicArrayTypeInfo || dataType instanceof PrimitiveArrayTypeInfo) {
    // recursively deal with array elements
} ...
else {
    // ObjectArrayTypeInfo falls into this branch
    TypeSerializer serializer = dataType.createSerializer(null);
    ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
    DataOutputViewStreamWrapper baosWrapper = new DataOutputViewStreamWrapper(baos);
    serializer.serialize(obj, baosWrapper);
    return pickler.dumps(baos.toByteArray());
}
{code}
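The mismatch can be reproduced with plain pickle. For object arrays, the Java side serializes the whole array into one byte blob and pickles that single blob, while the Python converter iterates over the unpickled value as if it were a list of per-element pickled bytes. Iterating over a bytes object yields ints, and feeding an int to pickle.loads raises exactly the error above. A minimal sketch of the mechanism (the byte values are placeholders, not real serializer output):

{code:python}
import pickle

# What the Java side effectively produces for an object array: the whole
# array is run through a TypeSerializer into raw bytes, and that single
# byte blob is pickled as one object.
serialized_array = b'\x00\x01\x02'   # stand-in for the TypeSerializer output
payload = pickle.dumps(serialized_array)

# What the Python converter then does: unpickle the payload and iterate
# over it as if it were a list of per-element pickled bytes.
data = pickle.loads(payload)         # a single bytes object, not a list
for element_bytes in data:           # iterating bytes yields ints!
    try:
        pickle.loads(element_bytes)
    except TypeError as e:
        print(e)                     # a bytes-like object is required, not 'int'
        break
{code}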
 

On the Python side (pyflink.datastream.utils.pickled_bytes_to_python_converter):
{code:python}
...
elif isinstance(field_type,
                (BasicArrayTypeInfo, PrimitiveArrayTypeInfo, ObjectArrayTypeInfo)):
    element_type = field_type._element_type
    elements = []
    for element_bytes in data:
        elements.append(pickled_bytes_to_python_converter(element_bytes, element_type))
    return elements
{code}
 

 

Thus, a possible fix for this bug is to align PyFlink's object-array serialization behavior between the Java side and the Python side.
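For illustration, one way the two sides could be aligned is to pickle each array element individually on the producer side, so the consumer receives a list of per-element pickled bytes it can iterate over. This is only a sketch in plain Python, not the actual PyFlink fix, and the helper names are hypothetical:

{code:python}
import pickle

def dump_object_array(elements):
    # Hypothetical producer-side helper: pickle each element separately,
    # mirroring how the converter already expects BasicArrayTypeInfo /
    # PrimitiveArrayTypeInfo elements to arrive.
    return [pickle.dumps(e) for e in elements]

def load_object_array(pickled_elements):
    # Hypothetical consumer-side helper: the converter can now iterate
    # over a list of bytes objects instead of a single byte blob.
    return [pickle.loads(b) for b in pickled_elements]

payload = dump_object_array([1.0, 2.0, 3.0])
assert load_object_array(payload) == [1.0, 2.0, 3.0]
{code}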



--
This message was sent by Atlassian Jira
(v8.20.10#820010)