You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 陈帅 <ca...@gmail.com> on 2020/11/25 04:03:18 UTC

flink sql时间戳字段类型转换问题

数据源来自Jark项目 https://github.com/wuchong/flink-sql-submit
 中的kafka消息,里面user_behavior消息例如
{"user_id": "470572", "item_id":"3760258", "category_id": "1299190",
"behavior": "pv", "ts": "2017-11-26T01:00:01Z"}
可以看到ts值是  '2017-11-26T01:00:00Z',现在要为它定义一张flink sql源表,如下

CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'json',
    -- 'json.timestamp-format.standard' = 'ISO-8601', // 不加这一行默认是'SQL'
    'scan.startup.mode' = 'earliest-offset'
);

程序运行会抛错
Caused by: java.time.format.DateTimeParseException: Text
'2017-11-26T01:00:00Z' could not be parsed at index 10

我查了一下flink json官方文档
https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/table/connectors/formats/json.html#json-timestamp-format-standard
目前只支持两种格式:SQL 和 ISO-8601
其中SQL支持的格式是 'yyyy-MM-dd HH:mm:ss',
而ISO-8601支持的格式是 'yyyy-MM-ddTHH:mm:ss.s{precision}'
确实不支持上面的 'yyyy-MM-ddTHH:mm:ssZ' (注意末尾的Z)

请问:
1. 像上述时间格式字段在Flink SQL中应该解析成什么类型?
2. 如果不能直接支持的话是不是得先用VARCHAR类型接收,再利用 UNIX_TIMESTAMP(ts_string,
pattern_string) 函数转成 支持的时间格式?可问题是 pattern_string
里面如果包含单引号要如何转义?UNIX_TIMESTAMP('2017-11-26T01:00:00Z',
'yyyy-MM-dd'T'HH:mm:ss'Z'')?
3. TIMESTAMP WITH TIME ZONE和TIMESTAMP WITH LOCAL TIME
ZONE这两种类型在什么情况下会用到?有例子吗?

谢谢!

Re: flink sql时间戳字段类型转换问题

Posted by Jark Wu <im...@gmail.com>.
你可以用这篇文章中的 docker:
https://flink.apache.org/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html
https://raw.githubusercontent.com/wuchong/flink-sql-demo/v1.11-EN/docker-compose.yml

这个容器里面的 ts 数据格式是 SQL 格式的。

1. 像上述时间格式字段在Flink SQL中应该解析成什么类型?
TIMESTAMP WITH LOCAL TIME ZONE, 1.12 的 json formart 才支持。

2. 是的

3. Flink 目前还不支持 TIMESTAMP WITH TIME ZONE。
'yyyy-MM-dd HH:mm:ss' 这种,对应的是 TIMESTAMP,代表无时区 timestamp
long 值,或者  'yyyy-MM-dd HH:mm:ssZ' 这种是TIMESTAMP WITH LOCAL TIME ZONE
,代表session 时区的 timestamp

Best,
Jark



On Wed, 25 Nov 2020 at 12:03, 陈帅 <ca...@gmail.com> wrote:

> 数据源来自Jark项目 https://github.com/wuchong/flink-sql-submit
>  中的kafka消息,里面user_behavior消息例如
> {"user_id": "470572", "item_id":"3760258", "category_id": "1299190",
> "behavior": "pv", "ts": "2017-11-26T01:00:01Z"}
> 可以看到ts值是  '2017-11-26T01:00:00Z',现在要为它定义一张flink sql源表,如下
>
> CREATE TABLE user_log (
>     user_id VARCHAR,
>     item_id VARCHAR,
>     category_id VARCHAR,
>     behavior VARCHAR,
>     ts TIMESTAMP(3),
>     WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'user_behavior',
>     'properties.bootstrap.servers' = 'localhost:9092',
>     'properties.group.id' = 'testGroup',
>     'format' = 'json',
>     -- 'json.timestamp-format.standard' = 'ISO-8601', // 不加这一行默认是'SQL'
>     'scan.startup.mode' = 'earliest-offset'
> );
>
> 程序运行会抛错
> Caused by: java.time.format.DateTimeParseException: Text
> '2017-11-26T01:00:00Z' could not be parsed at index 10
>
> 我查了一下flink json官方文档
>
> https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/table/connectors/formats/json.html#json-timestamp-format-standard
> 目前只支持两种格式:SQL 和 ISO-8601
> 其中SQL支持的格式是 'yyyy-MM-dd HH:mm:ss',
> 而ISO-8601支持的格式是 'yyyy-MM-ddTHH:mm:ss.s{precision}'
> 确实不支持上面的 'yyyy-MM-ddTHH:mm:ssZ' (注意末尾的Z)
>
> 请问:
> 1. 像上述时间格式字段在Flink SQL中应该解析成什么类型?
> 2. 如果不能直接支持的话是不是得先用VARCHAR类型接收,再利用 UNIX_TIMESTAMP(ts_string,
> pattern_string) 函数转成 支持的时间格式?可问题是 pattern_string
> 里面如果包含单引号要如何转义?UNIX_TIMESTAMP('2017-11-26T01:00:00Z',
> 'yyyy-MM-dd'T'HH:mm:ss'Z'')?
> 3. TIMESTAMP WITH TIME ZONE和TIMESTAMP WITH LOCAL TIME
> ZONE这两种类型在什么情况下会用到?有例子吗?
>
> 谢谢!
>