You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2022/06/27 02:41:00 UTC
[jira] [Updated] (FLINK-28253) It reports "LocalDateTime is not supported in PyFlink currently" when converting between Table and DataStream
[ https://issues.apache.org/jira/browse/FLINK-28253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dian Fu updated FLINK-28253:
----------------------------
Summary: It reports "LocalDateTime is not supported in PyFlink currently" when converting between Table and DataStream (was: LocalDateTime is not supported in PyFlink)
> It reports "LocalDateTime is not supported in PyFlink currently" when converting between Table and DataStream
> -------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-28253
> URL: https://issues.apache.org/jira/browse/FLINK-28253
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.14.0, 1.15.0
> Reporter: Dian Fu
> Priority: Major
>
> For the following job:
> {code}
> from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
> from pyflink.table import EnvironmentSettings
> from pyflink.table.table_environment import StreamTableEnvironment
> if __name__ == '__main__':
> env = StreamExecutionEnvironment.get_execution_environment()
> settings = EnvironmentSettings.new_instance() \
> .in_streaming_mode() \
> .build()
> t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
> t_env.execute_sql("""
> CREATE TABLE events (
> `id` VARCHAR,
> `source` VARCHAR,
> `resources` VARCHAR,
> `time` TIMESTAMP(3),
> WATERMARK FOR `time` as `time` - INTERVAL '30' SECOND,
> PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
> 'connector' = 'filesystem',
> 'path' = 'file:///path/to/input',
> 'format' = 'csv'
> )
> """)
> events_stream_table = t_env.from_path('events')
> events_stream = t_env.to_data_stream(events_stream_table)
> # Types.ROW([Types.STRING(), Types.STRING(), Types.STRING(), Types.SQL_TIMESTAMP()])
> # now do some processing - let's filter by the type of event we get
> codebuild_stream = events_stream.filter(
> lambda event: event['source'] == 'aws.codebuild'
> )
> codebuild_stream.print()
> env.execute()
> {code}
> It will reports the following exception:
> {code}
> Traceback (most recent call last):
> File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/test_2.py", line 47, in <module>
> process()
> File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/test_2.py", line 39, in process
> lambda event: event['source'] == 'aws.codebuild'
> File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/datastream/data_stream.py", line 432, in filter
> self._j_data_stream.getTransformation().getOutputType())
> File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1070, in _from_java_type
> TypeInfoDataTypeConverter.toLegacyTypeInfo(j_type_info.getDataType())))
> File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1042, in _from_java_type
> j_row_field_types]
> File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1041, in <listcomp>
> row_field_types = [_from_java_type(j_row_field_type) for j_row_field_type in
> File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1072, in _from_java_type
> raise TypeError("The java type info: %s is not supported in PyFlink currently." % j_type_info)
> TypeError: The java type info: LocalDateTime is not supported in PyFlink currently.
> {code}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)