Posted to commits@inlong.apache.org by "EMsnap (via GitHub)" <gi...@apache.org> on 2023/03/13 03:34:57 UTC

[GitHub] [inlong] EMsnap opened a new pull request, #7579: [Feature] Mysql CDC support output DDL model in all migrate #7553

EMsnap opened a new pull request, #7579:
URL: https://github.com/apache/inlong/pull/7579

   ### Prepare a Pull Request
   - Fixes #7553 
   
   ### Motivation
   
   [INLONG-7553][Sort] MySQL CDC supports outputting DDL in all-migrate mode
   
   ### Modifications
   [INLONG-7553][Sort] MySQL CDC supports outputting DDL in all-migrate mode
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] gong commented on a diff in pull request #7579: [INLONG-7553][Sort] Mysql CDC support output DDL in all migrate mode

Posted by "gong (via GitHub)" <gi...@apache.org>.
gong commented on code in PR #7579:
URL: https://github.com/apache/inlong/pull/7579#discussion_r1152703788


##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java:
##########
@@ -681,13 +711,22 @@ public void deserialize(SourceRecord record, Collector<RowData> out,
                     emit(record, before, tableSchema, out);
                 }
             }
-
             GenericRowData after = extractAfterRow(value, valueSchema);
             after.setRowKind(RowKind.UPDATE_AFTER);
             emit(record, after, tableSchema, out);
         }
     }
 
+    private void extractDdlRecord(SourceRecord record, Collector<RowData> out, TableChange tableSchema,
+            Struct value) throws Exception {
+
+        GenericRowData insert = (GenericRowData) physicalConverter.convert(
+                objectMapper.readTree(value.get(HISTORY_RECORD_FIELD).toString()).get(DDL_FIELD_NAME).asText(), null);

Review Comment:
   Maybe add a `try`/`catch` here to handle a conversion exception.
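
   One way to read this suggestion, as a minimal sketch rather than the code that was merged: guard the extraction of the DDL text and rethrow with a message that names the failing record. To stay self-contained, the sketch uses a plain `Map` in place of the Debezium `Struct` and the Jackson tree, and the names `DdlExtractSketch`, `extractDdl`, `toDdlRow` and the value `"ddl"` for `DDL_FIELD_NAME` are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class DdlExtractSketch {

    // Assumed value; the real constant is defined elsewhere in the PR.
    static final String DDL_FIELD_NAME = "ddl";

    // Hypothetical stand-in for
    // objectMapper.readTree(...).get(DDL_FIELD_NAME).asText():
    // fails loudly when the history record carries no DDL field.
    static String extractDdl(Map<String, Object> historyRecord) {
        Object ddl = historyRecord.get(DDL_FIELD_NAME);
        if (ddl == null) {
            throw new IllegalArgumentException(
                    "history record has no '" + DDL_FIELD_NAME + "' field");
        }
        return ddl.toString();
    }

    // The conversion step is where an exception can actually occur,
    // so this is the place to catch it and add context.
    static Map<String, Object> toDdlRow(Map<String, Object> historyRecord) {
        try {
            Map<String, Object> row = new HashMap<>();
            row.put(DDL_FIELD_NAME, extractDdl(historyRecord));
            return row;
        } catch (RuntimeException e) {
            throw new IllegalStateException(
                    "Failed to convert DDL history record: " + historyRecord, e);
        }
    }
}
```

   In the real method the guarded call would be the `objectMapper.readTree(...)` chain, whose checked exception is the one worth wrapping.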





[GitHub] [inlong] gong commented on a diff in pull request #7579: [INLONG-7553][Sort] Mysql CDC support output DDL in all migrate mode

Posted by "gong (via GitHub)" <gi...@apache.org>.
gong commented on code in PR #7579:
URL: https://github.com/apache/inlong/pull/7579#discussion_r1152702512


##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java:
##########
@@ -582,31 +588,49 @@ private DeserializationRuntimeConverter getAllMigrationConverter() {
             @Override
             public Object convert(Object dbzObj, Schema schema) {
 
-                ConnectSchema connectSchema = (ConnectSchema) schema;
-                List<Field> fields = connectSchema.fields();
+                if (dbzObj instanceof Struct) {
+                    ConnectSchema connectSchema = (ConnectSchema) schema;
+                    List<Field> fields = connectSchema.fields();
 
-                Map<String, Object> data = new HashMap<>();
-                Struct struct = (Struct) dbzObj;
-
-                for (Field field : fields) {
-                    String fieldName = field.name();
-                    Object fieldValue = struct.getWithoutDefault(fieldName);
-                    Schema fieldSchema = schema.field(fieldName).schema();
-                    String schemaName = fieldSchema.name();
-                    if (schemaName != null) {
-                        fieldValue = getValueWithSchema(fieldValue, schemaName);
-                    }
-                    if (fieldValue instanceof ByteBuffer) {
-                        // binary data (blob or varbinary in mysql) are stored in bytebuffer
-                        // use utf-8 to decode as a string by default
-                        fieldValue = new String(((ByteBuffer) fieldValue).array());
+                    Map<String, Object> data = new HashMap<>();
+                    Struct struct = (Struct) dbzObj;
+
+                    for (Field field : fields) {
+                        String fieldName = field.name();
+                        Object fieldValue = struct.getWithoutDefault(fieldName);
+                        Schema fieldSchema = schema.field(fieldName).schema();
+                        String schemaName = fieldSchema.name();
+                        if (schemaName != null) {
+                            fieldValue = getValueWithSchema(fieldValue, schemaName);
+                        }
+                        if (fieldValue instanceof ByteBuffer) {
+                            // binary data (blob or varbinary in mysql) are stored in bytebuffer
+                            // use utf-8 to decode as a string by default
+                            fieldValue = new String(((ByteBuffer) fieldValue).array());
+                        }
+                        data.put(fieldName, fieldValue);
                     }
-                    data.put(fieldName, fieldValue);
+
+                    GenericRowData row = new GenericRowData(1);
+                    row.setField(0, data);
+                    return row;
                 }
 
+                return constructDdlRow(dbzObj);
+
+            }
+
+            private GenericRowData constructDdlRow(Object ddl) {
+                Map<String, Object> data = new HashMap<>();
                 GenericRowData row = new GenericRowData(1);
                 row.setField(0, data);
-
+                try {
+                    data.put(DDL_FIELD_NAME, ddl);
+                } catch (Exception e) {
+                    LOG.info("Failed to convert DDL to json string", e);
+                    throw new RuntimeException(e);
+                }

Review Comment:
   This `try` block can never hit `Failed to convert DDL to json string`: it only puts the value into the map and performs no conversion.
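
   A minimal sketch of what the helper could look like with the unreachable `try`/`catch` dropped, under stated assumptions: the nested `Row` class is a hypothetical stand-in for Flink's `GenericRowData(1)` so the example runs without dependencies, and `"ddl"` is an assumed value for `DDL_FIELD_NAME`. It is not the code that was merged.

```java
import java.util.HashMap;
import java.util.Map;

public class ConstructDdlRowSketch {

    // Assumed value; the real constant is defined elsewhere in the PR.
    static final String DDL_FIELD_NAME = "ddl";

    // Hypothetical fixed-arity row, standing in for GenericRowData.
    static final class Row {
        private final Object[] fields;

        Row(int arity) {
            this.fields = new Object[arity];
        }

        void setField(int pos, Object value) {
            fields[pos] = value;
        }

        Object getField(int pos) {
            return fields[pos];
        }
    }

    // HashMap.put performs no conversion and declares no checked exception,
    // so nothing here needs to be wrapped in try/catch.
    static Row constructDdlRow(Object ddl) {
        Map<String, Object> data = new HashMap<>();
        data.put(DDL_FIELD_NAME, ddl);
        Row row = new Row(1);
        row.setField(0, data);
        return row;
    }
}
```

   Any error handling then belongs at the call site that parses the history record, which is where a conversion can actually fail.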





[GitHub] [inlong] EMsnap merged pull request #7579: [INLONG-7553][Sort] Mysql CDC support output DDL in all migrate mode

Posted by "EMsnap (via GitHub)" <gi...@apache.org>.
EMsnap merged PR #7579:
URL: https://github.com/apache/inlong/pull/7579




[GitHub] [inlong] gong commented on pull request #7579: [INLONG-7553][Sort] Mysql CDC support output DDL in all migrate mode

Posted by "gong (via GitHub)" <gi...@apache.org>.
gong commented on PR #7579:
URL: https://github.com/apache/inlong/pull/7579#issuecomment-1489745218

   check code style

