You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 人生若只如初见 <ca...@qq.com> on 2020/04/18 10:38:35 UTC
问题请教-flinksql的kafkasource方面
大佬好:
请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。
问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。
问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create view as ...")却会报错。报错如下:
Exception in thread "main" org.apache.flink.table.api.TableException: Unsupported query: CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior,
CASE C.parent_category_id
WHEN 1 THEN '服饰鞋包'
WHEN 2 THEN '家装家饰'
WHEN 3 THEN '家电'
WHEN 4 THEN '美妆'
WHEN 5 THEN '母婴'
WHEN 6 THEN '3C数码'
WHEN 7 THEN '运动户外'
WHEN 8 THEN '食品'
ELSE '其他'
END AS category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id
at org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67)
at org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown Source)
at java.util.Optional.orElseThrow(Optional.java:290)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
at com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51)
望解答,十分感谢!
Re: 问题请教-flinksql的kafkasource方面
Posted by Jark Wu <im...@gmail.com>.
Hi,
关于第二个问题,Flink 1.10 只在 SQL CLI 支持了 CREATE VIEW 语法, TableEnvironment 上还未支持。
关于第一个问题,抱歉,没有太看懂。"每小时全网购买数/partition的的数量,数据不准确了" 是什么意思呢?要不举个例子看下?
Best,
Jark
On Sat, 18 Apr 2020 at 18:38, 人生若只如初见 <ca...@qq.com> wrote:
> 大佬好:
>
> 请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。
>
> 问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。
>
> 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create
> view as ...")却会报错。报错如下:
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Unsupported query: CREATE VIEW rich_user_behavior AS
> SELECT U.user_id, U.item_id, U.behavior,
> CASE C.parent_category_id
> WHEN 1 THEN '服饰鞋包'
> WHEN 2 THEN '家装家饰'
> WHEN 3 THEN '家电'
> WHEN 4 THEN '美妆'
> WHEN 5 THEN '母婴'
> WHEN 6 THEN '3C数码'
> WHEN 7 THEN '运动户外'
> WHEN 8 THEN '食品'
> ELSE '其他'
> END AS category_name
> FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF
> U.proctime AS C
> ON U.category_id = C.sub_category_id
> at
> org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67)
> at
> org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown
> Source)
> at java.util.Optional.orElseThrow(Optional.java:290)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> at
> com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51)
>
>
>
>
>
> 望解答,十分感谢!