You are viewing a plain-text version of this content; the canonical link to it (the "here" hyperlink on the original page) was not preserved in this text version.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/10/27 06:19:53 UTC

[inlong] branch master updated: [INLONG-6271][Sort] Unhandled update_before data in canal json in multiple sink scene (#6272)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 612231f71 [INLONG-6271][Sort] Unhandled update_before data in canal json in multiple sink scene (#6272)
612231f71 is described below

commit 612231f71abba6b2d04aff1c8cff6c77eea434c5
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Thu Oct 27 14:19:47 2022 +0800

    [INLONG-6271][Sort] Unhandled update_before data in canal json in multiple sink scene (#6272)
    
    Co-authored-by: thesumery <15...@qq.com>
---
 .../base/format/CanalJsonDynamicSchemaFormat.java  | 45 ++++++++++++++++------
 1 file changed, 34 insertions(+), 11 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
index a2601548f..48bde9524 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.base.format;
 
 import org.apache.flink.formats.json.JsonToRowDataConverters.JsonToRowDataConverter;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
@@ -119,29 +120,51 @@ public class CanalJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
     public List<RowData> extractRowData(JsonNode data, RowType rowType) {
         JsonNode opNode = data.get(OP_TYPE);
         JsonNode dataNode = data.get(DATA);
+        JsonNode oldNode = data.get(OLD);
         if (opNode == null || dataNode == null || !dataNode.isArray()) {
             throw new IllegalArgumentException(String.format("Error opNode: %s, or dataNode: %s", opNode, dataNode));
         }
 
         String op = data.get(OP_TYPE).asText();
-        RowKind rowKind;
+        JsonToRowDataConverter rowDataConverter = rowDataConverters.createConverter(rowType);
+        List<RowData> rowDataList = new ArrayList<>();
         if (OP_INSERT.equals(op)) {
-            rowKind = RowKind.INSERT;
+            for (JsonNode row : dataNode) {
+                RowData rowData = (RowData) rowDataConverter.convert(row);
+                rowData.setRowKind(RowKind.INSERT);
+                rowDataList.add(rowData);
+            }
         } else if (OP_UPDATE.equals(op)) {
-            rowKind = RowKind.UPDATE_AFTER;
+            for (int i = 0; i < dataNode.size(); i++) {
+                GenericRowData after = (GenericRowData) rowDataConverter.convert(dataNode.get(i));
+
+                if (oldNode != null) {
+                    GenericRowData before = (GenericRowData) rowDataConverter.convert(oldNode.get(i));
+                    for (int f = 0; f < rowType.getFieldCount(); f++) {
+                        if (before.isNullAt(f) && oldNode.get(i).findValue(rowType.getFieldNames().get(f)) == null) {
+                            // fields in "old" (before) means the fields are changed
+                            // fields not in "old" (before) means the fields are not changed
+                            // so we just copy the not changed fields into before
+                            before.setField(f, after.getField(f));
+                        }
+                    }
+                    before.setRowKind(RowKind.UPDATE_BEFORE);
+                    rowDataList.add(before);
+                }
+
+                after.setRowKind(RowKind.UPDATE_AFTER);
+                rowDataList.add(after);
+            }
         } else if (OP_DELETE.equals(op)) {
-            rowKind = RowKind.DELETE;
+            for (JsonNode row : dataNode) {
+                RowData rowData = (RowData) rowDataConverter.convert(row);
+                rowData.setRowKind(RowKind.DELETE);
+                rowDataList.add(rowData);
+            }
         } else {
             throw new IllegalArgumentException("Unsupported op_type: " + op);
         }
-        JsonToRowDataConverter rowDataConverter = rowDataConverters.createConverter(rowType);
 
-        List<RowData> rowDataList = new ArrayList<>();
-        for (JsonNode row : dataNode) {
-            RowData rowData = (RowData) rowDataConverter.convert(row);
-            rowData.setRowKind(rowKind);
-            rowDataList.add(rowData);
-        }
         return rowDataList;
     }