Posted to commits@flink.apache.org by di...@apache.org on 2022/08/08 09:18:04 UTC
[flink] branch master updated: [FLINK-28827][python] Complete DataType support in DataStream API (#20460)
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1c40dc15fbc [FLINK-28827][python] Complete DataType support in DataStream API (#20460)
1c40dc15fbc is described below
commit 1c40dc15fbc29e8e5e514565109a3fb05b47a83f
Author: Juntao Hu <ma...@gmail.com>
AuthorDate: Mon Aug 8 17:17:57 2022 +0800
[FLINK-28827][python] Complete DataType support in DataStream API (#20460)
---
flink-python/pyflink/common/types.py | 33 +---
.../datastream/connectors/tests/test_kafka.py | 10 +-
.../pyflink/fn_execution/coder_impl_fast.pxd | 15 ++
.../pyflink/fn_execution/coder_impl_fast.pyx | 77 ++++++++
.../pyflink/fn_execution/coder_impl_slow.py | 80 ++++++++
flink-python/pyflink/fn_execution/coders.py | 27 ++-
.../pyflink/fn_execution/flink_fn_execution_pb2.py | 110 ++++++-----
.../pyflink/proto/flink-fn-execution.proto | 4 +
flink-python/pyflink/testing/test_case_utils.py | 65 +++++-
.../flink/streaming/api/utils/PythonTypeUtils.java | 219 ++++++++++++++++-----
10 files changed, 509 insertions(+), 131 deletions(-)
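
This change threads java.time LocalDate/LocalTime/LocalDateTime and TIMESTAMP_LTZ through the Python coders, the protobuf type model, and the Java-side type mapping, so such fields survive the Table <-> DataStream boundary. A minimal sketch of the scenario this enables (hypothetical example, not taken from the commit; assumes a PyFlink build containing this change):

    import datetime

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import DataTypes, StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # A table whose row type carries DATE and TIME columns ...
    table = t_env.from_elements(
        [(1, datetime.date(2022, 8, 8), datetime.time(17, 17, 57))],
        DataTypes.ROW([
            DataTypes.FIELD("id", DataTypes.BIGINT()),
            DataTypes.FIELD("d", DataTypes.DATE()),
            DataTypes.FIELD("t", DataTypes.TIME()),
        ]))

    # ... can now be converted to a DataStream; the date/time fields travel
    # through the LocalDate/LocalTime coders added in the diff below.
    t_env.to_data_stream(table).map(lambda row: row.d.isoformat()).print()
    env.execute()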
diff --git a/flink-python/pyflink/common/types.py b/flink-python/pyflink/common/types.py
index be07e68f4cd..eeb68272402 100644
--- a/flink-python/pyflink/common/types.py
+++ b/flink-python/pyflink/common/types.py
@@ -15,15 +15,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-
from enum import Enum
-
from typing import List
-__all__ = ['Row', 'RowKind']
-
from pyflink.java_gateway import get_gateway
+__all__ = ['Row', 'RowKind']
+
class RowKind(Enum):
INSERT = 0
@@ -283,30 +281,3 @@ class Row(object):
def __len__(self):
return len(self._values)
-
-
-def to_java_data_structure(value):
- jvm = get_gateway().jvm
- if isinstance(value, (int, float, str)):
- return value
- elif isinstance(value, (list, tuple)):
- j_list = jvm.java.util.ArrayList()
- for item in value:
- j_list.add(to_java_data_structure(item))
- return j_list
- elif isinstance(value, map):
- j_map = jvm.java.util.HashMap()
- for k, v in value:
- j_map.put(to_java_data_structure(k), to_java_data_structure(v))
- return j_map
- elif isinstance(value, Row):
- j_row = jvm.org.apache.flink.types.Row(value.get_row_kind().to_j_row_kind(), len(value))
- if hasattr(value, '_fields'):
- for field_name, value in zip(value._fields, value._values):
- j_row.setField(field_name, to_java_data_structure(value))
- else:
- for idx, value in enumerate(value._values):
- j_row.setField(idx, to_java_data_structure(value))
- return j_row
- else:
- raise TypeError('value must be a vanilla Python object')
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_kafka.py b/flink-python/pyflink/datastream/connectors/tests/test_kafka.py
index 725932ed7d5..b36e22092bf 100644
--- a/flink-python/pyflink/datastream/connectors/tests/test_kafka.py
+++ b/flink-python/pyflink/datastream/connectors/tests/test_kafka.py
@@ -26,15 +26,19 @@ from pyflink.common.serialization import SimpleStringSchema, DeserializationSche
JsonRowDeserializationSchema, CsvRowDeserializationSchema, AvroRowDeserializationSchema, \
JsonRowSerializationSchema, CsvRowSerializationSchema, AvroRowSerializationSchema
from pyflink.common.typeinfo import Types
-from pyflink.common.types import Row, to_java_data_structure
+from pyflink.common.types import Row
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream.connectors.base import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaTopicPartition, \
KafkaOffsetsInitializer, KafkaOffsetResetStrategy, KafkaRecordSerializationSchema, KafkaSink, \
FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.java_gateway import get_gateway
-from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase, PyFlinkTestCase, \
- invoke_java_object_method
+from pyflink.testing.test_case_utils import (
+ PyFlinkStreamingTestCase,
+ PyFlinkTestCase,
+ invoke_java_object_method,
+ to_java_data_structure,
+)
from pyflink.util.java_utils import to_jarray, is_instance_of, get_field_value
diff --git a/flink-python/pyflink/fn_execution/coder_impl_fast.pxd b/flink-python/pyflink/fn_execution/coder_impl_fast.pxd
index 4449fe1396b..e66a0da6d79 100644
--- a/flink-python/pyflink/fn_execution/coder_impl_fast.pxd
+++ b/flink-python/pyflink/fn_execution/coder_impl_fast.pxd
@@ -192,3 +192,18 @@ cdef class AvroCoderImpl(FieldCoderImpl):
cdef object _encoder
cdef object _reader
cdef object _writer
+
+cdef class LocalDateCoderImpl(FieldCoderImpl):
+ @staticmethod
+ cdef _encode_to_stream(value, OutputStream out_stream)
+ @staticmethod
+ cdef _decode_from_stream(InputStream in_stream)
+
+cdef class LocalTimeCoderImpl(FieldCoderImpl):
+ @staticmethod
+ cdef _encode_to_stream(value, OutputStream out_stream)
+ @staticmethod
+ cdef _decode_from_stream(InputStream in_stream)
+
+cdef class LocalDateTimeCoderImpl(FieldCoderImpl):
+ pass
diff --git a/flink-python/pyflink/fn_execution/coder_impl_fast.pyx b/flink-python/pyflink/fn_execution/coder_impl_fast.pyx
index b7629e94209..00ba1fd42c7 100644
--- a/flink-python/pyflink/fn_execution/coder_impl_fast.pyx
+++ b/flink-python/pyflink/fn_execution/coder_impl_fast.pyx
@@ -963,3 +963,80 @@ cdef class AvroCoderImpl(FieldCoderImpl):
cpdef decode_from_stream(self, InputStream in_stream, size_t size):
self._buffer_wrapper.switch_stream(in_stream)
return self._reader.read(self._decoder)
+
+cdef class LocalDateCoderImpl(FieldCoderImpl):
+
+ @staticmethod
+ cdef _encode_to_stream(value, OutputStream out_stream):
+ if value is None:
+ out_stream.write_int32(0xFFFFFFFF)
+ out_stream.write_int16(0xFFFF)
+ else:
+ out_stream.write_int32(value.year)
+ out_stream.write_int8(value.month)
+ out_stream.write_int8(value.day)
+
+ @staticmethod
+ cdef _decode_from_stream(InputStream in_stream):
+ year = in_stream.read_int32()
+ if year == 0xFFFFFFFF:
+ in_stream.read(2)
+ return None
+ month = in_stream.read_int8()
+ day = in_stream.read_int8()
+ return datetime.date(year, month, day)
+
+ cpdef encode_to_stream(self, value, OutputStream out_stream):
+ return LocalDateCoderImpl._encode_to_stream(value, out_stream)
+
+ cpdef decode_from_stream(self, InputStream in_stream, size_t length):
+ return LocalDateCoderImpl._decode_from_stream(in_stream)
+
+cdef class LocalTimeCoderImpl(FieldCoderImpl):
+
+ @staticmethod
+ cdef _encode_to_stream(value, OutputStream out_stream):
+ if value is None:
+ out_stream.write_int8(0xFF)
+ out_stream.write_int16(0xFFFF)
+ out_stream.write_int32(0xFFFFFFFF)
+ else:
+ out_stream.write_int8(value.hour)
+ out_stream.write_int8(value.minute)
+ out_stream.write_int8(value.second)
+ out_stream.write_int32(value.microsecond * 1000)
+
+ @staticmethod
+ cdef _decode_from_stream(InputStream in_stream):
+ hour = in_stream.read_int8()
+ if hour == 0xFF:
+ in_stream.read(6)
+ return None
+ minute = in_stream.read_int8()
+ second = in_stream.read_int8()
+ nano = in_stream.read_int32()
+ return datetime.time(hour, minute, second, nano // 1000)
+
+ cpdef encode_to_stream(self, value, OutputStream out_stream):
+ return LocalTimeCoderImpl._encode_to_stream(value, out_stream)
+
+ cpdef decode_from_stream(self, InputStream in_stream, size_t length):
+ return LocalTimeCoderImpl._decode_from_stream(in_stream)
+
+cdef class LocalDateTimeCoderImpl(FieldCoderImpl):
+
+ cpdef encode_to_stream(self, value, OutputStream out_stream):
+ if value is None:
+ LocalDateCoderImpl._encode_to_stream(None, out_stream)
+ LocalTimeCoderImpl._encode_to_stream(None, out_stream)
+ else:
+ LocalDateCoderImpl._encode_to_stream(value.date(), out_stream)
+ LocalTimeCoderImpl._encode_to_stream(value.time(), out_stream)
+
+ cpdef decode_from_stream(self, InputStream in_stream, size_t length):
+ date = LocalDateCoderImpl._decode_from_stream(in_stream)
+ time = LocalTimeCoderImpl._decode_from_stream(in_stream)
+ if date is None or time is None:
+ return None
+ return datetime.datetime(date.year, date.month, date.day, time.hour, time.minute,
+ time.second, time.microsecond)
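
Both branches of the date coder write exactly six bytes: either int32 year + int8 month + int8 day, or the 0xFFFFFFFF sentinel year followed by two 0xFF filler bytes. That fixed width is what lets the decoder detect null from the year field alone and skip the remainder. A standalone illustration of the layout, using struct as a stand-in for the pyflink stream classes (a readability assumption, not the actual I/O path):

    import datetime
    import struct

    def encode_local_date(d):
        if d is None:
            # sentinel year 0xFFFFFFFF plus two 0xFF filler bytes
            return struct.pack(">iH", -1, 0xFFFF)
        return struct.pack(">iBB", d.year, d.month, d.day)

    assert len(encode_local_date(datetime.date(2022, 8, 8))) == 6
    assert len(encode_local_date(None)) == 6  # null occupies the same width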
diff --git a/flink-python/pyflink/fn_execution/coder_impl_slow.py b/flink-python/pyflink/fn_execution/coder_impl_slow.py
index 8044d0b5299..23af3483e0a 100644
--- a/flink-python/pyflink/fn_execution/coder_impl_slow.py
+++ b/flink-python/pyflink/fn_execution/coder_impl_slow.py
@@ -857,3 +857,83 @@ class AvroCoderImpl(FieldCoderImpl):
# Since writer_schema equals reader_schema, in_stream does not need to support seek and tell
self._buffer_wrapper.switch_stream(in_stream)
return self._reader.read(self._decoder)
+
+
+class LocalDateCoderImpl(FieldCoderImpl):
+
+ @staticmethod
+ def _encode_to_stream(value: datetime.date, out_stream: OutputStream):
+ if value is None:
+ out_stream.write_int32(0xFFFFFFFF)
+ out_stream.write_int16(0xFFFF)
+ else:
+ out_stream.write_int32(value.year)
+ out_stream.write_int8(value.month)
+ out_stream.write_int8(value.day)
+
+ @staticmethod
+ def _decode_from_stream(in_stream: InputStream):
+ year = in_stream.read_int32()
+ if year == 0xFFFFFFFF:
+ in_stream.read(2)
+ return None
+ month = in_stream.read_int8()
+ day = in_stream.read_int8()
+ return datetime.date(year, month, day)
+
+ def encode_to_stream(self, value: datetime.date, out_stream: OutputStream):
+ self._encode_to_stream(value, out_stream)
+
+ def decode_from_stream(self, in_stream: InputStream, length: int = 0):
+ return self._decode_from_stream(in_stream)
+
+
+class LocalTimeCoderImpl(FieldCoderImpl):
+
+ @staticmethod
+ def _encode_to_stream(value: datetime.time, out_stream: OutputStream):
+ if value is None:
+ out_stream.write_int8(0xFF)
+ out_stream.write_int16(0xFFFF)
+ out_stream.write_int32(0xFFFFFFFF)
+ else:
+ out_stream.write_int8(value.hour)
+ out_stream.write_int8(value.minute)
+ out_stream.write_int8(value.second)
+ out_stream.write_int32(value.microsecond * 1000)
+
+ @staticmethod
+ def _decode_from_stream(in_stream: InputStream):
+ hour = in_stream.read_int8()
+ if hour == 0xFF:
+ in_stream.read(6)
+ return None
+ minute = in_stream.read_int8()
+ second = in_stream.read_int8()
+ nano = in_stream.read_int32()
+ return datetime.time(hour, minute, second, nano // 1000)
+
+ def encode_to_stream(self, value: datetime.time, out_stream: OutputStream):
+ self._encode_to_stream(value, out_stream)
+
+ def decode_from_stream(self, in_stream: InputStream, length: int = 0):
+ return self._decode_from_stream(in_stream)
+
+
+class LocalDateTimeCoderImpl(FieldCoderImpl):
+
+ def encode_to_stream(self, value: datetime.datetime, out_stream: OutputStream):
+ if value is None:
+ LocalDateCoderImpl._encode_to_stream(None, out_stream)
+ LocalTimeCoderImpl._encode_to_stream(None, out_stream)
+ else:
+ LocalDateCoderImpl._encode_to_stream(value.date(), out_stream)
+ LocalTimeCoderImpl._encode_to_stream(value.time(), out_stream)
+
+ def decode_from_stream(self, in_stream: InputStream, length: int = 0):
+ date = LocalDateCoderImpl._decode_from_stream(in_stream)
+ time = LocalTimeCoderImpl._decode_from_stream(in_stream)
+ if date is None or time is None:
+ return None
+ return datetime.datetime(date.year, date.month, date.day, time.hour, time.minute,
+ time.second, time.microsecond)
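
The time coder uses the same fixed-width convention, seven bytes either way: hour/minute/second as int8 plus a nano-of-second int32, or an all-0xFF null marker of equal length. Java's LocalTime stores nanoseconds while Python's datetime.time only resolves microseconds, hence the * 1000 on encode and // 1000 on decode. A companion struct illustration (same stand-in assumption as above):

    import datetime
    import struct

    def encode_local_time(t):
        if t is None:
            return struct.pack(">BHi", 0xFF, 0xFFFF, -1)  # 7-byte null marker
        # nano-of-second on the wire; Python time carries only microseconds
        return struct.pack(">BBBi", t.hour, t.minute, t.second, t.microsecond * 1000)

    assert len(encode_local_time(datetime.time(17, 17, 57))) == 7
    assert len(encode_local_time(None)) == 7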
diff --git a/flink-python/pyflink/fn_execution/coders.py b/flink-python/pyflink/fn_execution/coders.py
index 20ef6f2c123..de1c27777d3 100644
--- a/flink-python/pyflink/fn_execution/coders.py
+++ b/flink-python/pyflink/fn_execution/coders.py
@@ -622,6 +622,24 @@ class AvroCoder(FieldCoder):
return coder_impl.AvroCoderImpl(self._schema_string)
+class LocalDateCoder(FieldCoder):
+
+ def get_impl(self):
+ return coder_impl.LocalDateCoderImpl()
+
+
+class LocalTimeCoder(FieldCoder):
+
+ def get_impl(self):
+ return coder_impl.LocalTimeCoderImpl()
+
+
+class LocalDateTimeCoder(FieldCoder):
+
+ def get_impl(self):
+ return coder_impl.LocalDateTimeCoderImpl()
+
+
def from_proto(field_type):
"""
Creates the corresponding :class:`Coder` given the protocol representation of the field type.
@@ -693,7 +711,10 @@ def from_type_info_proto(type_info):
type_info_name.SQL_TIME: TimeCoder(),
type_info_name.SQL_TIMESTAMP: TimestampCoder(3),
type_info_name.PICKLED_BYTES: CloudPickleCoder(),
- type_info_name.INSTANT: InstantCoder()
+ type_info_name.INSTANT: InstantCoder(),
+ type_info_name.LOCAL_DATE: LocalDateCoder(),
+ type_info_name.LOCAL_TIME: LocalTimeCoder(),
+ type_info_name.LOCAL_DATETIME: LocalDateTimeCoder(),
}
field_type_name = type_info.type_name
@@ -720,6 +741,10 @@ def from_type_info_proto(type_info):
from_type_info_proto(type_info.map_type_info.value_type))
elif field_type_name == type_info_name.AVRO:
return AvroCoder(type_info.avro_type_info.schema)
+ elif field_type_name == type_info_name.LOCAL_ZONED_TIMESTAMP:
+ return LocalZonedTimestampCoder(
+ 3, timezone=pytz.timezone(os.environ['TABLE_LOCAL_TIME_ZONE'])
+ )
else:
raise ValueError("Unsupported type_info %s." % type_info)
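
The new LOCAL_ZONED_TIMESTAMP branch resolves the session time zone from the TABLE_LOCAL_TIME_ZONE environment variable through pytz, so TIMESTAMP_LTZ values are decoded relative to the table session zone rather than the worker's default. A minimal sketch of that lookup (the zone name is an example value):

    import datetime
    import os

    import pytz

    os.environ.setdefault('TABLE_LOCAL_TIME_ZONE', 'Asia/Shanghai')  # example zone
    tz = pytz.timezone(os.environ['TABLE_LOCAL_TIME_ZONE'])
    print(tz.localize(datetime.datetime(2022, 8, 8, 17, 17, 57)))
    # 2022-08-08 17:17:57+08:00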
diff --git a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
index d5585eb11a1..d15027d146d 100644
--- a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
+++ b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
@@ -36,7 +36,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='flink-fn-execution.proto',
package='org.apache.flink.fn_execution.v1',
syntax='proto3',
- serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12 org.apache.flink.fn_execution.v1\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 \x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 \x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12\x37\n\x06inputs\x18\x02 \x03(\x0b\x32\'.org.apache.flink.fn_execution.v1.Input\x12\x14 [...]
+ serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12 org.apache.flink.fn_execution.v1\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 \x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 \x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12\x37\n\x06inputs\x18\x02 \x03(\x0b\x32\'.org.apache.flink.fn_execution.v1.Input\x12\x14 [...]
)
@@ -343,11 +343,27 @@ _TYPEINFO_TYPENAME = _descriptor.EnumDescriptor(
name='AVRO', index=23, number=23,
options=None,
type=None),
+ _descriptor.EnumValueDescriptor(
+ name='LOCAL_DATE', index=24, number=24,
+ options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='LOCAL_TIME', index=25, number=25,
+ options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='LOCAL_DATETIME', index=26, number=26,
+ options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='LOCAL_ZONED_TIMESTAMP', index=27, number=27,
+ options=None,
+ type=None),
],
containing_type=None,
options=None,
serialized_start=5775,
- serialized_end=6093,
+ serialized_end=6172,
)
_sym_db.RegisterEnumDescriptor(_TYPEINFO_TYPENAME)
@@ -392,8 +408,8 @@ _USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
- serialized_start=6987,
- serialized_end=7160,
+ serialized_start=7066,
+ serialized_end=7239,
)
_sym_db.RegisterEnumDescriptor(_USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE)
@@ -418,8 +434,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES = _descriptor.EnumD
],
containing_type=None,
options=None,
- serialized_start=8722,
- serialized_end=8820,
+ serialized_start=8801,
+ serialized_end=8899,
)
_sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_STRATEGIES)
@@ -436,8 +452,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY = _descri
],
containing_type=None,
options=None,
- serialized_start=8822,
- serialized_end=8864,
+ serialized_start=8901,
+ serialized_end=8943,
)
_sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_EMPTYCLEANUPSTRATEGY)
@@ -462,8 +478,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
- serialized_start=8866,
- serialized_end=8934,
+ serialized_start=8945,
+ serialized_end=9013,
)
_sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_UPDATETYPE)
@@ -484,8 +500,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
- serialized_start=8936,
- serialized_end=9010,
+ serialized_start=9015,
+ serialized_end=9089,
)
_sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_STATEVISIBILITY)
@@ -502,8 +518,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC = _descriptor.EnumDescript
],
containing_type=None,
options=None,
- serialized_start=9012,
- serialized_end=9055,
+ serialized_start=9091,
+ serialized_end=9134,
)
_sym_db.RegisterEnumDescriptor(_STATEDESCRIPTOR_STATETTLCONFIG_TTLTIMECHARACTERISTIC)
@@ -524,8 +540,8 @@ _CODERINFODESCRIPTOR_MODE = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
- serialized_start=10022,
- serialized_end=10054,
+ serialized_start=10101,
+ serialized_end=10133,
)
_sym_db.RegisterEnumDescriptor(_CODERINFODESCRIPTOR_MODE)
@@ -1877,7 +1893,7 @@ _TYPEINFO = _descriptor.Descriptor(
index=0, containing_type=None, fields=[]),
],
serialized_start=4838,
- serialized_end=6106,
+ serialized_end=6185,
)
@@ -1914,8 +1930,8 @@ _USERDEFINEDDATASTREAMFUNCTION_JOBPARAMETER = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=6603,
- serialized_end=6645,
+ serialized_start=6682,
+ serialized_end=6724,
)
_USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT = _descriptor.Descriptor(
@@ -1993,8 +2009,8 @@ _USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=6648,
- serialized_end=6984,
+ serialized_start=6727,
+ serialized_end=7063,
)
_USERDEFINEDDATASTREAMFUNCTION = _descriptor.Descriptor(
@@ -2087,8 +2103,8 @@ _USERDEFINEDDATASTREAMFUNCTION = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=6109,
- serialized_end=7160,
+ serialized_start=6188,
+ serialized_end=7239,
)
@@ -2125,8 +2141,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_INCREMENTALCLEANUPSTRATEGY = _
extension_ranges=[],
oneofs=[
],
- serialized_start=7944,
- serialized_end=8032,
+ serialized_start=8023,
+ serialized_end=8111,
)
_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTRATEGY = _descriptor.Descriptor(
@@ -2155,8 +2171,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_ROCKSDBCOMPACTFILTERCLEANUPSTR
extension_ranges=[],
oneofs=[
],
- serialized_start=8034,
- serialized_end=8109,
+ serialized_start=8113,
+ serialized_end=8188,
)
_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY = _descriptor.Descriptor(
@@ -2209,8 +2225,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES_MAPSTRATEGIESENTRY = _descript
name='CleanupStrategy', full_name='org.apache.flink.fn_execution.v1.StateDescriptor.StateTTLConfig.CleanupStrategies.MapStrategiesEntry.CleanupStrategy',
index=0, containing_type=None, fields=[]),
],
- serialized_start=8112,
- serialized_end=8720,
+ serialized_start=8191,
+ serialized_end=8799,
)
_STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES = _descriptor.Descriptor(
@@ -2248,8 +2264,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG_CLEANUPSTRATEGIES = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=7766,
- serialized_end=8864,
+ serialized_start=7845,
+ serialized_end=8943,
)
_STATEDESCRIPTOR_STATETTLCONFIG = _descriptor.Descriptor(
@@ -2309,8 +2325,8 @@ _STATEDESCRIPTOR_STATETTLCONFIG = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=7295,
- serialized_end=9055,
+ serialized_start=7374,
+ serialized_end=9134,
)
_STATEDESCRIPTOR = _descriptor.Descriptor(
@@ -2346,8 +2362,8 @@ _STATEDESCRIPTOR = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=7163,
- serialized_end=9055,
+ serialized_start=7242,
+ serialized_end=9134,
)
@@ -2377,8 +2393,8 @@ _CODERINFODESCRIPTOR_FLATTENROWTYPE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=9651,
- serialized_end=9725,
+ serialized_start=9730,
+ serialized_end=9804,
)
_CODERINFODESCRIPTOR_ROWTYPE = _descriptor.Descriptor(
@@ -2407,8 +2423,8 @@ _CODERINFODESCRIPTOR_ROWTYPE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=9727,
- serialized_end=9794,
+ serialized_start=9806,
+ serialized_end=9873,
)
_CODERINFODESCRIPTOR_ARROWTYPE = _descriptor.Descriptor(
@@ -2437,8 +2453,8 @@ _CODERINFODESCRIPTOR_ARROWTYPE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=9796,
- serialized_end=9865,
+ serialized_start=9875,
+ serialized_end=9944,
)
_CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE = _descriptor.Descriptor(
@@ -2467,8 +2483,8 @@ _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=9867,
- serialized_end=9946,
+ serialized_start=9946,
+ serialized_end=10025,
)
_CODERINFODESCRIPTOR_RAWTYPE = _descriptor.Descriptor(
@@ -2497,8 +2513,8 @@ _CODERINFODESCRIPTOR_RAWTYPE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=9948,
- serialized_end=10020,
+ serialized_start=10027,
+ serialized_end=10099,
)
_CODERINFODESCRIPTOR = _descriptor.Descriptor(
@@ -2573,8 +2589,8 @@ _CODERINFODESCRIPTOR = _descriptor.Descriptor(
name='data_type', full_name='org.apache.flink.fn_execution.v1.CoderInfoDescriptor.data_type',
index=0, containing_type=None, fields=[]),
],
- serialized_start=9058,
- serialized_end=10067,
+ serialized_start=9137,
+ serialized_end=10146,
)
_INPUT.fields_by_name['udf'].message_type = _USERDEFINEDFUNCTION
diff --git a/flink-python/pyflink/proto/flink-fn-execution.proto b/flink-python/pyflink/proto/flink-fn-execution.proto
index 5014ecb471d..613a7cd888b 100644
--- a/flink-python/pyflink/proto/flink-fn-execution.proto
+++ b/flink-python/pyflink/proto/flink-fn-execution.proto
@@ -311,6 +311,10 @@ message TypeInfo {
OBJECT_ARRAY = 21;
INSTANT = 22;
AVRO = 23;
+ LOCAL_DATE = 24;
+ LOCAL_TIME = 25;
+ LOCAL_DATETIME = 26;
+ LOCAL_ZONED_TIMESTAMP = 27;
}
message MapTypeInfo {
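
These four enum values are the source of truth for the regenerated flink_fn_execution_pb2.py above (which is also why its serialized offsets all shift by 79 bytes); flink-python ships a gen_protos.py helper for the regeneration, though the exact invocation is branch-dependent. Once regenerated, the names resolve on the Python side:

    from pyflink.fn_execution import flink_fn_execution_pb2

    assert flink_fn_execution_pb2.TypeInfo.LOCAL_DATE == 24
    assert flink_fn_execution_pb2.TypeInfo.LOCAL_ZONED_TIMESTAMP == 27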
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
index a8cbc2a292a..2ddc2b73707 100644
--- a/flink-python/pyflink/testing/test_case_utils.py
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -15,6 +15,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
+import calendar
+import datetime
import glob
import logging
import os
@@ -22,18 +24,20 @@ import re
import shutil
import sys
import tempfile
+import time
import unittest
from abc import abstractmethod
+from decimal import Decimal
from py4j.java_gateway import JavaObject
-from pyflink.common import JobExecutionResult
+from pyflink.common import JobExecutionResult, Time, Instant, Row
from pyflink.datastream.execution_mode import RuntimeExecutionMode
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.find_flink_home import _find_flink_home, _find_flink_source_root
-from pyflink.table.table_environment import TableEnvironment
-from pyflink.table.environment_settings import EnvironmentSettings
from pyflink.java_gateway import get_gateway
+from pyflink.table.environment_settings import EnvironmentSettings
+from pyflink.table.table_environment import TableEnvironment
from pyflink.util.java_utils import add_jars_to_context_class_loader, to_jarray
if os.getenv("VERBOSE"):
@@ -277,3 +281,58 @@ class TestEnv(object):
for item in self.result:
result[item.f0] = item.f1
return result
+
+
+def to_java_data_structure(value):
+ jvm = get_gateway().jvm
+ if isinstance(value, (int, float, str, bytes)):
+ return value
+ elif isinstance(value, Decimal):
+ return jvm.java.math.BigDecimal.valueOf(float(value))
+ elif isinstance(value, datetime.datetime):
+ local_date_time = jvm.java.time.LocalDateTime.of(
+ value.year, value.month, value.day, value.hour, value.minute, value.second,
+ value.microsecond * 1000
+ )
+ if value.tzinfo is None:
+ return local_date_time
+ return jvm.java.time.Instant.ofEpochMilli(
+ (
+ calendar.timegm(value.utctimetuple()) +
+ calendar.timegm(time.localtime(0))
+ ) * 1000 +
+ value.microsecond // 1000
+ )
+ elif isinstance(value, datetime.date):
+ return jvm.java.time.LocalDate.of(value.year, value.month, value.day)
+ elif isinstance(value, datetime.time):
+ return jvm.java.time.LocalTime.of(value.hour, value.minute, value.second,
+ value.microsecond * 1000)
+ elif isinstance(value, Time):
+        return jvm.java.time.LocalTime.ofNanoOfDay(value.to_milliseconds() * 1000000)
+ elif isinstance(value, Instant):
+ return jvm.java.time.Instant.ofEpochMilli(value.to_epoch_milli())
+ elif isinstance(value, (list, tuple)):
+ j_list = jvm.java.util.ArrayList()
+ for i in value:
+ j_list.add(to_java_data_structure(i))
+ return j_list
+ elif isinstance(value, dict):
+ j_map = jvm.java.util.HashMap()
+ for k, v in value.items():
+ j_map.put(to_java_data_structure(k), to_java_data_structure(v))
+ return j_map
+ elif isinstance(value, Row):
+ if hasattr(value, '_fields'):
+ j_row = jvm.org.apache.flink.types.Row.withNames(value.get_row_kind().to_j_row_kind())
+ for field_name, value in zip(value._fields, value._values):
+ j_row.setField(field_name, to_java_data_structure(value))
+ else:
+ j_row = jvm.org.apache.flink.types.Row.withPositions(
+ value.get_row_kind().to_j_row_kind(), len(value)
+ )
+ for idx, value in enumerate(value._values):
+ j_row.setField(idx, to_java_data_structure(value))
+ return j_row
+ else:
+ raise TypeError('unsupported value type {}'.format(str(type(value))))
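
to_java_data_structure moves here because it is only exercised by tests; it now covers bytes, Decimal, the datetime types, Time, Instant, dict (the old version wrongly tested against the map builtin), and named as well as positional Rows. A hedged usage sketch (assumes a JVM gateway can be started, as in any PyFlink test run):

    import datetime

    from pyflink.common import Row
    from pyflink.testing.test_case_utils import to_java_data_structure

    j_row = to_java_data_structure(
        Row(id=1, tags=['a', 'b'], born=datetime.date(2022, 8, 8)))
    print(j_row.toString())  # rendered by org.apache.flink.types.Row on the Java side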
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
index 1bc1317b66e..2cde298ce55 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -38,6 +39,9 @@ import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
import org.apache.flink.api.common.typeutils.base.InstantSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalDateSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalTimeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.ShortSerializer;
@@ -69,10 +73,10 @@ import org.apache.flink.table.runtime.typeutils.serializers.python.DateSerialize
import org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer;
import org.apache.flink.table.runtime.typeutils.serializers.python.TimeSerializer;
import org.apache.flink.table.runtime.typeutils.serializers.python.TimestampSerializer;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
-import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
@@ -183,6 +187,10 @@ public class PythonTypeUtils {
return buildSqlTimeTypeProto((SqlTimeTypeInfo<?>) typeInformation);
}
+ if (typeInformation instanceof LocalTimeTypeInfo) {
+ return buildLocalTimeTypeProto((LocalTimeTypeInfo<?>) typeInformation);
+ }
+
if (typeInformation instanceof RowTypeInfo) {
return buildRowTypeProto((RowTypeInfo) typeInformation, userClassLoader);
}
@@ -218,12 +226,8 @@ public class PythonTypeUtils {
}
if (typeInformation instanceof InternalTypeInfo) {
- LogicalTypeRoot logicalTypeRoot =
- ((InternalTypeInfo<?>) typeInformation).toLogicalType().getTypeRoot();
- if (logicalTypeRoot.equals(LogicalTypeRoot.ROW)) {
- return buildRowDataTypeProto(
- (InternalTypeInfo<?>) typeInformation, userClassLoader);
- }
+ return buildInternalTypeProto(
+ (InternalTypeInfo<?>) typeInformation, userClassLoader);
}
if (typeInformation
@@ -270,6 +274,12 @@ public class PythonTypeUtils {
return FlinkFnApi.TypeInfo.newBuilder().setTypeName(typeName).build();
}
+ private static FlinkFnApi.TypeInfo buildLocalTimeTypeProto(
+ LocalTimeTypeInfo<?> localTimeTypeInfo) {
+ FlinkFnApi.TypeInfo.TypeName typeName = getTypeName(localTimeTypeInfo);
+ return FlinkFnApi.TypeInfo.newBuilder().setTypeName(typeName).build();
+ }
+
private static FlinkFnApi.TypeInfo buildPrimitiveArrayTypeProto(
PrimitiveArrayTypeInfo<?> primitiveArrayTypeInfo, ClassLoader userClassLoader) {
FlinkFnApi.TypeInfo elementFieldType =
@@ -336,34 +346,6 @@ public class PythonTypeUtils {
.build();
}
- private static FlinkFnApi.TypeInfo buildRowDataTypeProto(
- InternalTypeInfo<?> internalTypeInfo, ClassLoader userClassLoader) {
- RowType rowType = internalTypeInfo.toRowType();
- FlinkFnApi.TypeInfo.RowTypeInfo.Builder rowTypeInfoBuilder =
- FlinkFnApi.TypeInfo.RowTypeInfo.newBuilder();
-
- int arity = rowType.getFieldCount();
- for (int index = 0; index < arity; index++) {
- rowTypeInfoBuilder.addFields(
- FlinkFnApi.TypeInfo.RowTypeInfo.Field.newBuilder()
- .setFieldName(rowType.getFieldNames().get(index))
- .setFieldType(
- toTypeInfoProto(
- ExternalTypeInfo.of(
- TypeConversions.fromLogicalToDataType(
- rowType.getFields()
- .get(index)
- .getType())),
- userClassLoader))
- .build());
- }
-
- return FlinkFnApi.TypeInfo.newBuilder()
- .setTypeName(FlinkFnApi.TypeInfo.TypeName.ROW)
- .setRowTypeInfo(rowTypeInfoBuilder.build())
- .build();
- }
-
private static FlinkFnApi.TypeInfo buildTupleTypeProto(
TupleTypeInfo<?> tupleTypeInfo, ClassLoader userClassLoader) {
FlinkFnApi.TypeInfo.TupleTypeInfo.Builder tupleTypeInfoBuilder =
@@ -422,6 +404,138 @@ public class PythonTypeUtils {
.build();
}
+ private static FlinkFnApi.TypeInfo buildTableBinaryTypeProto() {
+ return FlinkFnApi.TypeInfo.newBuilder()
+ .setTypeName(FlinkFnApi.TypeInfo.TypeName.PRIMITIVE_ARRAY)
+ .setCollectionElementType(
+ FlinkFnApi.TypeInfo.newBuilder()
+ .setTypeName(FlinkFnApi.TypeInfo.TypeName.BYTE)
+ .build())
+ .build();
+ }
+
+ private static FlinkFnApi.TypeInfo buildTableArrayTypeProto(
+ InternalTypeInfo<?> internalTypeInfo, ClassLoader userClassLoader) {
+ ArrayType arrayType = (ArrayType) internalTypeInfo.toLogicalType();
+ return FlinkFnApi.TypeInfo.newBuilder()
+ .setTypeName(FlinkFnApi.TypeInfo.TypeName.LIST)
+ .setCollectionElementType(
+ toTypeInfoProto(
+ InternalTypeInfo.of(arrayType.getElementType()),
+ userClassLoader))
+ .build();
+ }
+
+ private static FlinkFnApi.TypeInfo buildTableMapTypeProto(
+ InternalTypeInfo<?> internalTypeInfo, ClassLoader userClassLoader) {
+ MapType mapType = (MapType) internalTypeInfo.toLogicalType();
+ FlinkFnApi.TypeInfo.MapTypeInfo.Builder mapTypeInfoBuilder =
+ FlinkFnApi.TypeInfo.MapTypeInfo.newBuilder();
+ mapTypeInfoBuilder
+ .setKeyType(
+ toTypeInfoProto(
+ InternalTypeInfo.of(mapType.getKeyType()), userClassLoader))
+ .setValueType(
+ toTypeInfoProto(
+ InternalTypeInfo.of(mapType.getValueType()), userClassLoader));
+ return FlinkFnApi.TypeInfo.newBuilder()
+ .setTypeName(FlinkFnApi.TypeInfo.TypeName.MAP)
+ .setMapTypeInfo(mapTypeInfoBuilder.build())
+ .build();
+ }
+
+ private static FlinkFnApi.TypeInfo buildTableRowTypeProto(
+ InternalTypeInfo<?> internalTypeInfo, ClassLoader userClassLoader) {
+ RowType rowType = internalTypeInfo.toRowType();
+ FlinkFnApi.TypeInfo.RowTypeInfo.Builder rowTypeInfoBuilder =
+ FlinkFnApi.TypeInfo.RowTypeInfo.newBuilder();
+
+ int arity = rowType.getFieldCount();
+ for (int index = 0; index < arity; index++) {
+ rowTypeInfoBuilder.addFields(
+ FlinkFnApi.TypeInfo.RowTypeInfo.Field.newBuilder()
+ .setFieldName(rowType.getFieldNames().get(index))
+ .setFieldType(
+ toTypeInfoProto(
+ InternalTypeInfo.of(
+ rowType.getFields().get(index).getType()),
+ userClassLoader))
+ .build());
+ }
+
+ return FlinkFnApi.TypeInfo.newBuilder()
+ .setTypeName(FlinkFnApi.TypeInfo.TypeName.ROW)
+ .setRowTypeInfo(rowTypeInfoBuilder.build())
+ .build();
+ }
+
+ private static FlinkFnApi.TypeInfo buildInternalTypeProto(
+ InternalTypeInfo<?> internalTypeInfo, ClassLoader userClassLoader)
+ throws UnsupportedOperationException {
+ FlinkFnApi.TypeInfo.TypeName typeName;
+ switch (internalTypeInfo.toLogicalType().getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ typeName = FlinkFnApi.TypeInfo.TypeName.STRING;
+ break;
+ case BOOLEAN:
+ typeName = FlinkFnApi.TypeInfo.TypeName.BOOLEAN;
+ break;
+ case BINARY:
+ case VARBINARY:
+ return buildTableBinaryTypeProto();
+ case DECIMAL:
+ typeName = FlinkFnApi.TypeInfo.TypeName.BIG_DEC;
+ break;
+ case TINYINT:
+ typeName = FlinkFnApi.TypeInfo.TypeName.BYTE;
+ break;
+ case SMALLINT:
+ typeName = FlinkFnApi.TypeInfo.TypeName.SHORT;
+ break;
+ case INTEGER:
+ case INTERVAL_YEAR_MONTH:
+ typeName = FlinkFnApi.TypeInfo.TypeName.INT;
+ break;
+ case BIGINT:
+ typeName = FlinkFnApi.TypeInfo.TypeName.BIG_INT;
+ break;
+ case INTERVAL_DAY_TIME:
+ typeName = FlinkFnApi.TypeInfo.TypeName.LONG;
+ break;
+ case FLOAT:
+ typeName = FlinkFnApi.TypeInfo.TypeName.FLOAT;
+ break;
+ case DOUBLE:
+ typeName = FlinkFnApi.TypeInfo.TypeName.DOUBLE;
+ break;
+ case DATE:
+ typeName = FlinkFnApi.TypeInfo.TypeName.SQL_DATE;
+ break;
+ case TIME_WITHOUT_TIME_ZONE:
+ typeName = FlinkFnApi.TypeInfo.TypeName.SQL_TIME;
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ typeName = FlinkFnApi.TypeInfo.TypeName.SQL_TIMESTAMP;
+ break;
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ typeName = FlinkFnApi.TypeInfo.TypeName.LOCAL_ZONED_TIMESTAMP;
+ break;
+ case ARRAY:
+ return buildTableArrayTypeProto(internalTypeInfo, userClassLoader);
+ case MAP:
+ return buildTableMapTypeProto(internalTypeInfo, userClassLoader);
+ case ROW:
+ return buildTableRowTypeProto(internalTypeInfo, userClassLoader);
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "InternalTypeInfo %s is still not supported in PyFlink",
+ internalTypeInfo));
+ }
+ return FlinkFnApi.TypeInfo.newBuilder().setTypeName(typeName).build();
+ }
+
private static FlinkFnApi.TypeInfo.TypeName getTypeName(TypeInformation<?> typeInfo) {
if (typeInfo.equals(BasicTypeInfo.BOOLEAN_TYPE_INFO)) {
return FlinkFnApi.TypeInfo.TypeName.BOOLEAN;
@@ -515,6 +629,18 @@ public class PythonTypeUtils {
return FlinkFnApi.TypeInfo.TypeName.SQL_TIMESTAMP;
}
+ if (typeInfo.equals(LocalTimeTypeInfo.LOCAL_DATE)) {
+ return FlinkFnApi.TypeInfo.TypeName.LOCAL_DATE;
+ }
+
+ if (typeInfo.equals(LocalTimeTypeInfo.LOCAL_TIME)) {
+ return FlinkFnApi.TypeInfo.TypeName.LOCAL_TIME;
+ }
+
+ if (typeInfo.equals(LocalTimeTypeInfo.LOCAL_DATE_TIME)) {
+ return FlinkFnApi.TypeInfo.TypeName.LOCAL_DATETIME;
+ }
+
throw new UnsupportedOperationException(
String.format(
"Type %s is still not supported in PyFlink.", typeInfo.toString()));
@@ -582,6 +708,14 @@ public class PythonTypeUtils {
SqlTimeTypeInfo.TIME.getTypeClass(), TimeSerializer.INSTANCE);
typeInfoToSerializerMap.put(
SqlTimeTypeInfo.TIMESTAMP.getTypeClass(), new TimestampSerializer(3));
+
+ typeInfoToSerializerMap.put(
+ LocalTimeTypeInfo.LOCAL_DATE.getTypeClass(), LocalDateSerializer.INSTANCE);
+ typeInfoToSerializerMap.put(
+ LocalTimeTypeInfo.LOCAL_TIME.getTypeClass(), LocalTimeSerializer.INSTANCE);
+ typeInfoToSerializerMap.put(
+ LocalTimeTypeInfo.LOCAL_DATE_TIME.getTypeClass(),
+ LocalDateTimeSerializer.INSTANCE);
}
@SuppressWarnings("unchecked")
@@ -670,16 +804,9 @@ public class PythonTypeUtils {
}
if (typeInformation instanceof InternalTypeInfo) {
- LogicalTypeRoot logicalTypeRoot =
- ((InternalTypeInfo<?>) typeInformation)
- .getDataType()
- .getLogicalType()
- .getTypeRoot();
- if (logicalTypeRoot.equals(LogicalTypeRoot.ROW)) {
- return org.apache.flink.table.runtime.typeutils.PythonTypeUtils
- .toInternalSerializer(
- ((InternalTypeInfo<?>) typeInformation).toRowType());
- }
+ InternalTypeInfo<?> internalTypeInfo = (InternalTypeInfo<?>) typeInformation;
+ return org.apache.flink.table.runtime.typeutils.PythonTypeUtils
+ .toInternalSerializer(internalTypeInfo.toLogicalType());
}
if (typeInformation