You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Jesse Lord <jl...@vectra.ai> on 2020/07/15 15:41:25 UTC

Pyflink sink rowtime field

I am trying to sink the rowtime field in pyflink 1.10. I get the following error

For the source schema I use

    .field("rowtime", DataTypes.TIMESTAMP(2))
        .rowtime(
            Rowtime()
            .timestamps_from_field("timestamp")
            .watermarks_periodic_ascending()
        )

To create the rowtime field and have tried variations on

    .field("rowtime", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())

In the sink schema.

Trying all of the different types in DataTypes I get essentially the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o56.insertInto.
: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink `default_catalog`.`default_database`.`output` do not match.
Query result schema: [rowtime: LocalDateTime]
TableSink schema:    [rowtime: Timestamp]


I know that in Java there is org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE_TIME and the python documentation lists Types.SQL_TIMESTAMP, but I cannot find the corresponding type in the python library. Can anyone help point me to the correct type for the schema?

Thanks,
Jesse






Re: Pyflink sink rowtime field

Posted by Xingbo Huang <hx...@gmail.com>.
Hi Jesse,
I think that the type of rowtime you declared on the source schema is
DataTypes.Timestamp(), you also use DataTypes.Timestamp() on the sink schema

Best,
Xingbo

Jesse Lord <jl...@vectra.ai> 于2020年7月15日周三 下午11:41写道:

> I am trying to sink the rowtime field in pyflink 1.10. I get the following
> error
>
>
>
> For the source schema I use
>
>
>
>     .field("rowtime", DataTypes.TIMESTAMP(2))
>
>         .rowtime(
>
>             Rowtime()
>
>             .timestamps_from_field("timestamp")
>
>             .watermarks_periodic_ascending()
>
>         )
>
>
>
> To create the rowtime field and have tried variations on
>
>
>
>     .field("rowtime", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
>
>
>
> In the sink schema.
>
>
>
> Trying all of the different types in DataTypes I get essentially the
> following error:
>
>
>
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o56.insertInto.
>
> : org.apache.flink.table.api.ValidationException: Field types of query
> result and registered TableSink
> `default_catalog`.`default_database`.`output` do not match.
>
> Query result schema: [rowtime: LocalDateTime]
>
> TableSink schema:    [rowtime: Timestamp]
>
>
>
>
>
> I know that in Java there is
> org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE_TIME and the python
> documentation lists Types.SQL_TIMESTAMP, but I cannot find the
> corresponding type in the python library. Can anyone help point me to the
> correct type for the schema?
>
>
>
> Thanks,
>
> Jesse
>
>
>
>
>
>
>
>
>
>
>