Posted to user@flink.apache.org by Jesse Lord <jl...@vectra.ai> on 2020/07/15 15:41:25 UTC
Pyflink sink rowtime field
I am trying to write the rowtime field to a sink in PyFlink 1.10, but I get the error shown further down.
In the source schema I create the rowtime field with
    .field("rowtime", DataTypes.TIMESTAMP(2))
    .rowtime(
        Rowtime()
        .timestamps_from_field("timestamp")
        .watermarks_periodic_ascending()
    )
and in the sink schema I have tried variations on
    .field("rowtime", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
Trying all of the different types in DataTypes I get essentially the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o56.insertInto.
: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink `default_catalog`.`default_database`.`output` do not match.
Query result schema: [rowtime: LocalDateTime]
TableSink schema: [rowtime: Timestamp]
I know that in Java there is org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE_TIME, and the Python documentation lists Types.SQL_TIMESTAMP, but I cannot find the corresponding type in the Python library. Can anyone point me to the correct type for the schema?
Thanks,
Jesse
Re: Pyflink sink rowtime field
Posted by Xingbo Huang <hx...@gmail.com>.
Hi Jesse,
I think the type of the rowtime field you declared in the source schema is
DataTypes.TIMESTAMP(), so you should also use DataTypes.TIMESTAMP() in the sink schema.
Best,
Xingbo
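
The ValidationException quoted in the question reports that the query result and the sink disagree on the type of the rowtime field. As a rough illustration of that kind of check, and of why declaring the same type on both sides makes it pass, here is a minimal pure-Python sketch. It is not Flink's validation code: the helper find_mismatches, the FieldList alias and the positional, type-only comparison are assumptions made for this example, and the type names are simply copied from the error message.

```python
from typing import List, Tuple

# A schema here is just an ordered list of (field name, type name) pairs.
# FieldList and find_mismatches are hypothetical names for this sketch only.
FieldList = List[Tuple[str, str]]


def find_mismatches(query_schema: FieldList, sink_schema: FieldList) -> List[str]:
    """Return human-readable differences between two schemas.

    Illustration only: fields are compared by position and by type name,
    which is an assumption, not a description of Flink's implementation.
    """
    problems = []
    if len(query_schema) != len(sink_schema):
        problems.append(
            "field count differs: %d vs %d" % (len(query_schema), len(sink_schema))
        )
    for (q_name, q_type), (_s_name, s_type) in zip(query_schema, sink_schema):
        if q_type != s_type:
            problems.append(
                "%s: query result has %s, sink expects %s" % (q_name, q_type, s_type)
            )
    return problems


# Type names copied from the ValidationException in the question.
query_result = [("rowtime", "LocalDateTime")]
sink_as_declared = [("rowtime", "Timestamp")]
print(find_mismatches(query_result, sink_as_declared))

# With the same type on both sides the list of problems is empty.
sink_matching = [("rowtime", "LocalDateTime")]
print(find_mismatches(query_result, sink_matching))
```

The sketch only helps with reading the error message. Which DataTypes call in PyFlink 1.10 produces a matching type for a particular sink is not something it can answer.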