You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/09/11 08:13:02 UTC
[inlong] branch master updated: [INLONG-8653][Sort] Fix the query sql for jdbc sink postgres dialect multiple table scenarios. (#8654)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3a4216e3c9 [INLONG-8653][Sort] Fix the query sql for jdbc sink postgres dialect multiple table scenarios. (#8654)
3a4216e3c9 is described below
commit 3a4216e3c98468379d02a2cc6fa029a56614cba4
Author: Yizhou Yang <32...@users.noreply.github.com>
AuthorDate: Mon Sep 11 16:12:56 2023 +0800
[INLONG-8653][Sort] Fix the query sql for jdbc sink postgres dialect multiple table scenarios. (#8654)
Co-authored-by: Yizhou Yang <yi...@tencent.com>
---
.../inlong/sort/jdbc/dialect/PostgresDialect.java | 32 +++++++++-------------
1 file changed, 13 insertions(+), 19 deletions(-)
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/PostgresDialect.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/PostgresDialect.java
index 9248f6da89..c90ef07ac4 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/PostgresDialect.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/PostgresDialect.java
@@ -38,25 +38,19 @@ public class PostgresDialect extends AbstractJdbcDialect {
private static final long serialVersionUID = 1L;
- private static final String QUERY_PRIMARY_KEY_SQL = "SELECT\n" +
- "\tstring_agg (DISTINCT t3.attname, ',') AS " + PRIMARY_KEY_COLUMN + ",\n" +
- " \tt4.tablename AS tableName\n" +
- "FROM\n" +
- "\tpg_constraint t1\n" +
- "INNER JOIN pg_class t2 ON t1.conrelid = t2.oid\n" +
- "INNER JOIN pg_attribute t3 ON t3.attrelid = t2.oid\n" +
- "AND array_position (t1.conkey, t3.attnum) is not null\n" +
- "INNER JOIN pg_tables t4 on t4.tablename = t2.relname\n" +
- "INNER JOIN pg_index t5 ON t5.indrelid = t2.oid\n" +
- "AND t3.attnum = ANY (t5.indkey)\n" +
- "LEFT JOIN pg_description t6 on t6.objoid = t3.attrelid\n" +
- "and t6.objsubid = t3.attnum\n" +
- "WHERE\n" +
- "\tt1.contype = 'p'\n" +
- "AND length (t3.attname) > 0\n" +
- "AND t2.oid = ?::regclass\n" +
- "group by\n" +
- "\tt4.tablename";
+ private static final String QUERY_PRIMARY_KEY_SQL = "SELECT\n"
+ + " string_agg(pg_attribute.attname, ',') AS pkColumn,\n"
+ + " pg_class.relname AS tableName\n"
+ + " FROM\n"
+ + " pg_index, pg_class, pg_attribute\n"
+ + " WHERE\n"
+ + " pg_class.oid = ?::regclass AND\n"
+ + " indrelid = pg_class.oid AND\n"
+ + " pg_attribute.attrelid = pg_class.oid AND\n"
+ + " pg_attribute.attnum = any(pg_index.indkey) AND\n"
+ + " indisprimary \n "
+ + " GROUP BY \n"
+ + " pg_class.relname, pg_index.indkey;";
// Define MAX/MIN precision of TIMESTAMP type according to PostgreSQL docs:
// https://www.postgresql.org/docs/12/datatype-datetime.html