Posted to issues@flink.apache.org by "gaoling ma (Jira)" <ji...@apache.org> on 2020/07/18 15:20:00 UTC

[jira] [Updated] (FLINK-18626) The result of aggregate SQL on MySQL cannot write to upsert table sink in

     [ https://issues.apache.org/jira/browse/FLINK-18626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

gaoling ma updated FLINK-18626:
-------------------------------
    Summary: The result of aggregate SQL on MySQL cannot write to upsert table sink in   (was: The result of aggregate SQL on MySQL cannot write to upsert table sink )

> The result of aggregate SQL on MySQL cannot write to upsert table sink in 
> --------------------------------------------------------------------------
>
>                 Key: FLINK-18626
>                 URL: https://issues.apache.org/jira/browse/FLINK-18626
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC, Table SQL / API
>    Affects Versions: 1.11.0
>            Reporter: gaoling ma
>            Priority: Major
>
> {code:java}
> StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
> bsEnv.setParallelism(1);
> bsTableEnv.executeSql("CREATE TABLE bbb (\n" +
>         ......
>         ") WITH (\n" +
>         "  'connector'  = 'jdbc',\n" +
>         "  'url'        = 'jdbc:mysql://***/***',\n" +
>         "  'table-name' = 'bbb',\n" +
>         "  'driver'     = 'com.mysql.cj.jdbc.Driver',\n" +
>         "  'username'   = '***',\n" +
>         "  'password'   = '***'" +
>         ")");
> bsTableEnv.executeSql("CREATE TABLE aaa(\n" +
>         "    `area_code` VARCHAR,\n" +
>         "    `stat_date` DATE,\n" +
>         "    `index`     BIGINT,\n" +
>         "    PRIMARY KEY (area_code, stat_date) NOT ENFORCED" +
>         ") WITH (\n" +
>         "  'connector'  = 'jdbc',\n" +
>         "  'url'        = 'jdbc:mysql://***/***',\n" +
>         "  'table-name' = 'aaa',\n" +
>         "  'driver'     = 'com.mysql.cj.jdbc.Driver',\n" +
>         "  'username'   = '***',\n" +
>         "  'password'   = '***'\n" +
>         ")");
>
> bsTableEnv.executeSql("INSERT INTO aaa SELECT area_code, CURRENT_DATE AS stat_date, count(*) AS index FROM bbb WHERE is_record = '是' GROUP BY area_code");
> // Table table = bsTableEnv.sqlQuery("SELECT area_code, CURRENT_DATE AS stat_date, count(*) AS index FROM bbb WHERE is_record = '是' GROUP BY area_code");
> // DataStream<Tuple2<Boolean, Row>> retractStream =
> //         bsTableEnv.toRetractStream(table, Row.class);
> // retractStream.print();
> // bsEnv.execute();
> {code}
> When I write the result of an aggregate SQL query over a MySQL source into an upsert JDBC table sink, the program exits on its own without any error or hint. The result table aaa stays empty, although it should contain rows.
> When I instead use the commented-out code above, which converts the aggregate SQL result into a retract stream, the results are printed as expected.
> When I replace the data source with Kafka and do not use the commented-out code above, the dynamic results are written into table aaa successfully.
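
The description contrasts a retract stream with an upsert sink, so a small sketch of how the two relate may help when reading it. The code below is plain Java and uses no Flink classes. The names RetractVsUpsertSketch, Change, countPerArea and applyAsUpsert are hypothetical and exist only for this illustration. As simplifying assumptions, the key is reduced to area_code alone (the table aaa in the report is keyed on area_code and stat_date), and retractions are ignored on the upsert side because a running count always follows a retraction with a new value for the same key. It illustrates the semantics only and makes no claim about the cause of the reported behaviour.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: plain Java, no Flink classes involved.
class RetractVsUpsertSketch {

    /** One changelog message; add == false marks a retraction of an earlier row. */
    static final class Change {
        final boolean add;
        final String areaCode;
        final long count;

        Change(boolean add, String areaCode, long count) {
            this.add = add;
            this.areaCode = areaCode;
            this.count = count;
        }

        @Override
        public String toString() {
            return "(" + add + ", " + areaCode + ", " + count + ")";
        }
    }

    /** Emulates a grouped count(*) over area_code as a stream of add/retract messages. */
    static List<Change> countPerArea(List<String> areaCodes) {
        Map<String, Long> counts = new LinkedHashMap<>();
        List<Change> out = new ArrayList<>();
        for (String area : areaCodes) {
            Long old = counts.get(area);
            if (old != null) {
                // The previous count for this key is no longer valid: retract it.
                out.add(new Change(false, area, old));
            }
            long updated = (old == null) ? 1L : old + 1L;
            counts.put(area, updated);
            out.add(new Change(true, area, updated));
        }
        return out;
    }

    /** Emulates an upsert sink keyed on area_code: the latest add per key wins. */
    static Map<String, Long> applyAsUpsert(List<Change> changes) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (Change c : changes) {
            if (c.add) {
                table.put(c.areaCode, c.count); // insert or overwrite by key
            }
            // Assumption: retractions are skipped, since each one is followed
            // by a new value for the same key in this running-count scenario.
        }
        return table;
    }

    public static void main(String[] args) {
        List<String> input = new ArrayList<>();
        input.add("A");
        input.add("B");
        input.add("A");
        List<Change> changes = countPerArea(input);
        changes.forEach(System.out::println);       // four changelog messages
        System.out.println(applyAsUpsert(changes)); // {A=2, B=1}
    }
}
```

With the input A, B, A the retract view carries four messages, while the keyed table ends up with one row per area code. That matches the expectation in the report that aaa should hold the latest count for each key.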



--
This message was sent by Atlassian Jira
(v8.3.4#803005)