Posted to user-zh@flink.apache.org by renzhaojin <re...@100.me> on 2022/05/13 13:52:51 UTC

Re: Event time is wrong after converting DataStream to Table

Hello, try setting the time zone and see whether that helps:
Configuration configuration = tableEnvironment.getConfig().getConfiguration();

configuration.setString("table.local-time-zone", "Asia/Shanghai");


On 2022-05-13 21:40, 赢峰 <si...@163.com> wrote:


Why is the event time wrong after converting a DataStream to a Table? Is it a time zone problem? How can I fix it?

```
DataStream<UserBehavior> sourceStream = env.fromElements(
        new UserBehavior(1001L, 3827899L, 2920476L, "pv", 1511713473000L, "2017-11-27 00:24:33"),
        new UserBehavior(1001L, 3745169L, 2891509L, "pv", 1511725471000L, "2017-11-27 03:44:31"),
        new UserBehavior(1001L, 1531036L, 2920476L, "pv", 1511733732000L, "2017-11-27 06:02:12"),
        new UserBehavior(1001L, 2266567L, 4145813L, "pv", 1511741471000L, "2017-11-27 08:11:11"),
        new UserBehavior(1001L, 2951368L, 1080785L, "pv", 1511750828000L, "2017-11-27 10:47:08"),
        new UserBehavior(1002L, 5002615L, 2520377L, "pv", 1511752985000L, "2017-11-27 11:23:05")
);

// Extract timestamps and assign watermarks
DataStream<UserBehavior> behaviorStream = sourceStream.assignTimestampsAndWatermarks(
        WatermarkStrategy.<UserBehavior>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
                    @Override
                    public long extractTimestamp(UserBehavior behavior, long recordTimestamp) {
                        return behavior.getTs();
                    }
                })
);

// Register a temporary view
Table behaviorTable = tEnv.fromDataStream(behaviorStream, $("uid"), $("ts"), $("ts_row").rowtime());
tEnv.createTemporaryView("user_behavior", behaviorTable);

Table resultTable = tEnv.sqlQuery("SELECT uid, ts, ts_row FROM user_behavior");
DataStream<Row> resultStream = tEnv.toAppendStream(resultTable, Row.class);
resultStream.print();
```

Output:

```
3> +I[1001, 1511725471000, 2017-11-26T19:44:31]
4> +I[1001, 1511733732000, 2017-11-26T22:02:12]
3> +I[1002, 1511752985000, 2017-11-27T03:23:05]
2> +I[1001, 1511713473000, 2017-11-26T16:24:33]
2> +I[1001, 1511750828000, 2017-11-27T02:47:08]
1> +I[1001, 1511741471000, 2017-11-27T00:11:11]
```

1511725471000 -> 2017-11-27 03:44:31, so it looks like a time zone problem.

Re: Event time is wrong after converting DataStream to Table

Posted by 我期待~ <86...@qq.com.INVALID>.
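I changed ts_row in the UserBehavior class to type LocalDateTime and changed the stream-to-table conversion as shown below. With that, the yyyy-MM-dd HH:mm:ss time is output unchanged.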


Table behaviorTable = tEnv.fromDataStream(behaviorStream, Schema.newBuilder()
                .column("ts_row", "TIMESTAMP(3)")
                .column("uid", "BIGINT")
                .column("ts", "BIGINT")
                .watermark("ts_row", "ts_row - INTERVAL '10' SECOND")
                .build());
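
The thread does not show how UserBehavior fills ts_row once it is a LocalDateTime. A minimal sketch, assuming the field is parsed from the yyyy-MM-dd HH:mm:ss string already passed to the constructor (the class and method names here are hypothetical, plain java.time, no Flink involved). It illustrates why the value comes out as written: a LocalDateTime carries no zone, so nothing shifts it.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of the original UserBehavior class.
class LocalDateTimeParseDemo {
    // Same pattern as the string field in the question.
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Parse the wall-clock string into a zone-less LocalDateTime.
    static LocalDateTime parse(String text) {
        return LocalDateTime.parse(text, FMT);
    }

    public static void main(String[] args) {
        LocalDateTime tsRow = parse("2017-11-27 03:44:31");
        System.out.println(tsRow);             // 2017-11-27T03:44:31
        System.out.println(tsRow.format(FMT)); // 2017-11-27 03:44:31
    }
}
```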



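The official documentation has a note on stream-to-table conversion: when converting from DataStream to Table, because DataStream has no notion of time zone, Flink always parses the rowtime attribute as TIMESTAMP WITHOUT TIME ZONE and treats all event-time values as values in the UTC time zone.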
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/time_attributes/
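
The effect of that rule can be checked outside Flink. The sketch below uses only java.time (the class and method names are made up for illustration) and renders one epoch-millisecond value from the question as wall-clock time in UTC and in Asia/Shanghai. The UTC rendering matches the ts_row value printed in the question, and the Asia/Shanghai rendering matches the expected string.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical demo class; it does not use any Flink API.
class EpochZoneDemo {
    // Render epoch milliseconds as wall-clock time in the given zone.
    static LocalDateTime wallClock(long epochMillis, String zoneId) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneId.of(zoneId));
    }

    public static void main(String[] args) {
        long ts = 1511725471000L; // second record in the question
        System.out.println(wallClock(ts, "UTC"));           // 2017-11-26T19:44:31
        System.out.println(wallClock(ts, "Asia/Shanghai")); // 2017-11-27T03:44:31
    }
}
```

The two results differ by exactly the eight-hour UTC offset of Asia/Shanghai, which is the gap seen between the printed ts_row values and the expected strings.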





Re: Re: Event time is wrong after converting DataStream to Table

Posted by 赢峰 <si...@163.com>.
Nothing changed after adding the table.local-time-zone setting.