You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/09/11 08:13:02 UTC

[inlong] branch master updated: [INLONG-8653][Sort] Fix the query sql for jdbc sink postgres dialect multiple table scenerios. (#8654)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a4216e3c9 [INLONG-8653][Sort] Fix the query sql for jdbc sink postgres dialect multiple table scenerios.  (#8654)
3a4216e3c9 is described below

commit 3a4216e3c98468379d02a2cc6fa029a56614cba4
Author: Yizhou Yang <32...@users.noreply.github.com>
AuthorDate: Mon Sep 11 16:12:56 2023 +0800

    [INLONG-8653][Sort] Fix the query sql for jdbc sink postgres dialect multiple table scenerios.  (#8654)
    
    Co-authored-by: Yizhou Yang <yi...@tencent.com>
---
 .../inlong/sort/jdbc/dialect/PostgresDialect.java  | 32 +++++++++-------------
 1 file changed, 13 insertions(+), 19 deletions(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/PostgresDialect.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/PostgresDialect.java
index 9248f6da89..c90ef07ac4 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/PostgresDialect.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/PostgresDialect.java
@@ -38,25 +38,19 @@ public class PostgresDialect extends AbstractJdbcDialect {
 
     private static final long serialVersionUID = 1L;
 
-    private static final String QUERY_PRIMARY_KEY_SQL = "SELECT\n" +
-            "\tstring_agg (DISTINCT t3.attname, ',') AS " + PRIMARY_KEY_COLUMN + ",\n" +
-            "    \tt4.tablename AS tableName\n" +
-            "FROM\n" +
-            "\tpg_constraint t1\n" +
-            "INNER JOIN pg_class t2 ON t1.conrelid = t2.oid\n" +
-            "INNER JOIN pg_attribute t3 ON t3.attrelid = t2.oid\n" +
-            "AND array_position (t1.conkey, t3.attnum) is not null\n" +
-            "INNER JOIN pg_tables t4 on t4.tablename = t2.relname\n" +
-            "INNER JOIN pg_index t5 ON t5.indrelid = t2.oid\n" +
-            "AND t3.attnum = ANY (t5.indkey)\n" +
-            "LEFT JOIN pg_description t6 on t6.objoid = t3.attrelid\n" +
-            "and t6.objsubid = t3.attnum\n" +
-            "WHERE\n" +
-            "\tt1.contype = 'p'\n" +
-            "AND length (t3.attname) > 0\n" +
-            "AND t2.oid = ?::regclass\n" +
-            "group by\n" +
-            "\tt4.tablename";
+    private static final String QUERY_PRIMARY_KEY_SQL = "SELECT\n"
+            + " string_agg(pg_attribute.attname, ',') AS pkColumn,\n"
+            + " pg_class.relname AS tableName\n"
+            + " FROM\n"
+            + " pg_index, pg_class, pg_attribute\n"
+            + " WHERE\n"
+            + " pg_class.oid = ?::regclass AND\n"
+            + " indrelid = pg_class.oid AND\n"
+            + " pg_attribute.attrelid = pg_class.oid AND\n"
+            + " pg_attribute.attnum = any(pg_index.indkey) AND\n"
+            + " indisprimary \n "
+            + " GROUP BY \n"
+            + " pg_class.relname, pg_index.indkey;";
 
     // Define MAX/MIN precision of TIMESTAMP type according to PostgreSQL docs:
     // https://www.postgresql.org/docs/12/datatype-datetime.html