Posted to issues@flink.apache.org by "Flink Jira Bot (Jira)" <ji...@apache.org> on 2022/04/22 22:39:00 UTC
[jira] [Updated] (FLINK-26595) Improve the PostgresDialect method for getting upsert statements.
[ https://issues.apache.org/jira/browse/FLINK-26595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-26595:
-----------------------------------
Labels: pull-request-available stale-assigned (was: pull-request-available)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue is assigned but has not received an update in 30 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a comment updating the community on your progress. If this issue is waiting on feedback, please consider this a reminder to the committer/reviewer. Flink is a very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone else may work on it.
> Improve the PostgresDialect method for getting upsert statements.
> -----------------------------------------------------------------
>
> Key: FLINK-26595
> URL: https://issues.apache.org/jira/browse/FLINK-26595
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: 1.13.1
> Reporter: wuguihu
> Assignee: wuguihu
> Priority: Major
> Labels: pull-request-available, stale-assigned
> Attachments: image-20220311125613545.png, image-20220311130744606.png, image-20220311141815540.png, image-20220315001550269.png
>
>
> I'm trying to use Flink CDC to synchronize MySQL data to MatrixDB in real time, but I encountered the following error:
> {quote}CIRCULAR REFERENCE:java.io.IOException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO user_1(id, name, address, phone_number, email) VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 'user_110@foo.com') ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, name=EXCLUDED.name, address=EXCLUDED.address, phone_number=EXCLUDED.phone_number, email=EXCLUDED.email was aborted: ERROR: modification of distribution columns in OnConflictUpdate is not supported Call getNextException to see other errors in the batch.
> {quote}
> This exception is caused by the getUpsertStatement method of PostgresDialect, which generates an upsert statement that MatrixDB rejects.
> The DO UPDATE SET clause should not include the unique-key columns.
>
> I ran the following experiment to test my modification.
> I also recompiled and packaged flink-connector-jdbc. With the modified connector, my program no longer reported errors.
> {code:sql}
> -- 1. Create a table in MatrixDB.
> CREATE TABLE user_1 (
> id int,
> name VARCHAR(255) NOT NULL DEFAULT 'flink',
> address VARCHAR(1024),
> phone_number VARCHAR(512),
> email VARCHAR(255),
> UNIQUE(id)
> );
> -- 2. Insert a record.
> INSERT INTO user_1(id, name, address, phone_number, email)
> VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 'user_110@foo.com')
> ON CONFLICT (id)
> DO UPDATE SET
> id=EXCLUDED.id,
> name=EXCLUDED.name,
> address=EXCLUDED.address,
> phone_number=EXCLUDED.phone_number,
> email=EXCLUDED.email;
> -- 3. Executing the above insert statement results in the following error:
> -- ERROR: modification of distribution columns in OnConflictUpdate is not supported
> -- 4. If id is removed from the SET clause, as in the following statement, the command executes successfully.
> INSERT INTO user_1(id, name, address, phone_number, email)
> VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 'user_110@foo.com')
> ON CONFLICT (id)
> DO UPDATE SET
> name=EXCLUDED.name,
> address=EXCLUDED.address,
> phone_number=EXCLUDED.phone_number,
> email=EXCLUDED.email;
> {code}
>
>
> The PostgresDialect class handles upsert statements as follows:
> {code:java}
> // package org.apache.flink.connector.jdbc.dialect.psql
> public Optional<String> getUpsertStatement(
> String tableName, String[] fieldNames, String[] uniqueKeyFields) {
> String uniqueColumns =
> Arrays.stream(uniqueKeyFields)
> .map(this::quoteIdentifier)
> .collect(Collectors.joining(", "));
> String updateClause =
> Arrays.stream(fieldNames)
> .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
> .collect(Collectors.joining(", "));
> return Optional.of(
> getInsertIntoStatement(tableName, fieldNames)
> + " ON CONFLICT ("
> + uniqueColumns
> + ")"
> + " DO UPDATE SET "
> + updateClause);
> }
> {code}
>
>
> To fix this problem, make the following changes to PostgresDialect:
> {code:java}
> // package org.apache.flink.connector.jdbc.dialect.psql
> public Optional<String> getUpsertStatement(
> String tableName, String[] fieldNames, String[] uniqueKeyFields) {
> String uniqueColumns =
> Arrays.stream(uniqueKeyFields)
> .map(this::quoteIdentifier)
> .collect(Collectors.joining(", "));
> // Unique-key columns are excluded from the SET clause below.
> List<String> uniqueKeyList = Arrays.asList(uniqueKeyFields);
> String updateClause =
> Arrays.stream(fieldNames)
> .filter(f -> !uniqueKeyList.contains(f))
> .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
> .collect(Collectors.joining(", "));
> return Optional.of(
> getInsertIntoStatement(tableName, fieldNames)
> + " ON CONFLICT ("
> + uniqueColumns
> + ")"
> + " DO UPDATE SET "
> + updateClause);
> }
> {code}
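The proposed change can be tried outside Flink with a minimal, self-contained sketch like the one below. It is not the connector's code: the class `UpsertSketch` and the helpers `quoteIdentifier` and `insertInto` are hypothetical stand-ins for the dialect methods, and the `?` placeholders are an assumption. The sketch also falls back to `DO NOTHING` when every column is a unique key, a case the proposal above does not address and in which the filtered SET clause would otherwise be empty.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class UpsertSketch {

    // Hypothetical stand-in for the dialect's quoteIdentifier; returns the name unchanged.
    static String quoteIdentifier(String identifier) {
        return identifier;
    }

    // Hypothetical, simplified stand-in for getInsertIntoStatement, using '?' placeholders.
    static String insertInto(String tableName, String[] fieldNames) {
        String columns = Arrays.stream(fieldNames)
                .map(UpsertSketch::quoteIdentifier)
                .collect(Collectors.joining(", "));
        String placeholders = Arrays.stream(fieldNames)
                .map(f -> "?")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + quoteIdentifier(tableName)
                + "(" + columns + ") VALUES (" + placeholders + ")";
    }

    // Builds an upsert whose SET clause skips the unique-key columns.
    static String upsert(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        Set<String> uniqueKeys = new HashSet<>(Arrays.asList(uniqueKeyFields));
        String uniqueColumns = Arrays.stream(uniqueKeyFields)
                .map(UpsertSketch::quoteIdentifier)
                .collect(Collectors.joining(", "));
        String updateClause = Arrays.stream(fieldNames)
                .filter(f -> !uniqueKeys.contains(f))
                .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                .collect(Collectors.joining(", "));
        // Assumption: with no non-key columns left, fall back to DO NOTHING
        // instead of emitting an empty SET clause.
        String conflictAction = updateClause.isEmpty()
                ? "DO NOTHING"
                : "DO UPDATE SET " + updateClause;
        return insertInto(tableName, fieldNames)
                + " ON CONFLICT (" + uniqueColumns + ") " + conflictAction;
    }

    public static void main(String[] args) {
        String[] fields = {"id", "name", "address", "phone_number", "email"};
        System.out.println(upsert("user_1", fields, new String[] {"id"}));
        System.out.println(upsert("user_1", new String[] {"id"}, new String[] {"id"}));
    }
}
```

For the `user_1` table above with `id` as the unique key, the generated SET clause lists only `name`, `address`, `phone_number` and `email`, matching the statement in step 4 of the SQL experiment.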
--
This message was sent by Atlassian Jira
(v8.20.7#820007)