You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2022/06/27 02:41:00 UTC

[jira] [Updated] (FLINK-28253) It reports "LocalDateTime is not supported in PyFlink currently" when converting between Table and DataStream

     [ https://issues.apache.org/jira/browse/FLINK-28253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dian Fu updated FLINK-28253:
----------------------------
    Summary: It reports "LocalDateTime is not supported in PyFlink currently" when converting between Table and DataStream  (was: LocalDateTime is not supported in PyFlink)

> It reports "LocalDateTime is not supported in PyFlink currently" when converting between Table and DataStream
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-28253
>                 URL: https://issues.apache.org/jira/browse/FLINK-28253
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.14.0, 1.15.0
>            Reporter: Dian Fu
>            Priority: Major
>
> For the following job:
> {code}
> from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
> from pyflink.table import EnvironmentSettings
> from pyflink.table.table_environment import StreamTableEnvironment
> if __name__ == '__main__':
>     env = StreamExecutionEnvironment.get_execution_environment()
>     settings = EnvironmentSettings.new_instance() \
>         .in_streaming_mode() \
>         .build()
>     t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
>     t_env.execute_sql("""
>             CREATE TABLE events (
>                  `id` VARCHAR,
>                  `source` VARCHAR,
>                  `resources` VARCHAR,
>                  `time` TIMESTAMP(3),
>                  WATERMARK FOR `time` as `time` - INTERVAL '30' SECOND,
>                  PRIMARY KEY (`id`) NOT ENFORCED
>             ) WITH (
>                 'connector' = 'filesystem',
>                  'path' = 'file:///path/to/input',
>                  'format' = 'csv'
>             )
>         """)
>     events_stream_table = t_env.from_path('events')
>     events_stream = t_env.to_data_stream(events_stream_table)
>     #  Types.ROW([Types.STRING(), Types.STRING(), Types.STRING(), Types.SQL_TIMESTAMP()])
>     # now do some processing - let's filter by the type of event we get
>     codebuild_stream = events_stream.filter(
>         lambda event: event['source'] == 'aws.codebuild'
>     )
>     codebuild_stream.print()
>     env.execute()
> {code}
> It will reports the following exception:
> {code}
> Traceback (most recent call last):
>   File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/test_2.py", line 47, in <module>
>     process()
>   File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/test_2.py", line 39, in process
>     lambda event: event['source'] == 'aws.codebuild'
>   File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/datastream/data_stream.py", line 432, in filter
>     self._j_data_stream.getTransformation().getOutputType())
>   File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1070, in _from_java_type
>     TypeInfoDataTypeConverter.toLegacyTypeInfo(j_type_info.getDataType())))
>   File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1042, in _from_java_type
>     j_row_field_types]
>   File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1041, in <listcomp>
>     row_field_types = [_from_java_type(j_row_field_type) for j_row_field_type in
>   File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1072, in _from_java_type
>     raise TypeError("The java type info: %s is not supported in PyFlink currently." % j_type_info)
> TypeError: The java type info: LocalDateTime is not supported in PyFlink currently.
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)