You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "gaoling ma (Jira)" <ji...@apache.org> on 2020/07/17 11:49:00 UTC

[jira] [Updated] (FLINK-18626) the result of aggregate SQL on streaming cannot write to upsert table sink

     [ https://issues.apache.org/jira/browse/FLINK-18626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

gaoling ma updated FLINK-18626:
-------------------------------
    Issue Type: Bug  (was: Improvement)

> the result of aggregate SQL on streaming cannot write to upsert table sink 
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-18626
>                 URL: https://issues.apache.org/jira/browse/FLINK-18626
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC, Table SQL / API
>    Affects Versions: 1.11.0
>            Reporter: gaoling ma
>            Priority: Major
>
> {code:java}
> StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
> bsEnv.setParallelism(1);
> ......
> bsTableEnv.executeSql("CREATE TABLE aaa(\n" +
> 		"    `area_code`	VARCHAR,\n" +
>                 "    `stat_date`	DATE,\n" +
>                 "    `index`		BIGINT,\n" +
>                 "    PRIMARY KEY (area_code, stat_date) NOT ENFORCED" +
>                 ") WITH (\n" +
>                 "  'connector'  = 'jdbc',\n" +
>                 "  'url'        = 'jdbc:mysql://***/laowufp_data_test',\n" +
>                 "  'table-name' = 'aaa',\n" +
>                 "  'driver'     = 'com.mysql.cj.jdbc.Driver',\n" +
>                 "  'username'   = '***',\n" +
>                 "  'password'   = '***'\n" +
>                 ")");
> 		
> 		bsTableEnv.executeSql("INSERT INTO aaa SELECT area_code, CURRENT_DATE AS stat_date, count(*) AS index FROM bbb WHERE is_record = '是' GROUP BY area_code");
> {code}
> When I write the aggregate SQL results into upsert stream JDBC table sink, the program automatically exits with no hint.
> The aggregate results suppose to be a restract stream, but another question is how to change the restract stream into upsert stream. Or there is a better way to continuous update the aggregate SQL results into JDBC table. Your comment is appreciated. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)