You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/08 07:02:53 UTC

[inlong] branch master updated: [INLONG-7543][Sort] Fix PostgreSQL connector output two data with the same UPDATE operation (#7544)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 459be730e [INLONG-7543][Sort] Fix PostgreSQL connector output two data with the same UPDATE operation (#7544)
459be730e is described below

commit 459be730e2f4f0062ab743e3bb0e7c11498df90b
Author: Liao Rui <li...@users.noreply.github.com>
AuthorDate: Wed Mar 8 15:02:46 2023 +0800

    [INLONG-7543][Sort] Fix PostgreSQL connector output two data with the same UPDATE operation (#7544)
    
    Co-authored-by: ryanrliao <ry...@tencent.com>
---
 .../postgres/table/PostgreSQLReadableMetaData.java | 90 +++++++++++++++-------
 1 file changed, 64 insertions(+), 26 deletions(-)

diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLReadableMetaData.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLReadableMetaData.java
index b2dacf9ce..e035c4a93 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLReadableMetaData.java
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLReadableMetaData.java
@@ -165,7 +165,7 @@ public enum PostgreSQLReadableMetaData {
                     .name(sourceStruct.getString(AbstractSourceInfo.SERVER_NAME_KEY)).sqlType(getSqlType(tableSchema))
                     .pkNames(getPkNames(tableSchema)).build();
             DebeziumJson debeziumJson = DebeziumJson.builder().after(field).source(source)
-                    .tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(record))
+                    .tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(data))
                     .tableChange(tableSchema).build();
 
             try {
@@ -373,9 +373,9 @@ public enum PostgreSQLReadableMetaData {
     /**
      * Generate a canal json message
      *
-     * @param record
-     * @param tableSchema
-     * @param rowData
+     * @param record source record
+     * @param tableSchema table schema
+     * @param rowData row data with canal json or debezium json
      * @return
      */
     private static StringData getCanalData(SourceRecord record, TableChange tableSchema, GenericRowData rowData) {
@@ -398,9 +398,18 @@ public enum PostgreSQLReadableMetaData {
         List<Map<String, Object>> dataList = new ArrayList<>();
         dataList.add(field);
 
-        CanalJson canalJson = CanalJson.builder().data(dataList).database(databaseName).schema(schemaName).sql("")
-                .es(opTs).isDdl(false).pkNames(getPkNames(tableSchema)).table(tableName).ts(ts).type(getOpType(record))
-                .sqlType(getSqlType(tableSchema)).build();
+        CanalJson canalJson = CanalJson.builder()
+                .data(dataList)
+                .database(databaseName)
+                .schema(schemaName)
+                .sql("")
+                .es(opTs).isDdl(false)
+                .pkNames(getPkNames(tableSchema))
+                .table(tableName)
+                .ts(ts)
+                .type(getCanalOpType(rowData))
+                .sqlType(getSqlType(tableSchema))
+                .build();
         try {
             ObjectMapper objectMapper = new ObjectMapper();
             return StringData.fromString(objectMapper.writeValueAsString(canalJson));
@@ -426,8 +435,8 @@ public enum PostgreSQLReadableMetaData {
     /**
      * convert debezium operation to canal-json operation type
      *
-     * @param record
-     * @return
+     * @param record source record
+     * @return source table DML operation type
      */
     private static String getOpType(SourceRecord record) {
         String opType;
@@ -445,8 +454,8 @@ public enum PostgreSQLReadableMetaData {
     /**
      * get primary key names
      *
-     * @param tableSchema
-     * @return
+     * @param tableSchema table schema
+     * @return primary key column names
      */
     private static List<String> getPkNames(@Nullable TableChange tableSchema) {
         if (tableSchema == null) {
@@ -458,8 +467,8 @@ public enum PostgreSQLReadableMetaData {
     /**
      * get a map about column name and type
      *
-     * @param tableSchema
-     * @return
+     * @param tableSchema table schema
+     * @return map of field name and field type
      */
     public static Map<String, Integer> getSqlType(@Nullable TableChange tableSchema) {
         if (tableSchema == null) {
@@ -473,21 +482,50 @@ public enum PostgreSQLReadableMetaData {
         return postgresType;
     }
 
+    /**
+     * map the row kind of the given row data to a canal-json operation type
+     *
+     * @param record row data whose row kind (record.getRowKind()) is inspected
+     * @return "DELETE" for DELETE and UPDATE_BEFORE, "INSERT" for INSERT and UPDATE_AFTER
+     */
+    private static String getCanalOpType(GenericRowData record) {
+        String opType;
+        switch (record.getRowKind()) {
+            case DELETE:
+            case UPDATE_BEFORE: // shares the DELETE branch: reported as "DELETE"
+                opType = "DELETE";
+                break;
+            case INSERT:
+            case UPDATE_AFTER: // shares the INSERT branch: reported as "INSERT"
+                opType = "INSERT";
+                break;
+            default: // any row kind other than the four above is rejected
+                throw new IllegalStateException("the record only have states in DELETE, "
+                        + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
+        }
+        return opType;
+    }
+
     /**
      * convert debezium operation to debezium-json operation type
      *
-     * @param record
-     * @return
+     * @param record row data whose row kind determines the debezium-json operation type
+     * @return source table DML operation type
      */
-    private static String getDebeziumOpType(SourceRecord record) {
+    private static String getDebeziumOpType(GenericRowData record) {
         String opType;
-        final Envelope.Operation op = Envelope.operationFor(record);
-        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
-            opType = "c";
-        } else if (op == Envelope.Operation.DELETE) {
-            opType = "d";
-        } else {
-            opType = "u";
+        switch (record.getRowKind()) {
+            case DELETE:
+            case UPDATE_BEFORE:
+                opType = "d";
+                break;
+            case INSERT:
+            case UPDATE_AFTER:
+                opType = "c";
+                break;
+            default:
+                throw new IllegalStateException("the record only have states in DELETE, "
+                        + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
         }
         return opType;
     }
@@ -495,9 +533,9 @@ public enum PostgreSQLReadableMetaData {
     /**
      * get meta info from debezium-json data stream
      *
-     * @param record
-     * @param tableNameKey
-     * @return
+     * @param record source record
+     * @param tableNameKey key in the source record
+     * @return value of the key in the source record
      */
     private static String getMetaData(SourceRecord record, String tableNameKey) {
         Struct messageStruct = (Struct) record.value();