Posted to user-zh@flink.apache.org by hb <34...@163.com> on 2019/09/03 02:44:07 UTC

Flink SQL time issues

Using the Kafka connectorDescriptor to read JSON-formatted data from Kafka and create a Table.


```
  ...


  schema.field("_proctime", Types.SQL_TIMESTAMP()).proctime()


  schema
    .field("_rowtime", Types.SQL_TIMESTAMP())
    .rowtime(
      new Rowtime()
        .timestampsFromField("eventTime")
        .watermarksPeriodicBounded(1000)
    )
```


Question 1. The generated processing-time field _proctime is displayed in the UTC time zone; how can it be adjusted to the +8 time zone?
Question 2. How can the event-time field eventTime support the Long type?


The record I write to Kafka is {"eventTime": 100000, "id":1,"name":"hb"}, and it complains about the type of the eventTime field.

Re: Re: Flink SQL time issues

Posted by hb <34...@163.com>.
Kafka input:  {"eventTime": 100000, "id":1,"name":"hb"}
  
Error message:
```
Caused by: java.lang.Exception: java.io.IOException: Failed to deserialize JSON object.
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to deserialize JSON object.
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: java.time.format.DateTimeParseException: Text '100000' could not be parsed at index 0
at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127)
... 7 more
```


Kafka input: {"eventTime": "2019-09-02T09:56:16.484Z", "id":1,"name":"hb"}
The result is then displayed correctly:    4> (true,1,hb,2019-09-04T06:45:46.846,2019-09-02T09:56:16.484)
This seems odd to me.


```
  // imports (Flink 1.9 Scala API) needed by the snippet below
  import org.apache.flink.streaming.api.TimeCharacteristic
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.table.api.{EnvironmentSettings, Types}
  import org.apache.flink.table.api.scala._
  import org.apache.flink.table.descriptors.{Json, Kafka, Rowtime, Schema}
  import org.apache.flink.types.Row

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val conf = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val tEnv = StreamTableEnvironment.create(env, conf)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)


  val kafkaIn = new Kafka()
    .version("0.11")
    .topic("hbtest111")
    .property("bootstrap.servers", "192.168.1.160:19092")
    .property("group.id", "test2")


  val json = new Json().deriveSchema()


  val schema = new Schema()
    .field("id", Types.INT())
    .field("name", Types.STRING())


  schema.field("_proctime", Types.SQL_TIMESTAMP()).proctime()
  schema
    .field("_rowtime", Types.SQL_TIMESTAMP())
    .rowtime(
      new Rowtime()
        .timestampsFromField("eventTime")
        .watermarksPeriodicBounded(1000)
    )


  tEnv.connect(kafkaIn).withFormat(json).withSchema(schema).inAppendMode().registerTableSource("table_from_kafka")
  val t = tEnv.sqlQuery("select * from table_from_kafka")
  t.printSchema()


  t.toRetractStream[Row].print()
  tEnv.execute("")
```










On 2019-09-03 21:52:54, "Jimmy Wong" <wa...@163.com> wrote:
>Hi:
>You can convert the time to a Long. As for UTC, since you want to produce a Table, if you are using SQL you can use a UDF to do the conversion.
>
>
>Jimmy
>wangzmking@163.com
>
>
>On 2019-09-03 21:25, JingsongLee<lz...@aliyun.com.INVALID> wrote:
>Hi:
>1. Yes, at the moment it can only be UTC; if your calculations require it, you could consider shifting the window times of your business logic instead.
>2. Long is supported. Is it that your input is an int and that is why it errors? What is the exact error message?
>
>Best,
>Jingsong Lee
>
>
>------------------------------------------------------------------
>From:hb <34...@163.com>
>Send Time: 2019-09-03 (Tuesday) 10:44
>To:user-zh <us...@flink.apache.org>
>Subject: Flink SQL time issues
>
>Using the Kafka connectorDescriptor to read JSON-formatted data from Kafka and create a Table.
>
>
>```
>...
>
>
>schema.field("_proctime", Types.SQL_TIMESTAMP()).proctime()
>
>
>schema
>.field("_rowtime", Types.SQL_TIMESTAMP())
>.rowtime(
>new Rowtime()
>.timestampsFromField("eventTime")
>.watermarksPeriodicBounded(1000)
>)
>```
>
>
>Question 1. The generated processing-time field _proctime is displayed in the UTC time zone; how can it be adjusted to the +8 time zone?
>Question 2. How can the event-time field eventTime support the Long type?
>
>
>The record I write to Kafka is {"eventTime": 100000, "id":1,"name":"hb"}, and it complains about the type of the eventTime field.

Re: Flink SQL time issues

Posted by Jimmy Wong <wa...@163.com>.
Hi:
You can convert the time to a Long. As for UTC, since you want to produce a Table, if you are using SQL you can use a UDF to do the conversion.
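
As a rough, untested sketch of such a UDF (the class and registered name below are made up for illustration, not from Flink): shift the UTC value by eight hours before it is displayed.

```
import java.sql.Timestamp
import org.apache.flink.table.functions.ScalarFunction

// Illustrative only: shift a UTC timestamp by +8 hours for display purposes.
class ToEast8 extends ScalarFunction {
  def eval(ts: Timestamp): Timestamp =
    if (ts == null) null else new Timestamp(ts.getTime + 8 * 60 * 60 * 1000L)
}

// Hypothetical registration and use:
// tEnv.registerFunction("to_east8", new ToEast8)
// tEnv.sqlQuery("select id, name, to_east8(_proctime) as proctime_east8 from table_from_kafka")
```

Note this only changes what is shown in the result; the underlying processing-time attribute itself stays in UTC.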


Jimmy
wangzmking@163.com


On 2019-09-03 21:25, JingsongLee<lz...@aliyun.com.INVALID> wrote:
Hi:
1. Yes, at the moment it can only be UTC; if your calculations require it, you could consider shifting the window times of your business logic instead.
2. Long is supported. Is it that your input is an int and that is why it errors? What is the exact error message?

Best,
Jingsong Lee


------------------------------------------------------------------
From:hb <34...@163.com>
Send Time: 2019-09-03 (Tuesday) 10:44
To:user-zh <us...@flink.apache.org>
Subject: Flink SQL time issues

Using the Kafka connectorDescriptor to read JSON-formatted data from Kafka and create a Table.


```
...


schema.field("_proctime", Types.SQL_TIMESTAMP()).proctime()


schema
.field("_rowtime", Types.SQL_TIMESTAMP())
.rowtime(
new Rowtime()
.timestampsFromField("eventTime")
.watermarksPeriodicBounded(1000)
)
```


Question 1. The generated processing-time field _proctime is displayed in the UTC time zone; how can it be adjusted to the +8 time zone?
Question 2. How can the event-time field eventTime support the Long type?


The record I write to Kafka is {"eventTime": 100000, "id":1,"name":"hb"}, and it complains about the type of the eventTime field.

Re: Flink SQL time issues

Posted by JingsongLee <lz...@aliyun.com.INVALID>.
Hi:
1. Yes, at the moment it can only be UTC; if your calculations require it, you could consider shifting the window times of your business logic instead.
2. Long is supported. Is it that your input is an int and that is why it errors? What is the exact error message? One alternative way to handle a Long field is sketched below.
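
If the descriptor route keeps failing on the Long field, a rough, untested sketch of an alternative (my own illustration; it assumes Flink 1.9 with the Kafka 0.11 connector and jackson-databind on the classpath, and reuses the topic/server names from your earlier mail) is to read the topic as a DataStream, take the Long eventTime as the record timestamp, and declare the rowtime attribute when converting to a Table:

```
import java.util.Properties
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.table.api.scala._

case class Event(id: Int, name: String, eventTime: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = StreamTableEnvironment.create(env)

val props = new Properties()
props.setProperty("bootstrap.servers", "192.168.1.160:19092")
props.setProperty("group.id", "test2")

val events: DataStream[Event] = env
  .addSource(new FlinkKafkaConsumer011[String]("hbtest111", new SimpleStringSchema(), props))
  .map { s =>
    // parse {"eventTime": ..., "id": ..., "name": ...} by hand, so eventTime stays a Long
    val node = new ObjectMapper().readTree(s)
    Event(node.get("id").asInt(), node.get("name").asText(), node.get("eventTime").asLong())
  }
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(1)) {
      override def extractTimestamp(e: Event): Long = e.eventTime // epoch milliseconds
    })

// 'eventTime.rowtime turns the assigned timestamp into an event-time attribute of the Table.
val table = tEnv.fromDataStream(events, 'id, 'name, 'eventTime.rowtime)
table.printSchema()
```

This bypasses the JSON format's timestamp parsing entirely, since eventTime is read as a plain Long and only becomes a rowtime attribute at the Table boundary.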

Best,
Jingsong Lee


------------------------------------------------------------------
From:hb <34...@163.com>
Send Time: 2019-09-03 (Tuesday) 10:44
To:user-zh <us...@flink.apache.org>
Subject: Flink SQL time issues

Using the Kafka connectorDescriptor to read JSON-formatted data from Kafka and create a Table.


```
  ...


  schema.field("_proctime", Types.SQL_TIMESTAMP()).proctime()


  schema
    .field("_rowtime", Types.SQL_TIMESTAMP())
    .rowtime(
      new Rowtime()
        .timestampsFromField("eventTime")
        .watermarksPeriodicBounded(1000)
    )
```


Question 1. The generated processing-time field _proctime is displayed in the UTC time zone; how can it be adjusted to the +8 time zone?
Question 2. How can the event-time field eventTime support the Long type?


The record I write to Kafka is {"eventTime": 100000, "id":1,"name":"hb"}, and it complains about the type of the eventTime field.