You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/03/03 11:44:51 UTC

[GitHub] [incubator-inlong] chantccc commented on a change in pull request #2848: [INLONG-2847][Sort] Support whole-database migration from debezium format to canal format

chantccc commented on a change in pull request #2848:
URL: https://github.com/apache/incubator-inlong/pull/2848#discussion_r818547940



##########
File path: inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonSerializationSchema.java
##########
@@ -55,59 +52,62 @@
  *
  * @see <a href="https://github.com/alibaba/canal">Alibaba Canal</a>
  */
-public class CanalJsonSerializationSchema implements SerializationSchema<RowData> {
+public class CanalJsonSerializationSchema implements SerializationSchema<Row> {
 
     private static final long serialVersionUID = 1L;
 
-    private static final StringData OP_INSERT = StringData.fromString("INSERT");
-    private static final StringData OP_DELETE = StringData.fromString("DELETE");
+    private static final String OP_INSERT = "INSERT";
+    private static final String OP_DELETE = "DELETE";
 
-    private transient GenericRowData reuse;
+    private transient Row reuse;
 
-    /** The serializer to serialize Canal JSON data. */
-    private final JsonRowDataSerializationSchema jsonSerializer;
+    private final JsonRowSerializationSchema jsonSerializer;
 
-    private final DataFormatConverters.RowConverter consumedRowConverter;
+    private final Map<Integer, ReadableMetadata> fieldIndexToMetadata;
 
-    private final DataFormatConverters.RowConverter physicalRowConverter;
+    private final boolean isMigrateAll;
 
-    private final Map<Integer, ReadableMetadata> fieldIndexToMetadata;
+    private final ObjectMapper objectMapper;
 
     public CanalJsonSerializationSchema(
             RowType physicalRowType,
             Map<Integer, ReadableMetadata> fieldIndexToMetadata,
-            DataFormatConverters.RowConverter consumedRowConverter,
-            DataFormatConverters.RowConverter physicalRowConverter,
-            TimestampFormat timestampFormat,
-            JsonOptions.MapNullKeyMode mapNullKeyMode,
-            String mapNullKeyLiteral,
-            boolean encodeDecimalAsPlainNumber) {
-        jsonSerializer =
-                new JsonRowDataSerializationSchema(
-                        createJsonRowType(fromLogicalToDataType(physicalRowType), fieldIndexToMetadata.values()),
-                        timestampFormat,
-                        mapNullKeyMode,
-                        mapNullKeyLiteral,
-                        encodeDecimalAsPlainNumber);
+            boolean isMigrateAll
+    ) {
+        this.isMigrateAll = isMigrateAll;
+
+        if (isMigrateAll) {
+            this.objectMapper = new ObjectMapper();
+        } else {
+            this.objectMapper = null;
+        }
+
+        RowTypeInfo rowTypeInfo = createJsonRowType(fromLogicalToDataType(physicalRowType),
+                fieldIndexToMetadata.values(), isMigrateAll);
+        jsonSerializer = JsonRowSerializationSchema.builder().withTypeInfo(rowTypeInfo).build();
 
         this.fieldIndexToMetadata = fieldIndexToMetadata;
-        this.consumedRowConverter = consumedRowConverter;
-        this.physicalRowConverter = physicalRowConverter;
     }
 
     @Override
     public void open(InitializationContext context) {
-        reuse = new GenericRowData(2 + fieldIndexToMetadata.size());
+        reuse = new Row(2 + fieldIndexToMetadata.size());
     }
 
     @Override
-    public byte[] serialize(RowData row) {
+    public byte[] serialize(Row row) {
         try {
             MysqlBinLogData mysqlBinLogData = getMysqlBinLongData(row);
 
-            ArrayData arrayData = new GenericArrayData(new RowData[] {mysqlBinLogData.getPhysicalData()});
+            Object[] arrayData = new Object[1];
+            if (isMigrateAll) {
+                String mapStr = mysqlBinLogData.getPhysicalData().getFieldAs(0);

Review comment:
       Why use `getFieldAs` instead of `getField`?

##########
File path: inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJsonDeserializationSchema.java
##########
@@ -195,7 +210,10 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
     }
 
     private void emitRow(GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
-        final int physicalArity = physicalRow.getArity();
+        int physicalArity = physicalRow.getArity();
+        if (isMigrateAll) {
+            physicalArity -= 1;

Review comment:
       If `isMigrateAll` is set, the physical arity should always be 0, because all fields are treated as metadata fields, including the `MYSQL_METADATA_DATA` field.
   
   What's the point of `physicalArity -= 1`?

##########
File path: inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJsonDeserializationSchema.java
##########
@@ -153,8 +160,16 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
                 payload = row;
             }
 
-            GenericRowData before = (GenericRowData) payload.getField(0);
-            GenericRowData after = (GenericRowData) payload.getField(1);
+            GenericRowData before;
+            GenericRowData after;
+            if (isMigrateAll) {
+                before = GenericRowData.of(payload.getField(BEFORE_POS));
+                after = GenericRowData.of(payload.getField(AFTER_POS));
+            } else {
+                before = (GenericRowData) payload.getField(BEFORE_POS);
+                after = (GenericRowData) payload.getField(AFTER_POS);
+            }
+
             String op = payload.getField(2).toString();

Review comment:
       Consider extracting a constant for this index, as with `BEFORE_POS` and `AFTER_POS`: `private static final int OP_POS = 2;`

##########
File path: inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonSerializationSchema.java
##########
@@ -117,13 +117,15 @@ public void open(InitializationContext context) {
                 index++;
             }
 
+            reuse.setKind(row.getKind());

Review comment:
       The `RowKind` of `reuse` is never used, so is it necessary to call `setKind` here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org