Posted to user-zh@flink.apache.org by kylin <ky...@163.com> on 2020/09/17 02:00:56 UTC

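Flink Table API: cannot set a column inside a nested JSON object as rowtime

Flink version: 1.7.2

Reading JSON data from Kafka with the Flink Table API (the JsonSchema is shown below), I found that rowtime cannot be specified from a field inside the nested JSON object. Could someone please confirm whether rowtime can only be specified from a top-level field?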
tableEnv.connect(
  new Kafka()
    .version("0.10")
    .topic(topic_in)
    .property("bootstrap.servers", brokers)
    .property("group.id", "TableApiT2")
    .startFromLatest()
).withFormat(
  new Json()
    .jsonSchema(
      """
        |{
        |  "type": "object",
        |  "properties": {
        |    "metric": {
        |      "type": "object",
        |      "properties": {
        |        "time_stamp": {
        |          "type": "string",
        |          "format": "date-time"
        |        },
        |        "event_time": {
        |          "type": "string"
        |        },
        |        "cluster": {
        |          "type": "string"
        |        },
        |        "host": {
        |          "type": "string"
        |        },
        |        "instance": {
        |          "type": "string"
        |        },
        |        "index_name": {
        |          "type": "string"
        |        },
        |        "index_num": {
        |          "type": "string"
        |        },
        |        "value": {
        |          "type": "number"
        |        }
        |      }
        |    },
        |    "source": {
        |      "type": "string"
        |    }
        |  }
        |}
        |""".stripMargin
    )
).withSchema(
  new Schema()
    .field("metric",
      Types.ROW_NAMED(
        Array("time_stamp", "event_time", "cluster", "host", "instance", "index_name", "index_num", "value"),
        Types.SQL_TIMESTAMP,
        Types.STRING,
        Types.STRING,
        Types.STRING,
        Types.STRING,
        Types.STRING,
        Types.STRING,
        Types.BIG_DEC
      )
    ) // *** How can time_stamp in the ROW type above be declared as the rowtime?
).inAppendMode()
  .registerTableSource("metricTable")



Re: Flink Table API: cannot set a column inside a nested JSON object as rowtime

Posted by Jark Wu <im...@gmail.com>.
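1. Only a top-level field can be specified as rowtime. To use a nested field as rowtime, first use a computed column (supported only in DDL) to generate a top-level column from it.
2. The Descriptor API has many problems and lacks many features, so it is not recommended; use DDL instead. The Descriptor API will be refactored in version 1.12.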
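The computed-column approach from point 1 might look roughly like the DDL below. This is a minimal sketch. It assumes a Flink version with computed-column and WATERMARK support in DDL (1.10 or later) and uses option keys in the 1.11 style (`'connector' = 'kafka-0.10'`, `scan.startup.mode`). The object name `MetricTableDdl`, the method `createTableSql`, the 5-second watermark delay, and the `DECIMAL(38, 18)` type for `value` are illustrative assumptions and do not come from the thread. The sketch also assumes the `time_stamp` strings are in a format the JSON format can parse as `TIMESTAMP(3)`; the JSON format's timestamp options may need adjusting. Identifiers are backtick-quoted because `value` is a reserved word in Flink SQL.

```scala
// Sketch only: builds a Flink SQL DDL string that lifts the nested
// metric.time_stamp field into a top-level column and declares the rowtime on it.
object MetricTableDdl {

  /** Returns the CREATE TABLE statement; topic and brokers come from the caller. */
  def createTableSql(topic: String, brokers: String): String =
    s"""CREATE TABLE metricTable (
       |  `metric` ROW<
       |    `time_stamp` TIMESTAMP(3),
       |    `event_time` STRING,
       |    `cluster` STRING,
       |    `host` STRING,
       |    `instance` STRING,
       |    `index_name` STRING,
       |    `index_num` STRING,
       |    `value` DECIMAL(38, 18)
       |  >,
       |  `source` STRING,
       |  -- computed column: copies the nested field into a top-level column
       |  `ts` AS `metric`.`time_stamp`,
       |  -- the watermark on the top-level column makes `ts` the rowtime attribute
       |  WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND
       |) WITH (
       |  'connector' = 'kafka-0.10',
       |  'topic' = '$topic',
       |  'properties.bootstrap.servers' = '$brokers',
       |  'properties.group.id' = 'TableApiT2',
       |  'scan.startup.mode' = 'latest-offset',
       |  'format' = 'json'
       |)""".stripMargin

  def main(args: Array[String]): Unit = {
    // With a Flink 1.11+ table environment in scope, the statement would be run as
    //   tableEnv.executeSql(createTableSql(topicIn, brokers))
    // Here it is only printed, so this file runs without Flink on the classpath.
    println(createTableSql("metrics_in", "localhost:9092"))
  }
}
```

Once the table is registered this way, `ts` is a top-level rowtime attribute and can be used in event-time windows. The nested `metric.time_stamp` field stays available as ordinary data.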


Best,
Jark


On Thu, 17 Sep 2020 at 10:41, kylin <ky...@163.com> wrote:

> Flink version: 1.7.2
>
> Reading JSON data from Kafka with the Flink Table API (the JsonSchema is shown below), I found that rowtime cannot be specified from a field inside the nested JSON object. Could someone please confirm whether rowtime can only be specified from a top-level field?