Posted to user-zh@flink.apache.org by "lucas.wu" <lu...@xiaoying.com> on 2020/03/20 03:39:43 UTC

Type serialization problem with rowtime

Hi all:
Table creation statement:
create table `source_table`(
`SeqNo` varchar,
`Type` varchar,
`Table` varchar,
`ServerId` varchar,
`Database` varchar,
`OldData` varchar,
`GTID` varchar,
`Offset` varchar,
`event_ts` as to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'yyyy-MM-dd HH:mm:ss'),
WATERMARK FOR event_ts AS event_ts - interval '60' second
) with(…)


Query:
insert into sinkTable select * from source_table;



Error message:
java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
    at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:639)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at SinkConversion$51.processElement(Unknown Source)
……


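In the end I looked at the code and found that, for rowtime, BaseRowTypeInfo uses SqlTimestampSerializer while RowTypeInfo uses LongSerializer. The upstream and downstream operators therefore use different serializers: when the upstream uses SqlTimestampSerializer and the downstream uses LongSerializer, the error above is thrown.
Can this problem be avoided?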
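
To make the mismatch concrete, here is a minimal Java sketch that does not depend on Flink. The FieldCopier interface and the two copiers are hypothetical stand-ins, not Flink's TypeSerializer API. They only mimic the behaviour suggested by the stack trace: a Long-based copy step receives a field that actually holds a java.sql.Timestamp and fails on the cast.

```java
import java.sql.Timestamp;

// Hypothetical stand-in for a per-field serializer; NOT Flink's TypeSerializer API.
interface FieldCopier {
    Object copy(Object value);
}

class SerializerMismatchDemo {
    // Assumption: mimics a Long-based copy step, which expects the field to be a Long.
    static final FieldCopier LONG_COPIER = value -> {
        long asLong = (Long) value; // throws ClassCastException for a Timestamp
        return asLong;
    };

    // Assumption: mimics a Timestamp-based copy step, which expects a java.sql.Timestamp.
    static final FieldCopier TIMESTAMP_COPIER = value -> {
        Timestamp ts = (Timestamp) value;
        Timestamp copy = new Timestamp(ts.getTime());
        copy.setNanos(ts.getNanos());
        return copy;
    };

    // Runs the copier and reports whether the value had the type the copier expects.
    static String tryCopy(FieldCopier copier, Object value) {
        try {
            copier.copy(value);
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // The upstream operator emits the rowtime field as a Timestamp.
        Object rowtime = new Timestamp(1584675583000L);
        System.out.println("timestamp copier: " + tryCopy(TIMESTAMP_COPIER, rowtime));
        System.out.println("long copier:      " + tryCopy(LONG_COPIER, rowtime));
    }
}
```

The sketch only shows why the two sides must agree on the runtime class of the rowtime field. It does not say which side Flink should change.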