Posted to issues@flink.apache.org by "Fangliang Liu (Jira)" <ji...@apache.org> on 2022/04/17 11:45:00 UTC
[jira] [Updated] (FLINK-27275) Support null value not update in flink-connector-jdbc
[ https://issues.apache.org/jira/browse/FLINK-27275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Fangliang Liu updated FLINK-27275:
----------------------------------
Description:
I write data to MySQL using the following statements.
{code:sql}
CREATE TABLE IF NOT EXISTS t_source (
  `user_id` bigint,
  `A` string,
  `B` string,
  `C` string,
  `flag` varchar(256)
) WITH (
  'connector' = 'kafka',
  'format' = 'canal-json',
  'scan.startup.mode' = 'latest-offset',
  ... ...
);

CREATE TABLE IF NOT EXISTS t_sink (
  `user_id` bigint,
  `A` string,
  `B` string,
  `C` string,
  `flag` varchar(256),
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://xx.xx.xx.xx:xxx/test',
  'table-name' = 'user',
  ... ...
);

INSERT INTO t_sink (
  `user_id`,
  `A`,
  `B`,
  `C`,
  `flag`
) SELECT `user_id`, `A`, `B`, `C`, `flag` FROM t_source;
{code}
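For illustration only: the issue title suggests that a NULL value arriving from the source should leave the existing column in MySQL unchanged rather than overwrite it. A minimal sketch of an upsert with that behavior, in MySQL syntax, might look like the following. It is an assumption about the requested semantics, not a statement the connector is known to generate; the table and column names are taken from the DDL above, and the choice to also apply the rule to `flag` is hypothetical.
{code:sql}
-- Hypothetical null-preserving upsert against the MySQL table `user`.
-- IFNULL(VALUES(col), col) keeps the stored value when the incoming value is NULL.
INSERT INTO `user` (`user_id`, `A`, `B`, `C`, `flag`)
VALUES (?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
  `A` = IFNULL(VALUES(`A`), `A`),
  `B` = IFNULL(VALUES(`B`), `B`),
  `C` = IFNULL(VALUES(`C`), `C`),
  `flag` = IFNULL(VALUES(`flag`), `flag`);
{code}
With this form, a row that already holds A = 'x' and receives an update with A = NULL would keep 'x', while non-NULL incoming values would still overwrite the stored ones.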
was:
The following DDL:
{code:sql}
CREATE TABLE IF NOT EXISTS `db`.`tablea` (
  `user_id` bigint,
  `A` string,
  `B` string,
  `C` string,
  `flag` varchar(256),
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://xx.xx.xx.xx:xxx/test',
  'table-name' = 'user',
  'username' = 'root',
  'password' = 'root',
  'sink.buffer-flush.interval' = '1s',
  'sink.buffer-flush.max-rows' = '50',
  'sink.parallelism' = '2'
);
{code}
> Support null value not update in flink-connector-jdbc
> -----------------------------------------------------
>
> Key: FLINK-27275
> URL: https://issues.apache.org/jira/browse/FLINK-27275
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / JDBC
> Affects Versions: 1.14.3
> Reporter: Fangliang Liu
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)