Posted to user-zh@flink.apache.org by kylin <ky...@163.com> on 2020/09/17 02:00:56 UTC
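Cannot set a column inside nested JSON as rowtime in the Flink Table API
Flink version: 1.7.2
I am reading JSON data from Kafka with the Flink Table API; the JSON schema is shown below.
I found that rowtime cannot be specified from a field inside the nested JSON. Could you please confirm whether rowtime can only be specified from a top-level field?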
// Types here is the Java org.apache.flink.api.common.typeinfo.Types (it provides ROW_NAMED)
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.table.descriptors.{Json, Kafka, Schema}

tableEnv.connect(
  new Kafka()
    .version("0.10")
    .topic(topic_in)
    .property("bootstrap.servers", brokers)
    .property("group.id", "TableApiT2")
    .startFromLatest()
).withFormat(
  new Json()
    .jsonSchema(
      """
        |{
        |  "type": 'object',
        |  "properties": {
        |    "metric": {
        |      "type": 'object',
        |      "properties": {
        |        "time_stamp": {
        |          "type": 'string',
        |          "format": 'date-time'
        |        },
        |        "event_time": {
        |          "type": 'string'
        |        },
        |        "cluster": {
        |          "type": 'string'
        |        },
        |        "host": {
        |          "type": 'string'
        |        },
        |        "instance": {
        |          "type": 'string'
        |        },
        |        "index_name": {
        |          "type": 'string'
        |        },
        |        "index_num": {
        |          "type": 'string'
        |        },
        |        "value": {
        |          "type": 'number'
        |        }
        |      }
        |    },
        |    "source": {
        |      "type": 'string'
        |    }
        |  }
        |}
        |""".stripMargin
    )
).withSchema(
  new Schema()
    .field("metric",
      Types.ROW_NAMED(
        Array("time_stamp", "event_time", "cluster", "host", "instance", "index_name", "index_num", "value"),
        Types.SQL_TIMESTAMP,
        Types.STRING,
        Types.STRING,
        Types.STRING,
        Types.STRING,
        Types.STRING,
        Types.STRING,
        Types.BIG_DEC
      )
    ) // *** How can time_stamp inside the row type above be declared as rowtime?
).inAppendMode()
  .registerTableSource("metricTable")
Re: Cannot set a column inside nested JSON as rowtime in the Flink Table API
Posted by Jark Wu <im...@gmail.com>.
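1. Only a top-level field can be specified as rowtime. To use a nested field as rowtime, first use a computed column (supported only in DDL) to produce a top-level column.
2. The Descriptor API has many problems and is missing many features, so it is not recommended; please use DDL instead. The Descriptor API will be refactored in version 1.12.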
Best,
Jark
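
[Editor's sketch, not part of Jark's reply] The computed-column approach from point 1 might look roughly like the following. This is a minimal sketch under stated assumptions, not a tested setup: it assumes Flink 1.11 or later (the original poster's 1.7.2 has no computed columns or WATERMARK clause in DDL), the universal 'kafka' connector and 'json' format on the classpath, and JSON timestamps in a form the format can parse as TIMESTAMP(3). The topic name, broker address, the computed column name `ts`, and the 5-second watermark delay are all hypothetical placeholders.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object NestedRowtimeSketch {
  // DDL: lift the nested metric.time_stamp into a top-level computed column
  // and declare the watermark on that column.
  // Field names are backtick-quoted because some (e.g. value) are SQL keywords.
  val ddl: String =
    """
      |CREATE TABLE metricTable (
      |  `metric` ROW<
      |    `time_stamp` TIMESTAMP(3),
      |    `event_time` STRING,
      |    `cluster`    STRING,
      |    `host`       STRING,
      |    `instance`   STRING,
      |    `index_name` STRING,
      |    `index_num`  STRING,
      |    `value`      DECIMAL(38, 18)>,
      |  `source` STRING,
      |  -- computed column: a top-level copy of the nested timestamp (name is hypothetical)
      |  `ts` AS `metric`.`time_stamp`,
      |  -- rowtime is declared on the top-level column; the delay is an assumed value
      |  WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND
      |) WITH (
      |  'connector' = 'kafka',
      |  'topic' = 'metrics_in',
      |  'properties.bootstrap.servers' = 'localhost:9092',
      |  'properties.group.id' = 'TableApiT2',
      |  'scan.startup.mode' = 'latest-offset',
      |  'format' = 'json'
      |)
      |""".stripMargin

  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tableEnv = TableEnvironment.create(settings)
    // Registers the table in the catalog; no job is started by the DDL itself.
    tableEnv.executeSql(ddl)
  }
}
```

With the table registered this way, `ts` should be usable as the event-time attribute in windowed queries, while the other nested fields remain reachable as `metric.host`, `metric.index_name`, and so on.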