You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Fangliang Liu (Jira)" <ji...@apache.org> on 2022/04/17 11:45:00 UTC

[jira] [Updated] (FLINK-27275) Support null value not update in flink-connector-jdbc

     [ https://issues.apache.org/jira/browse/FLINK-27275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fangliang Liu updated FLINK-27275:
----------------------------------
    Description: 
I write data to mysql according to the following statement.
{code:java}
 CREATE TABLE IF NOT EXISTS t_source (
    `user_id`  bigint,
    `A` string,
    `B` string,
    `C` string,
    `flag` varchar(256)
)WITH (
    'connector' = 'kafka',
    'format' = 'canal-json',
    'scan.startup.mode' = 'latest-offset',
    ... ...
);

CREATE TABLE IF NOT EXISTS t_sink (
    `user_id`  bigint,
    `A` string,
    `B` string,
    `C` string,
    `flag` varchar(256),
    PRIMARY KEY (`user_id`) NOT ENFORCED
)WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://xx.xx.xx.xx:xxx/test',
    'table-name' = 'user',
    ... ...
 ); 

INSERT INTO t_sink(
    `user_id`,
    `A`,
    `B`,
    `C`,
    `flag`
) SELECT  `user_id`, `A`, `B`, `C`, `flag` FROM t_source;

{code}
 

  was:
The follow DDL
{code:java}
CREATE TABLE IF NOT EXISTS `db`.`tablea` (
    `user_id`  bigint,
    `A` string,
    `B` string,
    `C` string,
    `flag` varchar(256),
    PRIMARY KEY (`user_id`) NOT ENFORCED
)WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://xx.xx.xx.xx:xxx/test',
    'table-name' = 'user',
    'username'='root',
    'password'='root',
    'sink.buffer-flush.interval'='1s',
    'sink.buffer-flush.max-rows'='50',
    'sink.parallelism'='2'
); {code}
 


> Support null value not update in flink-connector-jdbc
> -----------------------------------------------------
>
>                 Key: FLINK-27275
>                 URL: https://issues.apache.org/jira/browse/FLINK-27275
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / JDBC
>    Affects Versions: 1.14.3
>            Reporter: Fangliang Liu
>            Priority: Major
>
> I write data to mysql according to the following statement.
> {code:java}
>  CREATE TABLE IF NOT EXISTS t_source (
>     `user_id`  bigint,
>     `A` string,
>     `B` string,
>     `C` string,
>     `flag` varchar(256)
> )WITH (
>     'connector' = 'kafka',
>     'format' = 'canal-json',
>     'scan.startup.mode' = 'latest-offset',
>     ... ...
> );
> CREATE TABLE IF NOT EXISTS t_sink (
>     `user_id`  bigint,
>     `A` string,
>     `B` string,
>     `C` string,
>     `flag` varchar(256),
>     PRIMARY KEY (`user_id`) NOT ENFORCED
> )WITH (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:mysql://xx.xx.xx.xx:xxx/test',
>     'table-name' = 'user',
>     ... ...
>  ); 
> INSERT INTO t_sink(
>     `user_id`,
>     `A`,
>     `B`,
>     `C`,
>     `flag`
> ) SELECT  `user_id`, `A`, `B`, `C`, `flag` FROM t_source;
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)