Posted to user@flink.apache.org by John Tipper <jo...@hotmail.com> on 2022/06/24 20:07:10 UTC

How to convert Table containing TIMESTAMP_LTZ into DataStream in PyFlink 1.15.0?

Hi,

I have a source table that uses the Kinesis connector to read events from AWS EventBridge, using PyFlink 1.15.0. An example of the kind of data in this stream is here: https://docs.aws.amazon.com/codebuild/latest/userguide/sample-build-notifications.html#sample-build-notifications-ref. Note that the stream contains many different types of events, and the 'detail' field is completely different between event types. This connector is not supported by the PyFlink DataStream API, so I use the Table API to construct the source table. The table looks like this:


CREATE TABLE events (
     `id` VARCHAR,
     `source` VARCHAR,
     `account` VARCHAR,
     `region` VARCHAR,
     `detail-type` VARCHAR,
     `detail` VARCHAR,
     `resources` VARCHAR,
     `time` TIMESTAMP(0) WITH LOCAL TIME ZONE,
     WATERMARK FOR `time` as `time` - INTERVAL '30' SECOND,
     PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
...
)


The table was created using:

 table_env.execute_sql(CREATE_STRING_ABOVE)
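The WATERMARK clause in this DDL can be modeled in plain Python to show what it promises downstream. This is a simplified sketch, not Flink code: it assumes the watermark is recomputed after every event (Flink actually emits watermarks periodically), and it treats an event as late when its time is at or before the current watermark. The function name `simulate_watermarks` is hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Mirrors the DDL clause: WATERMARK FOR `time` AS `time` - INTERVAL '30' SECOND
MAX_DELAY = timedelta(seconds=30)


def simulate_watermarks(event_times):
    """Yield (event_time, watermark_after_event, is_late) for each event.

    Plain-Python model of a bounded-out-of-orderness watermark: the
    watermark is the largest event time seen so far minus MAX_DELAY,
    so it never moves backwards.
    """
    watermark = None
    for t in event_times:
        # Late if the event is not after the watermark already in effect.
        is_late = watermark is not None and t <= watermark
        candidate = t - MAX_DELAY
        if watermark is None or candidate > watermark:
            watermark = candidate
        yield t, watermark, is_late


if __name__ == '__main__':
    base = datetime(2022, 6, 24, 20, 0, 0, tzinfo=timezone.utc)
    arrivals = [base + timedelta(seconds=s) for s in (0, 60, 20, 40)]
    for t, wm, late in simulate_watermarks(arrivals):
        print(t.time(), wm.time(), 'late' if late else 'on time')
```

With these arrivals, the third event (20 seconds after the first) is flagged late because the event at +60 seconds has already pushed the watermark to 20:00:30.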

I'd like to turn this table into a data stream so I can perform some processing that is easier to do in the DataStream API:


events_stream_table = table_env.from_path('events')

events_stream = table_env.to_data_stream(events_stream_table)

# now do some processing - let's filter by the type of event we get

codebuild_stream = events_stream.filter(
    lambda event: event['source'] == 'aws.codebuild'
)

# now do other stuff on a stream containing only events that are identical in shape
...
# maybe convert back into a Table and perform SQL on the data
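After the filter, every remaining event has the same `detail` shape, so the JSON string in the VARCHAR `detail` column can be parsed into fixed fields. A minimal plain-Python sketch, assuming the `project-name` and `build-status` keys shown in the linked CodeBuild sample notifications; `CodeBuildStatus` and `parse_codebuild_event` are hypothetical names, and the sample values are made up.

```python
import json
from typing import NamedTuple, Optional


class CodeBuildStatus(NamedTuple):
    """Hypothetical flattened shape for CodeBuild state-change events."""
    event_id: str
    project_name: Optional[str]
    build_status: Optional[str]


def parse_codebuild_event(event_id: str, source: str, detail: str) -> Optional[CodeBuildStatus]:
    """Return a CodeBuildStatus, or None for events from other sources.

    `detail` is the raw JSON string from the `detail` column. The key
    names are assumed from the AWS CodeBuild sample notifications.
    """
    if source != 'aws.codebuild':
        return None
    payload = json.loads(detail)
    return CodeBuildStatus(
        event_id=event_id,
        project_name=payload.get('project-name'),
        build_status=payload.get('build-status'),
    )


sample_detail = '{"build-status": "SUCCEEDED", "project-name": "my-sample-project"}'
print(parse_codebuild_event('example-id', 'aws.codebuild', sample_detail))
```

In the PyFlink job this could be called from a `map` on `codebuild_stream`; converting the result back into a Table would probably also need an explicit output type, which is not shown here.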


When I run this, I get an exception:

org.apache.flink.table.api.TableException: Unsupported conversion from data type 'TIMESTAMP(6) WITH TIME ZONE' (conversion class: java.time.OffsetDateTime) to type information. Only data types that originated from type information fully support a reverse conversion.

Somebody reported a similar error here (https://stackoverflow.com/questions/58936529/using-jdbctablesource-with-streamtableenvironment-gives-classcastexception). When I try the suggestion there and replace "TIMESTAMP(0) WITH LOCAL TIME ZONE" with "TIMESTAMP(3)", I get a different exception:

TypeError: The java type info: LocalDateTime is not supported in PyFlink currently.
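The two errors involve different SQL timestamp types. In Flink, `TIMESTAMP_LTZ` (`TIMESTAMP WITH LOCAL TIME ZONE`) denotes an absolute instant, while `TIMESTAMP(3)` is a wall-clock date-time with no zone, whose default Java conversion class is `java.time.LocalDateTime` (the type named in the `TypeError`). The plain-Python analogy below, using aware and naive `datetime` objects, sketches why the distinction matters for event time; it is not PyFlink code, and the +08:00 offset is an arbitrary example.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
UTC_PLUS_8 = timezone(timedelta(hours=8))  # arbitrary example offset


def epoch_millis(dt: datetime) -> int:
    """Milliseconds since the Unix epoch; defined only for zone-aware values."""
    if dt.tzinfo is None:
        raise ValueError('a naive datetime is not an instant until a zone is chosen')
    return (dt - EPOCH) // timedelta(milliseconds=1)


# Like TIMESTAMP_LTZ: one instant, however it is displayed.
instant = datetime(2017, 9, 1, 16, 14, 21, tzinfo=timezone.utc)
shown_in_plus_8 = instant.astimezone(UTC_PLUS_8)  # 2017-09-02 00:14:21+08:00
print(epoch_millis(instant) == epoch_millis(shown_in_plus_8))  # True

# Like TIMESTAMP(3): a wall-clock reading; the instant depends on the zone assumed.
wall_clock = datetime(2017, 9, 1, 16, 14, 21)
as_utc = epoch_millis(wall_clock.replace(tzinfo=timezone.utc))
as_plus_8 = epoch_millis(wall_clock.replace(tzinfo=UTC_PLUS_8))
print(as_utc - as_plus_8)  # 28800000, i.e. eight hours apart
```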

Is there a way of converting this Table into a DataStream (and then back again)? I need to use the data in the "time" field as the source of watermarks for my events.

Many thanks,

John

Re: How to convert Table containing TIMESTAMP_LTZ into DataStream in PyFlink 1.15.0?

Posted by John Tipper <jo...@hotmail.com>.
Hi Dian,

Thanks, much appreciated.

Kind regards,

John

On 27 Jun 2022, at 03:43, Dian Fu <di...@gmail.com> wrote:


Hi John,

This seems like a bug and I have created a ticket https://issues.apache.org/jira/browse/FLINK-28253 to track it.

For now, you could try replacing to_data_stream with to_append_stream to see if it works.

Regards,
Dian


Re: How to convert Table containing TIMESTAMP_LTZ into DataStream in PyFlink 1.15.0?

Posted by Dian Fu <di...@gmail.com>.
Hi John,

This seems like a bug and I have created a ticket
https://issues.apache.org/jira/browse/FLINK-28253 to track it.

For now, you could try replacing to_data_stream with to_append_stream to
see if it works.

Regards,
Dian
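If the to_append_stream route works, event time still has to be re-attached on the DataStream side, where a timestamp assigner returns epoch milliseconds. The helper below is a plain-Python sketch under two assumptions: the `time` field reaches Python as a `datetime.datetime`, and naive values should be read as UTC, matching the `Z` suffix of EventBridge `time` strings. `event_time_millis` is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def event_time_millis(row_time: datetime, assumed_zone: timezone = timezone.utc) -> int:
    """Convert a row's `time` value to epoch milliseconds.

    Assumption: the value is a Python datetime. Naive values (no tzinfo)
    are interpreted in `assumed_zone`, which defaults to UTC.
    """
    if row_time.tzinfo is None:
        row_time = row_time.replace(tzinfo=assumed_zone)
    # Floor division of two timedeltas yields an int number of milliseconds.
    return (row_time - _EPOCH) // timedelta(milliseconds=1)
```

In PyFlink 1.15 this could be returned from the `extract_timestamp(value, record_timestamp)` method of a `TimestampAssigner` subclass, combined with `WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(30))` via `with_timestamp_assigner`, and applied with `assign_timestamps_and_watermarks`, reproducing the 30-second bound from the DDL. Check these calls against the 1.15 API documentation before relying on them.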
