You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/03/03 11:53:12 UTC

[GitHub] [incubator-inlong] KevinWen007 commented on a change in pull request #2848: [INLONG-2847][Sort] Support whole-database migration from debezium format to canal format

KevinWen007 commented on a change in pull request #2848:
URL: https://github.com/apache/incubator-inlong/pull/2848#discussion_r818582350



##########
File path: inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonSerializationSchema.java
##########
@@ -55,59 +52,62 @@
  *
  * @see <a href="https://github.com/alibaba/canal">Alibaba Canal</a>
  */
-public class CanalJsonSerializationSchema implements SerializationSchema<RowData> {
+public class CanalJsonSerializationSchema implements SerializationSchema<Row> {
 
     private static final long serialVersionUID = 1L;
 
-    private static final StringData OP_INSERT = StringData.fromString("INSERT");
-    private static final StringData OP_DELETE = StringData.fromString("DELETE");
+    private static final String OP_INSERT = "INSERT";
+    private static final String OP_DELETE = "DELETE";
 
-    private transient GenericRowData reuse;
+    private transient Row reuse;
 
-    /** The serializer to serialize Canal JSON data. */
-    private final JsonRowDataSerializationSchema jsonSerializer;
+    private final JsonRowSerializationSchema jsonSerializer;
 
-    private final DataFormatConverters.RowConverter consumedRowConverter;
+    private final Map<Integer, ReadableMetadata> fieldIndexToMetadata;
 
-    private final DataFormatConverters.RowConverter physicalRowConverter;
+    private final boolean isMigrateAll;
 
-    private final Map<Integer, ReadableMetadata> fieldIndexToMetadata;
+    private final ObjectMapper objectMapper;
 
     public CanalJsonSerializationSchema(
             RowType physicalRowType,
             Map<Integer, ReadableMetadata> fieldIndexToMetadata,
-            DataFormatConverters.RowConverter consumedRowConverter,
-            DataFormatConverters.RowConverter physicalRowConverter,
-            TimestampFormat timestampFormat,
-            JsonOptions.MapNullKeyMode mapNullKeyMode,
-            String mapNullKeyLiteral,
-            boolean encodeDecimalAsPlainNumber) {
-        jsonSerializer =
-                new JsonRowDataSerializationSchema(
-                        createJsonRowType(fromLogicalToDataType(physicalRowType), fieldIndexToMetadata.values()),
-                        timestampFormat,
-                        mapNullKeyMode,
-                        mapNullKeyLiteral,
-                        encodeDecimalAsPlainNumber);
+            boolean isMigrateAll
+    ) {
+        this.isMigrateAll = isMigrateAll;
+
+        if (isMigrateAll) {
+            this.objectMapper = new ObjectMapper();
+        } else {
+            this.objectMapper = null;
+        }
+
+        RowTypeInfo rowTypeInfo = createJsonRowType(fromLogicalToDataType(physicalRowType),
+                fieldIndexToMetadata.values(), isMigrateAll);
+        jsonSerializer = JsonRowSerializationSchema.builder().withTypeInfo(rowTypeInfo).build();
 
         this.fieldIndexToMetadata = fieldIndexToMetadata;
-        this.consumedRowConverter = consumedRowConverter;
-        this.physicalRowConverter = physicalRowConverter;
     }
 
     @Override
     public void open(InitializationContext context) {
-        reuse = new GenericRowData(2 + fieldIndexToMetadata.size());
+        reuse = new Row(2 + fieldIndexToMetadata.size());
     }
 
     @Override
-    public byte[] serialize(RowData row) {
+    public byte[] serialize(Row row) {
         try {
             MysqlBinLogData mysqlBinLogData = getMysqlBinLongData(row);
 
-            ArrayData arrayData = new GenericArrayData(new RowData[] {mysqlBinLogData.getPhysicalData()});
+            Object[] arrayData = new Object[1];
+            if (isMigrateAll) {
+                String mapStr = mysqlBinLogData.getPhysicalData().getFieldAs(0);

Review comment:
       Using `getFieldAs(0)` here saves a manual cast from Object to String.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org