You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/10/27 06:19:53 UTC
[inlong] branch master updated: [INLONG-6271][Sort] Unhandled update_before data in canal json in multiple sink scene (#6272)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 612231f71 [INLONG-6271][Sort] Unhandled update_before data in canal json in multiple sink scene (#6272)
612231f71 is described below
commit 612231f71abba6b2d04aff1c8cff6c77eea434c5
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Thu Oct 27 14:19:47 2022 +0800
[INLONG-6271][Sort] Unhandled update_before data in canal json in multiple sink scene (#6272)
Co-authored-by: thesumery <15...@qq.com>
---
.../base/format/CanalJsonDynamicSchemaFormat.java | 45 ++++++++++++++++------
1 file changed, 34 insertions(+), 11 deletions(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
index a2601548f..48bde9524 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.base.format;
import org.apache.flink.formats.json.JsonToRowDataConverters.JsonToRowDataConverter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
@@ -119,29 +120,51 @@ public class CanalJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
public List<RowData> extractRowData(JsonNode data, RowType rowType) {
JsonNode opNode = data.get(OP_TYPE);
JsonNode dataNode = data.get(DATA);
+ JsonNode oldNode = data.get(OLD);
if (opNode == null || dataNode == null || !dataNode.isArray()) {
throw new IllegalArgumentException(String.format("Error opNode: %s, or dataNode: %s", opNode, dataNode));
}
String op = data.get(OP_TYPE).asText();
- RowKind rowKind;
+ JsonToRowDataConverter rowDataConverter = rowDataConverters.createConverter(rowType);
+ List<RowData> rowDataList = new ArrayList<>();
if (OP_INSERT.equals(op)) {
- rowKind = RowKind.INSERT;
+ for (JsonNode row : dataNode) {
+ RowData rowData = (RowData) rowDataConverter.convert(row);
+ rowData.setRowKind(RowKind.INSERT);
+ rowDataList.add(rowData);
+ }
} else if (OP_UPDATE.equals(op)) {
- rowKind = RowKind.UPDATE_AFTER;
+ for (int i = 0; i < dataNode.size(); i++) {
+ GenericRowData after = (GenericRowData) rowDataConverter.convert(dataNode.get(i));
+
+ if (oldNode != null) {
+ GenericRowData before = (GenericRowData) rowDataConverter.convert(oldNode.get(i));
+ for (int f = 0; f < rowType.getFieldCount(); f++) {
+ if (before.isNullAt(f) && oldNode.get(i).findValue(rowType.getFieldNames().get(f)) == null) {
+ // a field present in "old" (before) was changed by this update;
+ // a field absent from "old" was not changed by this update,
+ // so copy each unchanged field's value from the after row into the before row
+ before.setField(f, after.getField(f));
+ }
+ }
+ before.setRowKind(RowKind.UPDATE_BEFORE);
+ rowDataList.add(before);
+ }
+
+ after.setRowKind(RowKind.UPDATE_AFTER);
+ rowDataList.add(after);
+ }
} else if (OP_DELETE.equals(op)) {
- rowKind = RowKind.DELETE;
+ for (JsonNode row : dataNode) {
+ RowData rowData = (RowData) rowDataConverter.convert(row);
+ rowData.setRowKind(RowKind.DELETE);
+ rowDataList.add(rowData);
+ }
} else {
throw new IllegalArgumentException("Unsupported op_type: " + op);
}
- JsonToRowDataConverter rowDataConverter = rowDataConverters.createConverter(rowType);
- List<RowData> rowDataList = new ArrayList<>();
- for (JsonNode row : dataNode) {
- RowData rowData = (RowData) rowDataConverter.convert(row);
- rowData.setRowKind(rowKind);
- rowDataList.add(rowData);
- }
return rowDataList;
}