Posted to issues@flink.apache.org by "gaoling ma (Jira)" <ji...@apache.org> on 2020/07/21 01:19:00 UTC

[jira] [Comment Edited] (FLINK-18626) The result of aggregate SQL on MySQL cannot write to upsert table sink inStreamingMode

    [ https://issues.apache.org/jira/browse/FLINK-18626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161642#comment-17161642 ] 

gaoling ma edited comment on FLINK-18626 at 7/21/20, 1:18 AM:
--------------------------------------------------------------

When I added System.in.read() after the executeSql method to block the main thread, the result turned out to be correct. Thanks for your comment.
But it still seems a little strange: why can't bsTableEnv.executeSql behave like bsEnv.execute() in streaming mode on the local mini cluster? Calling bsTableEnv.execute also throws an exception: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.
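
For reference, a minimal sketch (assuming Flink 1.11's TableResult/JobClient API; the variable name insertResult is just illustrative) of waiting on the submitted job instead of blocking with System.in.read():

{code:java}
// Assumes org.apache.flink.table.api.TableResult and
// org.apache.flink.core.execution.JobClient are on the classpath.
TableResult insertResult = bsTableEnv.executeSql(
        "INSERT INTO aaa SELECT area_code, CURRENT_DATE AS stat_date, count(*) AS index " +
        "FROM bbb WHERE is_record = '是' GROUP BY area_code");

// executeSql submits the INSERT job asynchronously and returns immediately.
// Block the client until the job finishes so the local mini cluster is not
// torn down while the job is still running (throws checked exceptions).
insertResult.getJobClient()
        .get()
        .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
        .get();
{code}

On later Flink versions (1.12+), the same wait can be expressed more concisely with TableResult#await().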


was (Author: magaoling):
When I added System.in.read() after the executeSql method to block the main thread, the result turned out to be correct. Thanks for your comment.

> The result of aggregate SQL on MySQL cannot write to upsert table sink inStreamingMode 
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-18626
>                 URL: https://issues.apache.org/jira/browse/FLINK-18626
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC, Table SQL / API
>    Affects Versions: 1.11.0
>            Reporter: gaoling ma
>            Priority: Major
>
> {code:java}
> StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
> bsEnv.setParallelism(1);
> bsTableEnv.executeSql("CREATE TABLE bbb (\n" +
> 				......
> 				") WITH (\n" +
>                 "  'connector'  = 'jdbc',\n" +
>                 "  'url'        = 'jdbc:mysql://***/***',\n" +
>                 "  'table-name' = 'bbb',\n" +
>                 "  'driver'     = 'com.mysql.cj.jdbc.Driver',\n" +
>                 "  'username'   = '***',\n" +
>                 "  'password'   = '***'" +
> 				")");
> bsTableEnv.executeSql("CREATE TABLE aaa(\n" +
> 		"    `area_code`	VARCHAR,\n" +
>                 "    `stat_date`	        DATE,\n" +
>                 "    `index`		BIGINT,\n" +
>                 "    PRIMARY KEY (area_code, stat_date) NOT ENFORCED" +
>                 ") WITH (\n" +
>                 "  'connector'  = 'jdbc',\n" +
>                 "  'url'        = 'jdbc:mysql://***/***',\n" +
>                 "  'table-name' = 'aaa',\n" +
>                 "  'driver'     = 'com.mysql.cj.jdbc.Driver',\n" +
>                 "  'username'   = '***',\n" +
>                 "  'password'   = '***'\n" +
>                 ")");
> 		
> 		bsTableEnv.executeSql("INSERT INTO aaa SELECT area_code, CURRENT_DATE AS stat_date, count(*) AS index FROM bbb WHERE is_record = '是' GROUP BY area_code");
> //		Table table = bsTableEnv.sqlQuery("SELECT area_code, CURRENT_DATE AS stat_date, count(*) AS index FROM bbb WHERE is_record = '是' GROUP BY area_code");
> //		DataStream<Tuple2<Boolean, Row>> retractStream = 
> //				bsTableEnv.toRetractStream(table, Row.class);
> //		retractStream.print();
> //		bsEnv.execute();
> {code}
> When I write the aggregate SQL results from MySQL into the upsert JDBC table sink, the program exits automatically with no hint. The result table aaa is also empty, which it should not be.
> When I use the commented code above, which converts the aggregate SQL result into a retract stream, the printed results are correct (a runnable version is sketched below).
> When I replace the data source with Kafka and do not use the commented code above, the dynamic results are written into table aaa successfully.
> When I replace inStreamingMode() with inBatchMode() and do not use the commented code above, it also works.
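
For comparison, a minimal sketch (assuming Flink 1.11's Table-to-DataStream bridge API) of the commented-out retract-stream variant from the snippet above; toRetractStream registers operators on bsEnv, so bsEnv.execute() blocks until the job finishes and the printed output appears:

{code:java}
// Assumes org.apache.flink.table.api.Table, org.apache.flink.types.Row,
// org.apache.flink.api.java.tuple.Tuple2 and
// org.apache.flink.streaming.api.datastream.DataStream are imported.
Table table = bsTableEnv.sqlQuery(
        "SELECT area_code, CURRENT_DATE AS stat_date, count(*) AS index " +
        "FROM bbb WHERE is_record = '是' GROUP BY area_code");
DataStream<Tuple2<Boolean, Row>> retractStream =
        bsTableEnv.toRetractStream(table, Row.class);
retractStream.print();
// Unlike executeSql, execute() blocks the main thread until the job
// terminates (throws a checked Exception).
bsEnv.execute();
{code}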



--
This message was sent by Atlassian Jira
(v8.3.4#803005)