You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 往事Ψ随风 <13...@qq.com> on 2020/01/16 09:55:49 UTC

关于1.9版本的flinksql中的csv format的问题

大佬们好,
有问题想请教:
1、使用1.9版本的flinksql时,如果以kafka作为数据源,那么当中的Csv format部分(标青色的部分)该如何写才能正确匹配上Schema中的字段?【因为大部分资料中都是以json format比较多,所以没法参考】

tableEnv.connect(


new&nbsp;Kafka()


.version("0.10")


.topic("topic1")


.property("bootstrap.servers",&nbsp;"node1:9092")


.property("group.id","test")


.startFromLatest()


)

.withFormat(


new&nbsp;Csv()


.schema(Types.ROW(。。。。。))&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;

#我填“.schema(Types.ROW(Types.SQL_TIMESTAMP, Types.STRING, Types.INT)) ”会报错“Caused&nbsp;by:&nbsp;org.apache.flink.table.api.ValidationException:&nbsp;Could&nbsp;not&nbsp;map&nbsp;the&nbsp;schema&nbsp;field&nbsp;'fruit'&nbsp;to&nbsp;a&nbsp;field&nbsp;from&nbsp;source.&nbsp;Please&nbsp;specify&nbsp;the&nbsp;source&nbsp;field&nbsp;from&nbsp;which&nbsp;it&nbsp;can&nbsp;be&nbsp;derived.”





.fieldDelimiter(',')


.lineDelimiter("\r\n")


)

.withSchema(


new&nbsp;Schema()


.field("rowtime",Types.SQL_TIMESTAMP)


.rowtime(new&nbsp;Rowtime()


.timestampsFromField("eventtime")


.watermarksPeriodicBounded(0)


)


.field("fruit",&nbsp;Types.STRING)


.field("number",&nbsp;Types.INT)


)

.inAppendMode()

.registerTableSource("sourceTable")




期待大佬的指导!谢谢!