You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 刘海 <li...@163.com> on 2021/01/21 09:52:06 UTC

使用了流应用中使用了mysql jdbc的source,Execution处于FINISHED状态无法生成检查点

HI!
这边做测试时遇到一个问题:
在流应用中使用了一个mysql  jdbc的source作为维表,为了优化处理效率使用了Lookup Cache,下面是注册的表:
bsTableEnv.executeSql("CREATE TABLE tm_dealers (dealer_code STRING,is_valid DECIMAL(10,0),proctime AS PROCTIME(),PRIMARY KEY (dealer_code) NOT ENFORCED\n" +
") WITH (" +
"'connector' = 'jdbc'," +
"'url' = 'jdbc:mysql://10.0.15.83:3306/flink-test?useSSL=false'," +
"'table-name' = 'tm_dealers'," +
"'driver' = 'com.mysql.cj.jdbc.Driver'," +
"'username' = 'root'," +
"'password' = 'Cdh2020:1'," +
"'lookup.cache.max-rows' = '500',"+
"'lookup.cache.ttl' = '1800s',"+
"'sink.buffer-flush.interval' = '60s'"+
")");


我发现这样的话checkpoint配置会失效,不能触发检查点,日志报如下错误:
job bad9f419433f78d24e703e659b169917 is notin state RUNNING but FINISHED instead. Aborting checkpoint.


进入WEB UI 看一下视图发现该Execution处于FINISHED状态,FINISHED状态无法进行checkpoint,这种有其它办法吗?


感谢大佬指导一下,拜谢!
| |
刘海
|
|
liuhai35@163.com
|
签名由网易邮箱大师定制

Re:Re:Re:使用了流应用中使用了mysql jdbc的source,Execution处于FINISHED状态无法生成检查点

Posted by anonnius <an...@126.com>.
重新格式下, 不好意思
hi: 今天又试了下, 我这边出现问题是因为: join时使用的语法问题 照成的
应该使用这种语法
-- temporal join the JDBC table as a dimension table
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable.id;


而不是
SELECT * FROM myTopic a 
LEFTJOIN MyUserTable b
ON a.id = b.id
--------------
hi: 今天又试了下, 我这边出现问题是因为: join时使用的问题 照成的

应该使用这种语法
-- temporal join the JDBC table as a dimension tableSELECT*FROMmyTopicLEFTJOINMyUserTableFORSYSTEM_TIMEASOFmyTopic.proctimeONmyTopic.key=MyUserTable.id;
而不是
SELECT*FROMmyTopic aLEFTJOINMyUserTablebON a.id = b.id
文档连接在这里, https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#lookup-cache

希望对你有帮助

















在 2021-04-19 18:54:38,"anonnius" <an...@126.com> 写道:
>hi: 今天又试了下, 我这边出现问题是因为: join时使用的问题 照成的
>
>应该使用这种语法
>-- temporal join the JDBC table as a dimension tableSELECT*FROMmyTopicLEFTJOINMyUserTableFORSYSTEM_TIMEASOFmyTopic.proctimeONmyTopic.key=MyUserTable.id;
>而不是
>SELECT*FROMmyTopic aLEFTJOINMyUserTablebON a.id = b.id
>文档连接在这里, https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#lookup-cache
>
>希望对你有帮助
>
>
>
>
>
>
>
>
>在 2021-04-14 16:47:04,"anonnius" <an...@126.com> 写道:
>>+1, 目前也遇到了
>>在 2021-01-21 17:52:06,"刘海" <li...@163.com> 写道:
>>>HI!
>>>这边做测试时遇到一个问题:
>>>在流应用中使用了一个mysql  jdbc的source作为维表,为了优化处理效率使用了Lookup Cache,下面是注册的表:
>>>bsTableEnv.executeSql("CREATE TABLE tm_dealers (dealer_code STRING,is_valid DECIMAL(10,0),proctime AS PROCTIME(),PRIMARY KEY (dealer_code) NOT ENFORCED\n" +
>>>") WITH (" +
>>>"'connector' = 'jdbc'," +
>>>"'url' = 'jdbc:mysql://10.0.15.83:3306/flink-test?useSSL=false'," +
>>>"'table-name' = 'tm_dealers'," +
>>>"'driver' = 'com.mysql.cj.jdbc.Driver'," +
>>>"'username' = 'root'," +
>>>"'password' = 'Cdh2020:1'," +
>>>"'lookup.cache.max-rows' = '500',"+
>>>"'lookup.cache.ttl' = '1800s',"+
>>>"'sink.buffer-flush.interval' = '60s'"+
>>>")");
>>>
>>>
>>>我发现这样的话checkpoint配置会失效,不能触发检查点,日志报如下错误:
>>>job bad9f419433f78d24e703e659b169917 is notin state RUNNING but FINISHED instead. Aborting checkpoint.
>>>
>>>
>>>进入WEB UI 看一下视图发现该Execution处于FINISHED状态,FINISHED状态无法进行checkpoint,这种有其它办法吗?
>>>
>>>
>>>感谢大佬指导一下,拜谢!
>>>| |
>>>刘海
>>>|
>>>|
>>>liuhai35@163.com
>>>|
>>>签名由网易邮箱大师定制

Re:Re:使用了流应用中使用了mysql jdbc的source,Execution处于FINISHED状态无法生成检查点

Posted by anonnius <an...@126.com>.
hi: 今天又试了下, 我这边出现问题是因为: join时使用的问题 照成的

应该使用这种语法
-- temporal join the JDBC table as a dimension tableSELECT*FROMmyTopicLEFTJOINMyUserTableFORSYSTEM_TIMEASOFmyTopic.proctimeONmyTopic.key=MyUserTable.id;
而不是
SELECT*FROMmyTopic aLEFTJOINMyUserTablebON a.id = b.id
文档连接在这里, https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#lookup-cache

希望对你有帮助








在 2021-04-14 16:47:04,"anonnius" <an...@126.com> 写道:
>+1, 目前也遇到了
>在 2021-01-21 17:52:06,"刘海" <li...@163.com> 写道:
>>HI!
>>这边做测试时遇到一个问题:
>>在流应用中使用了一个mysql  jdbc的source作为维表,为了优化处理效率使用了Lookup Cache,下面是注册的表:
>>bsTableEnv.executeSql("CREATE TABLE tm_dealers (dealer_code STRING,is_valid DECIMAL(10,0),proctime AS PROCTIME(),PRIMARY KEY (dealer_code) NOT ENFORCED\n" +
>>") WITH (" +
>>"'connector' = 'jdbc'," +
>>"'url' = 'jdbc:mysql://10.0.15.83:3306/flink-test?useSSL=false'," +
>>"'table-name' = 'tm_dealers'," +
>>"'driver' = 'com.mysql.cj.jdbc.Driver'," +
>>"'username' = 'root'," +
>>"'password' = 'Cdh2020:1'," +
>>"'lookup.cache.max-rows' = '500',"+
>>"'lookup.cache.ttl' = '1800s',"+
>>"'sink.buffer-flush.interval' = '60s'"+
>>")");
>>
>>
>>我发现这样的话checkpoint配置会失效,不能触发检查点,日志报如下错误:
>>job bad9f419433f78d24e703e659b169917 is notin state RUNNING but FINISHED instead. Aborting checkpoint.
>>
>>
>>进入WEB UI 看一下视图发现该Execution处于FINISHED状态,FINISHED状态无法进行checkpoint,这种有其它办法吗?
>>
>>
>>感谢大佬指导一下,拜谢!
>>| |
>>刘海
>>|
>>|
>>liuhai35@163.com
>>|
>>签名由网易邮箱大师定制

Re:使用了流应用中使用了mysql jdbc的source,Execution处于FINISHED状态无法生成检查点

Posted by anonnius <an...@126.com>.
+1, 目前也遇到了
在 2021-01-21 17:52:06,"刘海" <li...@163.com> 写道:
>HI!
>这边做测试时遇到一个问题:
>在流应用中使用了一个mysql  jdbc的source作为维表,为了优化处理效率使用了Lookup Cache,下面是注册的表:
>bsTableEnv.executeSql("CREATE TABLE tm_dealers (dealer_code STRING,is_valid DECIMAL(10,0),proctime AS PROCTIME(),PRIMARY KEY (dealer_code) NOT ENFORCED\n" +
>") WITH (" +
>"'connector' = 'jdbc'," +
>"'url' = 'jdbc:mysql://10.0.15.83:3306/flink-test?useSSL=false'," +
>"'table-name' = 'tm_dealers'," +
>"'driver' = 'com.mysql.cj.jdbc.Driver'," +
>"'username' = 'root'," +
>"'password' = 'Cdh2020:1'," +
>"'lookup.cache.max-rows' = '500',"+
>"'lookup.cache.ttl' = '1800s',"+
>"'sink.buffer-flush.interval' = '60s'"+
>")");
>
>
>我发现这样的话checkpoint配置会失效,不能触发检查点,日志报如下错误:
>job bad9f419433f78d24e703e659b169917 is notin state RUNNING but FINISHED instead. Aborting checkpoint.
>
>
>进入WEB UI 看一下视图发现该Execution处于FINISHED状态,FINISHED状态无法进行checkpoint,这种有其它办法吗?
>
>
>感谢大佬指导一下,拜谢!
>| |
>刘海
>|
>|
>liuhai35@163.com
>|
>签名由网易邮箱大师定制

Re: 使用了流应用中使用了mysql jdbc的source,Execution处于FINISHED状态无法生成检查点

Posted by JasonLee <17...@163.com>.
hi

你需要使用 Temporal Table Join 的语法,具体操作可以参考官网
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html



-----
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/