You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2022/08/08 09:18:04 UTC

[flink] branch master updated: [FLINK-28827][python] Complete DataType support in DataStream API (#20460)

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c40dc15fbc [FLINK-28827][python] Complete DataType support in DataStream API (#20460)
1c40dc15fbc is described below

commit 1c40dc15fbc29e8e5e514565109a3fb05b47a83f
Author: Juntao Hu <ma...@gmail.com>
AuthorDate: Mon Aug 8 17:17:57 2022 +0800

    [FLINK-28827][python] Complete DataType support in DataStream API (#20460)
---
 flink-python/pyflink/common/types.py               |  33 +---
 .../datastream/connectors/tests/test_kafka.py      |  10 +-
 .../pyflink/fn_execution/coder_impl_fast.pxd       |  15 ++
 .../pyflink/fn_execution/coder_impl_fast.pyx       |  77 ++++++++
 .../pyflink/fn_execution/coder_impl_slow.py        |  80 ++++++++
 flink-python/pyflink/fn_execution/coders.py        |  27 ++-
 .../pyflink/fn_execution/flink_fn_execution_pb2.py | 110 ++++++-----
 .../pyflink/proto/flink-fn-execution.proto         |   4 +
 flink-python/pyflink/testing/test_case_utils.py    |  65 +++++-
 .../flink/streaming/api/utils/PythonTypeUtils.java | 219 ++++++++++++++++-----
 10 files changed, 509 insertions(+), 131 deletions(-)

diff --git a/flink-python/pyflink/common/types.py b/flink-python/pyflink/common/types.py
index be07e68f4cd..eeb68272402 100644
--- a/flink-python/pyflink/common/types.py
+++ b/flink-python/pyflink/common/types.py
@@ -15,15 +15,13 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-
 from enum import Enum
-
 from typing import List
 
-__all__ = ['Row', 'RowKind']
-
 from pyflink.java_gateway import get_gateway
 
+__all__ = ['Row', 'RowKind']
+
 
 class RowKind(Enum):
     INSERT = 0
@@ -283,30 +281,3 @@ class Row(object):
 
     def __len__(self):
         return len(self._values)
-
-
-def to_java_data_structure(value):
-    jvm = get_gateway().jvm
-    if isinstance(value, (int, float, str)):
-        return value
-    elif isinstance(value, (list, tuple)):
-        j_list = jvm.java.util.ArrayList()
-        for item in value:
-            j_list.add(to_java_data_structure(item))
-        return j_list
-    elif isinstance(value, map):
-        j_map = jvm.java.util.HashMap()
-        for k, v in value:
-            j_map.put(to_java_data_structure(k), to_java_data_structure(v))
-        return j_map
-    elif isinstance(value, Row):
-        j_row = jvm.org.apache.flink.types.Row(value.get_row_kind().to_j_row_kind(), len(value))
-        if hasattr(value, '_fields'):
-            for field_name, value in zip(value._fields, value._values):
-                j_row.setField(field_name, to_java_data_structure(value))
-        else:
-            for idx, value in enumerate(value._values):
-                j_row.setField(idx, to_java_data_structure(value))
-        return j_row
-    else:
-        raise TypeError('value must be a vanilla Python object')
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_kafka.py b/flink-python/pyflink/datastream/connectors/tests/test_kafka.py
index 725932ed7d5..b36e22092bf 100644
--- a/flink-python/pyflink/datastream/connectors/tests/test_kafka.py
+++ b/flink-python/pyflink/datastream/connectors/tests/test_kafka.py
@@ -26,15 +26,19 @@ from pyflink.common.serialization import SimpleStringSchema, DeserializationSche
     JsonRowDeserializationSchema, CsvRowDeserializationSchema, AvroRowDeserializationSchema, \
     JsonRowSerializationSchema, CsvRowSerializationSchema, AvroRowSerializationSchema
 from pyflink.common.typeinfo import Types
-from pyflink.common.types import Row, to_java_data_structure
+from pyflink.common.types import Row
 from pyflink.common.watermark_strategy import WatermarkStrategy
 from pyflink.datastream.connectors.base import DeliveryGuarantee
 from pyflink.datastream.connectors.kafka import KafkaSource, KafkaTopicPartition, \
     KafkaOffsetsInitializer, KafkaOffsetResetStrategy, KafkaRecordSerializationSchema, KafkaSink, \
     FlinkKafkaProducer, FlinkKafkaConsumer
 from pyflink.java_gateway import get_gateway
-from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase, PyFlinkTestCase, \
-    invoke_java_object_method
+from pyflink.testing.test_case_utils import (
+    PyFlinkStreamingTestCase,
+    PyFlinkTestCase,
+    invoke_java_object_method,
+    to_java_data_structure,
+)
 from pyflink.util.java_utils import to_jarray, is_instance_of, get_field_value
 
 
diff --git a/flink-python/pyflink/fn_execution/coder_impl_fast.pxd b/flink-python/pyflink/fn_execution/coder_impl_fast.pxd
index 4449fe1396b..e66a0da6d79 100644
--- a/flink-python/pyflink/fn_execution/coder_impl_fast.pxd
+++ b/flink-python/pyflink/fn_execution/coder_impl_fast.pxd
@@ -192,3 +192,18 @@ cdef class AvroCoderImpl(FieldCoderImpl):
     cdef object _encoder
     cdef object _reader
     cdef object _writer
+
+cdef class LocalDateCoderImpl(FieldCoderImpl):
+    @staticmethod
+    cdef _encode_to_stream(value, OutputStream out_stream)
+    @staticmethod
+    cdef _decode_from_stream(InputStream in_stream)
+
+cdef class LocalTimeCoderImpl(FieldCoderImpl):
+    @staticmethod
+    cdef _encode_to_stream(value, OutputStream out_stream)
+    @staticmethod
+    cdef _decode_from_stream(InputStream in_stream)
+
+cdef class LocalDateTimeCoderImpl(FieldCoderImpl):
+    pass
diff --git a/flink-python/pyflink/fn_execution/coder_impl_fast.pyx b/flink-python/pyflink/fn_execution/coder_impl_fast.pyx
index b7629e94209..00ba1fd42c7 100644
--- a/flink-python/pyflink/fn_execution/coder_impl_fast.pyx
+++ b/flink-python/pyflink/fn_execution/coder_impl_fast.pyx
@@ -963,3 +963,80 @@ cdef class AvroCoderImpl(FieldCoderImpl):
     cpdef decode_from_stream(self, InputStream in_stream, size_t size):
         self._buffer_wrapper.switch_stream(in_stream)
         return self._reader.read(self._decoder)
+
+cdef class LocalDateCoderImpl(FieldCoderImpl):
+
+    @staticmethod
+    cdef _encode_to_stream(value, OutputStream out_stream):
+        if value is None:
+            out_stream.write_int32(0xFFFFFFFF)
+            out_stream.write_int16(0xFFFF)
+        else:
+            out_stream.write_int32(value.year)
+            out_stream.write_int8(value.month)
+            out_stream.write_int8(value.day)
+
+    @staticmethod
+    cdef _decode_from_stream(InputStream in_stream):
+        year = in_stream.read_int32()
+        if year == 0xFFFFFFFF:
+            in_stream.read(2)
+            return None
+        month = in_stream.read_int8()
+        day = in_stream.read_int8()
+        return datetime.date(year, month, day)
+
+    cpdef encode_to_stream(self, value, OutputStream out_stream):
+        return LocalDateCoderImpl._encode_to_stream(value, out_stream)
+
+    cpdef decode_from_stream(self, InputStream in_stream, size_t length):
+        return LocalDateCoderImpl._decode_from_stream(in_stream)
+
+cdef class LocalTimeCoderImpl(FieldCoderImpl):
+
+    @staticmethod
+    cdef _encode_to_stream(value, OutputStream out_stream):
+        if value is None:
+            out_stream.write_int8(0xFF)
+            out_stream.write_int16(0xFFFF)
+            out_stream.write_int32(0xFFFFFFFF)
+        else:
+            out_stream.write_int8(value.hour)
+            out_stream.write_int8(value.minute)
+            out_stream.write_int8(value.second)
+            out_stream.write_int32(value.microsecond * 1000)
+
+    @staticmethod
+    cdef _decode_from_stream(InputStream in_stream):
+        hour = in_stream.read_int8()
+        if hour == 0xFF:
+            in_stream.read(6)
+            return None
+        minute = in_stream.read_int8()
+        second = in_stream.read_int8()
+        nano = in_stream.read_int32()
+        return datetime.time(hour, minute, second, nano // 1000)
+
+    cpdef encode_to_stream(self, value, OutputStream out_stream):
+        return LocalTimeCoderImpl._encode_to_stream(value, out_stream)
+
+    cpdef decode_from_stream(self, InputStream in_stream, size_t length):
+        return LocalTimeCoderImpl._decode_from_stream(in_stream)
+
+cdef class LocalDateTimeCoderImpl(FieldCoderImpl):
+
+    cpdef encode_to_stream(self, value, OutputStream out_stream):
+        if value is None:
+            LocalDateCoderImpl._encode_to_stream(None, out_stream)
+            LocalTimeCoderImpl._encode_to_stream(None, out_stream)
+        else:
+            LocalDateCoderImpl._encode_to_stream(value.date(), out_stream)
+            LocalTimeCoderImpl._encode_to_stream(value.time(), out_stream)
+
+    cpdef decode_from_stream(self, InputStream in_stream, size_t length):
+        date = LocalDateCoderImpl._decode_from_stream(in_stream)
+        time = LocalTimeCoderImpl._decode_from_stream(in_stream)
+        if date is None or time is None:
+            return None
+        return datetime.datetime(date.year, date.month, date.day, time.hour, time.minute,
+                                 time.second, time.microsecond)
diff --git a/flink-python/pyflink/fn_execution/coder_impl_slow.py b/flink-python/pyflink/fn_execution/coder_impl_slow.py
index 8044d0b5299..23af3483e0a 100644
--- a/flink-python/pyflink/fn_execution/coder_impl_slow.py
+++ b/flink-python/pyflink/fn_execution/coder_impl_slow.py
@@ -857,3 +857,83 @@ class AvroCoderImpl(FieldCoderImpl):
         # Since writer_schema equals reader_schema, in_stream does not need to support seek and tell
         self._buffer_wrapper.switch_stream(in_stream)
         return self._reader.read(self._decoder)
+
+
+class LocalDateCoderImpl(FieldCoderImpl):
+
+    @staticmethod
+    def _encode_to_stream(value: datetime.date, out_stream: OutputStream):
+        if value is None:
+            out_stream.write_int32(0xFFFFFFFF)
+            out_stream.write_int16(0xFFFF)
+        else:
+            out_stream.write_int32(value.year)
+            out_stream.write_int8(value.month)
+            out_stream.write_int8(value.day)
+
+    @staticmethod
+    def _decode_from_stream(in_stream: InputStream):
+        year = in_stream.read_int32()
+        if year == 0xFFFFFFFF:
+            in_stream.read(2)
+            return None
+        month = in_stream.read_int8()
+        day = in_stream.read_int8()
+        return datetime.date(year, month, day)
+
+    def encode_to_stream(self, value: datetime.date, out_stream: OutputStream):
+        self._encode_to_stream(value, out_stream)
+
+    def decode_from_stream(self, in_stream: InputStream, length: int = 0):
+        return self._decode_from_stream(in_stream)
+
+
+class LocalTimeCoderImpl(FieldCoderImpl):
+
+    @staticmethod
+    def _encode_to_stream(value: datetime.time, out_stream: OutputStream):
+        if value is None:
+            out_stream.write_int8(0xFF)
+            out_stream.write_int16(0xFFFF)
+            out_stream.write_int32(0xFFFFFFFF)
+        else:
+            out_stream.write_int8(value.hour)
+            out_stream.write_int8(value.minute)
+            out_stream.write_int8(value.second)
+            out_stream.write_int32(value.microsecond * 1000)
+
+    @staticmethod
+    def _decode_from_stream(in_stream: InputStream):
+        hour = in_stream.read_int8()
+        if hour == 0xFF:
+            in_stream.read(6)
+            return None
+        minute = in_stream.read_int8()
+        second = in_stream.read_int8()
+        nano = in_stream.read_int32()
+        return datetime.time(hour, minute, second, nano // 1000)
+
+    def encode_to_stream(self, value: datetime.time, out_stream: OutputStream):
+        self._encode_to_stream(value, out_stream)
+
+    def decode_from_stream(self, in_stream: InputStream, length: int = 0):
+        return self._decode_from_stream(in_stream)
+
+
+class LocalDateTimeCoderImpl(FieldCoderImpl):
+
+    def encode_to_stream(self, value: datetime.datetime, out_stream: OutputStream):
+        if value is None:
+            LocalDateCoderImpl._encode_to_stream(None, out_stream)
+            LocalTimeCoderImpl._encode_to_stream(None, out_stream)
+        else:
+            LocalDateCoderImpl._encode_to_stream(value.date(), out_stream)
+            LocalTimeCoderImpl._encode_to_stream(value.time(), out_stream)
+
+    def decode_from_stream(self, in_stream: InputStream, length: int = 0):
+        date = LocalDateCoderImpl._decode_from_stream(in_stream)
+        time = LocalTimeCoderImpl._decode_from_stream(in_stream)
+        if date is None or time is None:
+            return None
+        return datetime.datetime(date.year, date.month, date.day, time.hour, time.minute,
+                                 time.second, time.microsecond)
diff --git a/flink-python/pyflink/fn_execution/coders.py b/flink-python/pyflink/fn_execution/coders.py
index 20ef6f2c123..de1c27777d3 100644
--- a/flink-python/pyflink/fn_execution/coders.py
+++ b/flink-python/pyflink/fn_execution/coders.py
@@ -622,6 +622,24 @@ class AvroCoder(FieldCoder):
         return coder_impl.AvroCoderImpl(self._schema_string)
 
 
+class LocalDateCoder(FieldCoder):
+
+    def get_impl(self):
+        return coder_impl.LocalDateCoderImpl()
+
+
+class LocalTimeCoder(FieldCoder):
+
+    def get_impl(self):
+        return coder_impl.LocalTimeCoderImpl()
+
+
+class LocalDateTimeCoder(FieldCoder):
+
+    def get_impl(self):
+        return coder_impl.LocalDateTimeCoderImpl()
+
+
 def from_proto(field_type):
     """
     Creates the corresponding :class:`Coder` given the protocol representation of the field type.
@@ -693,7 +711,10 @@ def from_type_info_proto(type_info):
         type_info_name.SQL_TIME: TimeCoder(),
         type_info_name.SQL_TIMESTAMP: TimestampCoder(3),
         type_info_name.PICKLED_BYTES: CloudPickleCoder(),
-        type_info_name.INSTANT: InstantCoder()
+        type_info_name.INSTANT: InstantCoder(),
+        type_info_name.LOCAL_DATE: LocalDateCoder(),
+        type_info_name.LOCAL_TIME: LocalTimeCoder(),
+        type_info_name.LOCAL_DATETIME: LocalDateTimeCoder(),
     }
 
     field_type_name = type_info.type_name
@@ -720,6 +741,10 @@ def from_type_info_proto(type_info):
                             from_type_info_proto(type_info.map_type_info.value_type))
         elif field_type_name == type_info_name.AVRO:
             return AvroCoder(type_info.avro_type_info.schema)
+        elif field_type_name == type_info_name.LOCAL_ZONED_TIMESTAMP:
+            return LocalZonedTimestampCoder(
+                3, timezone=pytz.timezone(os.environ['TABLE_LOCAL_TIME_ZONE'])
+            )
         else:
             raise ValueError("Unsupported type_info %s." % type_info)
 
diff --git a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
index d5585eb11a1..d15027d146d 100644
--- a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
+++ b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
@@ -36,7 +36,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='flink-fn-execution.proto',
   package='org.apache.flink.fn_execution.v1',
   syntax='proto3',
-  serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12 org.apache.flink.fn_execution.v1\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 \x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 \x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12\x37\n\x06inputs\x18\x02 \x03(\x0b\x32\'.org.apache.flink.fn_execution.v1.Input\x12\x14 [...]
+  serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12 org.apache.flink.fn_execution.v1\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 \x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 \x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12\x37\n\x06inputs\x18\x02 \x03(\x0b\x32\'.org.apache.flink.fn_execution.v1.Input\x12\x14 [...]
 )
 
 
@@ -343,11 +343,27 @@ _TYPEINFO_TYPENAME = _descriptor.EnumDescriptor(
       name='AVRO', index=23, number=23,
       options=None,
       type=None),
+    _descriptor.EnumValueDescriptor(
+      name='LOCAL_DATE', index=24, number=24,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='LOCAL_TIME', index=25, number=25,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='LOCAL_DATETIME', index=26, number=26,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='LOCAL_ZONED_TIMESTAMP', index=27, number=27,
+      options=None,
+      type=None),
   ],
   containing_type=None,
   options=None,
   serialized_start=5775,
-  serialized_end=6093,
+  serialized_end=6172,
 )
 _sym_db.RegisterEnumDescriptor(_TYPEINFO_TYPENAME)
 
@@ -392,8 +408,8 @@ _USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=6987,
-  serialized_end=7160,
+  serialized_start=7066,
+  serialized_end=7239,
 )
 _sym_db.RegisterEnumDescriptor(_USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE)
 
@@ -418,8 +434,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES = _descriptor.EnumD
   ],
   containing_type=None,
   options=None,
-  serialized_start=8722,
-  serialized_end=8820,
+  serialized_start=8801,
+  serialized_end=8899,
 )
 _sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES)
 
@@ -436,8 +452,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY = _descri
   ],
   containing_type=None,
   options=None,
-  serialized_start=8822,
-  serialized_end=8864,
+  serialized_start=8901,
+  serialized_end=8943,
 )
 _sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY)
 
@@ -462,8 +478,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=8866,
-  serialized_end=8934,
+  serialized_start=8945,
+  serialized_end=9013,
 )
 _sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE)
 
@@ -484,8 +500,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=8936,
-  serialized_end=9010,
+  serialized_start=9015,
+  serialized_end=9089,
 )
 _sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY)
 
@@ -502,8 +518,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC = _descriptor.EnumDescript
   ],
   containing_type=None,
   options=None,
-  serialized_start=9012,
-  serialized_end=9055,
+  serialized_start=9091,
+  serialized_end=9134,
 )
 _sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC)
 
@@ -524,8 +540,8 @@ _CODERINFODESCRIPTOR_MODE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=10022,
-  serialized_end=10054,
+  serialized_start=10101,
+  serialized_end=10133,
 )
 _sym_db.RegisterEnumDescriptor(_CODERINFODESCRIPTOR_MODE)
 
@@ -1877,7 +1893,7 @@ _TYPEINFO = _descriptor.Descriptor(
       index=0, containing_type=None, fields=[]),
   ],
   serialized_start=4838,
-  serialized_end=6106,
+  serialized_end=6185,
 )
 
 
@@ -1914,8 +1930,8 @@ _USERDEFINEDDATASTREAMFUNCTION_JOBPARAMETER = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=6603,
-  serialized_end=6645,
+  serialized_start=6682,
+  serialized_end=6724,
 )
 
 _USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT = _descriptor.Descriptor(
@@ -1993,8 +2009,8 @@ _USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=6648,
-  serialized_end=6984,
+  serialized_start=6727,
+  serialized_end=7063,
 )
 
 _USERDEFINEDDATASTREAMFUNCTION = _descriptor.Descriptor(
@@ -2087,8 +2103,8 @@ _USERDEFINEDDATASTREAMFUNCTION = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=6109,
-  serialized_end=7160,
+  serialized_start=6188,
+  serialized_end=7239,
 )
 
 
@@ -2125,8 +2141,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_INCREMENTALCLEANUPSTRATEGY = _
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=7944,
-  serialized_end=8032,
+  serialized_start=8023,
+  serialized_end=8111,
 )
 
 _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTRATEGY = _descriptor.Descriptor(
@@ -2155,8 +2171,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTR
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=8034,
-  serialized_end=8109,
+  serialized_start=8113,
+  serialized_end=8188,
 )
 
 _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY = _descriptor.Descriptor(
@@ -2209,8 +2225,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY = _descript
       name='CleanupStrategy', full_name='org.apache.flink.fn_execution.v1.StateDescriptor.StateTTLConfig.CleanupStrategies.MapStrategiesEntry.CleanupStrategy',
       index=0, containing_type=None, fields=[]),
   ],
-  serialized_start=8112,
-  serialized_end=8720,
+  serialized_start=8191,
+  serialized_end=8799,
 )
 
 _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES = _descriptor.Descriptor(
@@ -2248,8 +2264,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=7766,
-  serialized_end=8864,
+  serialized_start=7845,
+  serialized_end=8943,
 )
 
 _STATEDESCRIPTOR_STATETTLCONFIG = _descriptor.Descriptor(
@@ -2309,8 +2325,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=7295,
-  serialized_end=9055,
+  serialized_start=7374,
+  serialized_end=9134,
 )
 
 _STATEDESCRIPTOR = _descriptor.Descriptor(
@@ -2346,8 +2362,8 @@ _STATEDESCRIPTOR = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=7163,
-  serialized_end=9055,
+  serialized_start=7242,
+  serialized_end=9134,
 )
 
 
@@ -2377,8 +2393,8 @@ _CODERINFODESCRIPTOR_FLATTENROWTYPE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=9651,
-  serialized_end=9725,
+  serialized_start=9730,
+  serialized_end=9804,
 )
 
 _CODERINFODESCRIPTOR_ROWTYPE = _descriptor.Descriptor(
@@ -2407,8 +2423,8 @@ _CODERINFODESCRIPTOR_ROWTYPE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=9727,
-  serialized_end=9794,
+  serialized_start=9806,
+  serialized_end=9873,
 )
 
 _CODERINFODESCRIPTOR_ARROWTYPE = _descriptor.Descriptor(
@@ -2437,8 +2453,8 @@ _CODERINFODESCRIPTOR_ARROWTYPE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=9796,
-  serialized_end=9865,
+  serialized_start=9875,
+  serialized_end=9944,
 )
 
 _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE = _descriptor.Descriptor(
@@ -2467,8 +2483,8 @@ _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=9867,
-  serialized_end=9946,
+  serialized_start=9946,
+  serialized_end=10025,
 )
 
 _CODERINFODESCRIPTOR_RAWTYPE = _descriptor.Descriptor(
@@ -2497,8 +2513,8 @@ _CODERINFODESCRIPTOR_RAWTYPE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=9948,
-  serialized_end=10020,
+  serialized_start=10027,
+  serialized_end=10099,
 )
 
 _CODERINFODESCRIPTOR = _descriptor.Descriptor(
@@ -2573,8 +2589,8 @@ _CODERINFODESCRIPTOR = _descriptor.Descriptor(
       name='data_type', full_name='org.apache.flink.fn_execution.v1.CoderInfoDescriptor.data_type',
       index=0, containing_type=None, fields=[]),
   ],
-  serialized_start=9058,
-  serialized_end=10067,
+  serialized_start=9137,
+  serialized_end=10146,
 )
 
 _INPUT.fields_by_name['udf'].message_type = _USERDEFINEDFUNCTION
diff --git a/flink-python/pyflink/proto/flink-fn-execution.proto b/flink-python/pyflink/proto/flink-fn-execution.proto
index 5014ecb471d..613a7cd888b 100644
--- a/flink-python/pyflink/proto/flink-fn-execution.proto
+++ b/flink-python/pyflink/proto/flink-fn-execution.proto
@@ -311,6 +311,10 @@ message TypeInfo {
     OBJECT_ARRAY = 21;
     INSTANT = 22;
     AVRO = 23;
+    LOCAL_DATE = 24;
+    LOCAL_TIME = 25;
+    LOCAL_DATETIME = 26;
+    LOCAL_ZONED_TIMESTAMP = 27;
   }
 
   message MapTypeInfo {
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
index a8cbc2a292a..2ddc2b73707 100644
--- a/flink-python/pyflink/testing/test_case_utils.py
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -15,6 +15,8 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 #################################################################################
+import calendar
+import datetime
 import glob
 import logging
 import os
@@ -22,18 +24,20 @@ import re
 import shutil
 import sys
 import tempfile
+import time
 import unittest
 from abc import abstractmethod
+from decimal import Decimal
 
 from py4j.java_gateway import JavaObject
 
-from pyflink.common import JobExecutionResult
+from pyflink.common import JobExecutionResult, Time, Instant, Row
 from pyflink.datastream.execution_mode import RuntimeExecutionMode
 from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
 from pyflink.find_flink_home import _find_flink_home, _find_flink_source_root
-from pyflink.table.table_environment import TableEnvironment
-from pyflink.table.environment_settings import EnvironmentSettings
 from pyflink.java_gateway import get_gateway
+from pyflink.table.environment_settings import EnvironmentSettings
+from pyflink.table.table_environment import TableEnvironment
 from pyflink.util.java_utils import add_jars_to_context_class_loader, to_jarray
 
 if os.getenv("VERBOSE"):
@@ -277,3 +281,58 @@ class TestEnv(object):
         for item in self.result:
             result[item.f0] = item.f1
         return result
+
+
+def to_java_data_structure(value):
+    jvm = get_gateway().jvm
+    if isinstance(value, (int, float, str, bytes)):
+        return value
+    elif isinstance(value, Decimal):
+        return jvm.java.math.BigDecimal.valueOf(float(value))
+    elif isinstance(value, datetime.datetime):
+        local_date_time = jvm.java.time.LocalDateTime.of(
+            value.year, value.month, value.day, value.hour, value.minute, value.second,
+            value.microsecond * 1000
+        )
+        if value.tzinfo is None:
+            return local_date_time
+        return jvm.java.time.Instant.ofEpochMilli(
+            (
+                calendar.timegm(value.utctimetuple()) +
+                calendar.timegm(time.localtime(0))
+            ) * 1000 +
+            value.microsecond // 1000
+        )
+    elif isinstance(value, datetime.date):
+        return jvm.java.time.LocalDate.of(value.year, value.month, value.day)
+    elif isinstance(value, datetime.time):
+        return jvm.java.time.LocalTime.of(value.hour, value.minute, value.second,
+                                          value.microsecond * 1000)
+    elif isinstance(value, Time):
+        return jvm.java.time.LocalTime.of()
+    elif isinstance(value, Instant):
+        return jvm.java.time.Instant.ofEpochMilli(value.to_epoch_milli())
+    elif isinstance(value, (list, tuple)):
+        j_list = jvm.java.util.ArrayList()
+        for i in value:
+            j_list.add(to_java_data_structure(i))
+        return j_list
+    elif isinstance(value, dict):
+        j_map = jvm.java.util.HashMap()
+        for k, v in value.items():
+            j_map.put(to_java_data_structure(k), to_java_data_structure(v))
+        return j_map
+    elif isinstance(value, Row):
+        if hasattr(value, '_fields'):
+            j_row = jvm.org.apache.flink.types.Row.withNames(value.get_row_kind().to_j_row_kind())
+            for field_name, value in zip(value._fields, value._values):
+                j_row.setField(field_name, to_java_data_structure(value))
+        else:
+            j_row = jvm.org.apache.flink.types.Row.withPositions(
+                value.get_row_kind().to_j_row_kind(), len(value)
+            )
+            for idx, value in enumerate(value._values):
+                j_row.setField(idx, to_java_data_structure(value))
+        return j_row
+    else:
+        raise TypeError('unsupported value type {}'.format(str(type(value))))
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
index 1bc1317b66e..2cde298ce55 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -38,6 +39,9 @@ import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
 import org.apache.flink.api.common.typeutils.base.InstantSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalDateSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalTimeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.api.common.typeutils.base.ShortSerializer;
@@ -69,10 +73,10 @@ import org.apache.flink.table.runtime.typeutils.serializers.python.DateSerialize
 import org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer;
 import org.apache.flink.table.runtime.typeutils.serializers.python.TimeSerializer;
 import org.apache.flink.table.runtime.typeutils.serializers.python.TimestampSerializer;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
-import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
@@ -183,6 +187,10 @@ public class PythonTypeUtils {
                 return buildSqlTimeTypeProto((SqlTimeTypeInfo<?>) typeInformation);
             }
 
+            if (typeInformation instanceof LocalTimeTypeInfo) {
+                return buildLocalTimeTypeProto((LocalTimeTypeInfo<?>) typeInformation);
+            }
+
             if (typeInformation instanceof RowTypeInfo) {
                 return buildRowTypeProto((RowTypeInfo) typeInformation, userClassLoader);
             }
@@ -218,12 +226,8 @@ public class PythonTypeUtils {
             }
 
             if (typeInformation instanceof InternalTypeInfo) {
-                LogicalTypeRoot logicalTypeRoot =
-                        ((InternalTypeInfo<?>) typeInformation).toLogicalType().getTypeRoot();
-                if (logicalTypeRoot.equals(LogicalTypeRoot.ROW)) {
-                    return buildRowDataTypeProto(
-                            (InternalTypeInfo<?>) typeInformation, userClassLoader);
-                }
+                return buildInternalTypeProto(
+                        (InternalTypeInfo<?>) typeInformation, userClassLoader);
             }
 
             if (typeInformation
@@ -270,6 +274,12 @@ public class PythonTypeUtils {
             return FlinkFnApi.TypeInfo.newBuilder().setTypeName(typeName).build();
         }
 
+        private static FlinkFnApi.TypeInfo buildLocalTimeTypeProto(
+                LocalTimeTypeInfo<?> localTimeTypeInfo) {
+            FlinkFnApi.TypeInfo.TypeName typeName = getTypeName(localTimeTypeInfo);
+            return FlinkFnApi.TypeInfo.newBuilder().setTypeName(typeName).build();
+        }
+
         private static FlinkFnApi.TypeInfo buildPrimitiveArrayTypeProto(
                 PrimitiveArrayTypeInfo<?> primitiveArrayTypeInfo, ClassLoader userClassLoader) {
             FlinkFnApi.TypeInfo elementFieldType =
@@ -336,34 +346,6 @@ public class PythonTypeUtils {
                     .build();
         }
 
-        private static FlinkFnApi.TypeInfo buildRowDataTypeProto(
-                InternalTypeInfo<?> internalTypeInfo, ClassLoader userClassLoader) {
-            RowType rowType = internalTypeInfo.toRowType();
-            FlinkFnApi.TypeInfo.RowTypeInfo.Builder rowTypeInfoBuilder =
-                    FlinkFnApi.TypeInfo.RowTypeInfo.newBuilder();
-
-            int arity = rowType.getFieldCount();
-            for (int index = 0; index < arity; index++) {
-                rowTypeInfoBuilder.addFields(
-                        FlinkFnApi.TypeInfo.RowTypeInfo.Field.newBuilder()
-                                .setFieldName(rowType.getFieldNames().get(index))
-                                .setFieldType(
-                                        toTypeInfoProto(
-                                                ExternalTypeInfo.of(
-                                                        TypeConversions.fromLogicalToDataType(
-                                                                rowType.getFields()
-                                                                        .get(index)
-                                                                        .getType())),
-                                                userClassLoader))
-                                .build());
-            }
-
-            return FlinkFnApi.TypeInfo.newBuilder()
-                    .setTypeName(FlinkFnApi.TypeInfo.TypeName.ROW)
-                    .setRowTypeInfo(rowTypeInfoBuilder.build())
-                    .build();
-        }
-
         private static FlinkFnApi.TypeInfo buildTupleTypeProto(
                 TupleTypeInfo<?> tupleTypeInfo, ClassLoader userClassLoader) {
             FlinkFnApi.TypeInfo.TupleTypeInfo.Builder tupleTypeInfoBuilder =
@@ -422,6 +404,138 @@ public class PythonTypeUtils {
                     .build();
         }
 
+        private static FlinkFnApi.TypeInfo buildTableBinaryTypeProto() {
+            return FlinkFnApi.TypeInfo.newBuilder()
+                    .setTypeName(FlinkFnApi.TypeInfo.TypeName.PRIMITIVE_ARRAY)
+                    .setCollectionElementType(
+                            FlinkFnApi.TypeInfo.newBuilder()
+                                    .setTypeName(FlinkFnApi.TypeInfo.TypeName.BYTE)
+                                    .build())
+                    .build();
+        }
+
+        private static FlinkFnApi.TypeInfo buildTableArrayTypeProto(
+                InternalTypeInfo<?> internalTypeInfo, ClassLoader userClassLoader) {
+            ArrayType arrayType = (ArrayType) internalTypeInfo.toLogicalType();
+            return FlinkFnApi.TypeInfo.newBuilder()
+                    .setTypeName(FlinkFnApi.TypeInfo.TypeName.LIST)
+                    .setCollectionElementType(
+                            toTypeInfoProto(
+                                    InternalTypeInfo.of(arrayType.getElementType()),
+                                    userClassLoader))
+                    .build();
+        }
+
+        private static FlinkFnApi.TypeInfo buildTableMapTypeProto(
+                InternalTypeInfo<?> internalTypeInfo, ClassLoader userClassLoader) {
+            MapType mapType = (MapType) internalTypeInfo.toLogicalType();
+            FlinkFnApi.TypeInfo.MapTypeInfo.Builder mapTypeInfoBuilder =
+                    FlinkFnApi.TypeInfo.MapTypeInfo.newBuilder();
+            mapTypeInfoBuilder
+                    .setKeyType(
+                            toTypeInfoProto(
+                                    InternalTypeInfo.of(mapType.getKeyType()), userClassLoader))
+                    .setValueType(
+                            toTypeInfoProto(
+                                    InternalTypeInfo.of(mapType.getValueType()), userClassLoader));
+            return FlinkFnApi.TypeInfo.newBuilder()
+                    .setTypeName(FlinkFnApi.TypeInfo.TypeName.MAP)
+                    .setMapTypeInfo(mapTypeInfoBuilder.build())
+                    .build();
+        }
+
+        private static FlinkFnApi.TypeInfo buildTableRowTypeProto(
+                InternalTypeInfo<?> internalTypeInfo, ClassLoader userClassLoader) {
+            RowType rowType = internalTypeInfo.toRowType();
+            FlinkFnApi.TypeInfo.RowTypeInfo.Builder rowTypeInfoBuilder =
+                    FlinkFnApi.TypeInfo.RowTypeInfo.newBuilder();
+
+            int arity = rowType.getFieldCount();
+            for (int index = 0; index < arity; index++) {
+                rowTypeInfoBuilder.addFields(
+                        FlinkFnApi.TypeInfo.RowTypeInfo.Field.newBuilder()
+                                .setFieldName(rowType.getFieldNames().get(index))
+                                .setFieldType(
+                                        toTypeInfoProto(
+                                                InternalTypeInfo.of(
+                                                        rowType.getFields().get(index).getType()),
+                                                userClassLoader))
+                                .build());
+            }
+
+            return FlinkFnApi.TypeInfo.newBuilder()
+                    .setTypeName(FlinkFnApi.TypeInfo.TypeName.ROW)
+                    .setRowTypeInfo(rowTypeInfoBuilder.build())
+                    .build();
+        }
+
+        private static FlinkFnApi.TypeInfo buildInternalTypeProto(
+                InternalTypeInfo<?> internalTypeInfo, ClassLoader userClassLoader)
+                throws UnsupportedOperationException {
+            FlinkFnApi.TypeInfo.TypeName typeName;
+            switch (internalTypeInfo.toLogicalType().getTypeRoot()) {
+                case CHAR:
+                case VARCHAR:
+                    typeName = FlinkFnApi.TypeInfo.TypeName.STRING;
+                    break;
+                case BOOLEAN:
+                    typeName = FlinkFnApi.TypeInfo.TypeName.BOOLEAN;
+                    break;
+                case BINARY:
+                case VARBINARY:
+                    return buildTableBinaryTypeProto();
+                case DECIMAL:
+                    typeName = FlinkFnApi.TypeInfo.TypeName.BIG_DEC;
+                    break;
+                case TINYINT:
+                    typeName = FlinkFnApi.TypeInfo.TypeName.BYTE;
+                    break;
+                case SMALLINT:
+                    typeName = FlinkFnApi.TypeInfo.TypeName.SHORT;
+                    break;
+                case INTEGER:
+                case INTERVAL_YEAR_MONTH:
+                    typeName = FlinkFnApi.TypeInfo.TypeName.INT;
+                    break;
+                case BIGINT:
+                    typeName = FlinkFnApi.TypeInfo.TypeName.BIG_INT;
+                    break;
+                case INTERVAL_DAY_TIME:
+                    typeName = FlinkFnApi.TypeInfo.TypeName.LONG;
+                    break;
+                case FLOAT:
+                    typeName = FlinkFnApi.TypeInfo.TypeName.FLOAT;
+                    break;
+                case DOUBLE:
+                    typeName = FlinkFnApi.TypeInfo.TypeName.DOUBLE;
+                    break;
+                case DATE:
+                    typeName = FlinkFnApi.TypeInfo.TypeName.SQL_DATE;
+                    break;
+                case TIME_WITHOUT_TIME_ZONE:
+                    typeName = FlinkFnApi.TypeInfo.TypeName.SQL_TIME;
+                    break;
+                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                    typeName = FlinkFnApi.TypeInfo.TypeName.SQL_TIMESTAMP;
+                    break;
+                case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                    typeName = FlinkFnApi.TypeInfo.TypeName.LOCAL_ZONED_TIMESTAMP;
+                    break;
+                case ARRAY:
+                    return buildTableArrayTypeProto(internalTypeInfo, userClassLoader);
+                case MAP:
+                    return buildTableMapTypeProto(internalTypeInfo, userClassLoader);
+                case ROW:
+                    return buildTableRowTypeProto(internalTypeInfo, userClassLoader);
+                default:
+                    throw new UnsupportedOperationException(
+                            String.format(
+                                    "InternalTypeInfo %s is still not supported in PyFlink",
+                                    internalTypeInfo));
+            }
+            return FlinkFnApi.TypeInfo.newBuilder().setTypeName(typeName).build();
+        }
+
         private static FlinkFnApi.TypeInfo.TypeName getTypeName(TypeInformation<?> typeInfo) {
             if (typeInfo.equals(BasicTypeInfo.BOOLEAN_TYPE_INFO)) {
                 return FlinkFnApi.TypeInfo.TypeName.BOOLEAN;
@@ -515,6 +629,18 @@ public class PythonTypeUtils {
                 return FlinkFnApi.TypeInfo.TypeName.SQL_TIMESTAMP;
             }
 
+            if (typeInfo.equals(LocalTimeTypeInfo.LOCAL_DATE)) {
+                return FlinkFnApi.TypeInfo.TypeName.LOCAL_DATE;
+            }
+
+            if (typeInfo.equals(LocalTimeTypeInfo.LOCAL_TIME)) {
+                return FlinkFnApi.TypeInfo.TypeName.LOCAL_TIME;
+            }
+
+            if (typeInfo.equals(LocalTimeTypeInfo.LOCAL_DATE_TIME)) {
+                return FlinkFnApi.TypeInfo.TypeName.LOCAL_DATETIME;
+            }
+
             throw new UnsupportedOperationException(
                     String.format(
                             "Type %s is still not supported in PyFlink.", typeInfo.toString()));
@@ -582,6 +708,14 @@ public class PythonTypeUtils {
                     SqlTimeTypeInfo.TIME.getTypeClass(), TimeSerializer.INSTANCE);
             typeInfoToSerializerMap.put(
                     SqlTimeTypeInfo.TIMESTAMP.getTypeClass(), new TimestampSerializer(3));
+
+            typeInfoToSerializerMap.put(
+                    LocalTimeTypeInfo.LOCAL_DATE.getTypeClass(), LocalDateSerializer.INSTANCE);
+            typeInfoToSerializerMap.put(
+                    LocalTimeTypeInfo.LOCAL_TIME.getTypeClass(), LocalTimeSerializer.INSTANCE);
+            typeInfoToSerializerMap.put(
+                    LocalTimeTypeInfo.LOCAL_DATE_TIME.getTypeClass(),
+                    LocalDateTimeSerializer.INSTANCE);
         }
 
         @SuppressWarnings("unchecked")
@@ -670,16 +804,9 @@ public class PythonTypeUtils {
                 }
 
                 if (typeInformation instanceof InternalTypeInfo) {
-                    LogicalTypeRoot logicalTypeRoot =
-                            ((InternalTypeInfo<?>) typeInformation)
-                                    .getDataType()
-                                    .getLogicalType()
-                                    .getTypeRoot();
-                    if (logicalTypeRoot.equals(LogicalTypeRoot.ROW)) {
-                        return org.apache.flink.table.runtime.typeutils.PythonTypeUtils
-                                .toInternalSerializer(
-                                        ((InternalTypeInfo<?>) typeInformation).toRowType());
-                    }
+                    InternalTypeInfo<?> internalTypeInfo = (InternalTypeInfo<?>) typeInformation;
+                    return org.apache.flink.table.runtime.typeutils.PythonTypeUtils
+                            .toInternalSerializer(internalTypeInfo.toLogicalType());
                 }
 
                 if (typeInformation