You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@paimon.apache.org by "MonsterChenzhuo (via GitHub)" <gi...@apache.org> on 2023/08/24 03:35:22 UTC

[GitHub] [incubator-paimon] MonsterChenzhuo opened a new pull request, #1880: [flink]Optimization of Canal parsing logic

MonsterChenzhuo opened a new pull request, #1880:
URL: https://github.com/apache/incubator-paimon/pull/1880

   <!-- Please specify the module before the PR name: [core] ... or [flink] ... -->
   
   ### Purpose
   1. Avoid duplicate code
   
   2. Use predefined constants
   
   3. Decomposition method
   <!-- What is the purpose of the change -->
   
   ### Tests
   
   <!-- List UT and IT cases to verify this change -->
   
   ### API and Format
   
   <!-- Does this change affect API or storage format -->
   
   ### Documentation
   
   <!-- Does this change introduce a new feature -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] MonsterChenzhuo commented on a diff in pull request #1880: [flink]Optimization of Canal parsing logic

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1880:
URL: https://github.com/apache/incubator-paimon/pull/1880#discussion_r1304218126


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -199,69 +179,67 @@ protected String extractString(String key) {
     }
 
     private boolean isDdl() {
-        return root.get("isDdl") != null && root.get("isDdl").asBoolean();
+        JsonNode isDdlNode = root.get(IS_DDL_FIELD);
+        if (isDdlNode == null) {
+            throw new IllegalArgumentException("Expected 'isDdl' field in the JSON but not found.");

Review Comment:
   It is used in ValidateFormat,
   ```java
   if (isDdl()) { // Call the isDdl() method first, and there will be a validity of the ddl field
   // Verify the existence of sql fields
   checkNotNull(root.get(FIELD_SQL), errorMessageTemplate, FIELD_SQL);
   } else {
   checkNotNull(root.get(FIELD_MYSQL_TYPE), errorMessageTemplate, FIELD_MYSQL_TYPE);
   checkNotNull(root.get(FIELD_PRIMARY_KEYS), errorMessageTemplate, FIELD_PRIMARY_KEYS);
   }
   
   private boolean isDdl() {
       JsonNode isDdlNode = root.get(FIELD_IS_DDL);
       if (isDdlNode == null) {
           throw new IllegalArgumentException("Expected 'isDdl' field in the JSON but not found.");
       }
       return isDdlNode.asBoolean();
   }
   
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] MonsterChenzhuo commented on a diff in pull request #1880: [flink]Optimization of Canal parsing logic

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1880:
URL: https://github.com/apache/incubator-paimon/pull/1880#discussion_r1304222486


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -27,27 +27,42 @@
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.JsonSerdeUtil;
 
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.NullNode;
 
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
-/** Convert canal-json format string to list of {@link RichCdcMultiplexRecord}s. */
+/**
+ * The {@code CanalRecordParser} class is responsible for parsing records from the Canal-JSON
+ * format. Canal is a database binlog multi-platform consumer, which is used to synchronize data
+ * across databases. This parser extracts relevant information from the Canal-JSON format and
+ * transforms it into a list of {@link RichCdcMultiplexRecord} objects, which represent the changes
+ * captured in the database.
+ *
+ * <p>The class handles different types of database operations such as INSERT, UPDATE, and DELETE,
+ * and generates corresponding {@link RichCdcMultiplexRecord} objects for each operation.
+ *
+ * <p>Additionally, the parser supports schema extraction, which can be used to understand the
+ * structure of the incoming data and its corresponding field types.
+ */
 public class CanalRecordParser extends RecordParser {
 
+    private static final Charset CHARSET_ISO_8859_1 = StandardCharsets.ISO_8859_1;
+    private static final String IS_DDL_FIELD = "isDdl";
     private static final String FIELD_SQL = "sql";

Review Comment:
   It is used in ValidateFormat,
            if (isDdl()) { // Call the isDdl() method first, and there will be a validity of the ddl field
            // Verify the existence of sql fields
               checkNotNull(root.get(FIELD_SQL), errorMessageTemplate, FIELD_SQL);
           } else {
               checkNotNull(root.get(FIELD_MYSQL_TYPE), errorMessageTemplate, FIELD_MYSQL_TYPE);
               checkNotNull(root.get(FIELD_PRIMARY_KEYS), errorMessageTemplate, FIELD_PRIMARY_KEYS);
           }



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] yuzelin commented on a diff in pull request #1880: [flink]Optimization of Canal parsing logic

Posted by "yuzelin (via GitHub)" <gi...@apache.org>.
yuzelin commented on code in PR #1880:
URL: https://github.com/apache/incubator-paimon/pull/1880#discussion_r1304225632


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -199,69 +179,67 @@ protected String extractString(String key) {
     }
 
     private boolean isDdl() {
-        return root.get("isDdl") != null && root.get("isDdl").asBoolean();
+        JsonNode isDdlNode = root.get(IS_DDL_FIELD);
+        if (isDdlNode == null) {
+            throw new IllegalArgumentException("Expected 'isDdl' field in the JSON but not found.");

Review Comment:
   I mean, we can just check that this field is existed in `validateFormat `, and we don't check nullable here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] MonsterChenzhuo commented on pull request #1880: [flink] Optimization of Canal parsing logic

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on PR #1880:
URL: https://github.com/apache/incubator-paimon/pull/1880#issuecomment-1691793423

   @yuzelin PTAL,thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] MonsterChenzhuo commented on a diff in pull request #1880: [flink]Optimization of Canal parsing logic

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1880:
URL: https://github.com/apache/incubator-paimon/pull/1880#discussion_r1304236017


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -199,69 +179,67 @@ protected String extractString(String key) {
     }
 
     private boolean isDdl() {
-        return root.get("isDdl") != null && root.get("isDdl").asBoolean();
+        JsonNode isDdlNode = root.get(IS_DDL_FIELD);
+        if (isDdlNode == null) {
+            throw new IllegalArgumentException("Expected 'isDdl' field in the JSON but not found.");
+        }
+        return isDdlNode.asBoolean();
     }
 
     private List<String> extractPrimaryKeys() {
-        List<String> primaryKeys = new ArrayList<>();
-        ArrayNode pkNames = (ArrayNode) root.get(FIELD_PRIMARY_KEYS);
-        pkNames.iterator().forEachRemaining(pk -> primaryKeys.add(toFieldName(pk.asText())));
-        return primaryKeys;
+        ArrayNode pkNames = JsonSerdeUtil.getNodeAs(root, FIELD_PRIMARY_KEYS, ArrayNode.class);
+        return StreamSupport.stream(pkNames.spliterator(), false)
+                .map(pk -> toFieldName(pk.asText()))
+                .collect(Collectors.toList());
     }
 
     private LinkedHashMap<String, String> extractFieldTypesFromMySqlType() {
+        JsonNode schema = JsonSerdeUtil.getNodeAs(root, FIELD_MYSQL_TYPE, JsonNode.class);
         LinkedHashMap<String, String> fieldTypes = new LinkedHashMap<>();
 
-        JsonNode schema = root.get(FIELD_MYSQL_TYPE);
-        Iterator<String> iterator = schema.fieldNames();
-        while (iterator.hasNext()) {
-            String fieldName = iterator.next();
-            String fieldType = schema.get(fieldName).asText();
-            fieldTypes.put(toFieldName(fieldName), fieldType);
-        }
+        schema.fieldNames()
+                .forEachRemaining(
+                        fieldName -> {
+                            String fieldType = schema.get(fieldName).asText();
+                            fieldTypes.put(toFieldName(fieldName), fieldType);
+                        });
 
         return fieldTypes;
     }
 
-    private Map<String, String> extractRow(
+    /**
+     * Extracts data from a given JSON node and transforms it based on provided MySQL and Paimon
+     * field types.
+     *
+     * @param record The JSON node containing the data.
+     * @param mySqlFieldTypes A map of MySQL field types.
+     * @param paimonFieldTypes A map of Paimon field types.
+     * @return A map of extracted and transformed data.
+     */
+    private Map<String, String> extractRowFromJson(
             JsonNode record,
             Map<String, String> mySqlFieldTypes,
             LinkedHashMap<String, DataType> paimonFieldTypes) {
+
         Map<String, Object> jsonMap =
                 OBJECT_MAPPER.convertValue(record, new TypeReference<Map<String, Object>>() {});
         if (jsonMap == null) {
             return new HashMap<>();
         }
 
-        Map<String, String> resultMap = new HashMap<>();
-        for (Map.Entry<String, String> field : mySqlFieldTypes.entrySet()) {
-            String fieldName = field.getKey();
-            String mySqlType = field.getValue();
-            Object objectValue = jsonMap.get(fieldName);
-            if (objectValue == null) {
-                continue;
-            }
-
-            String oldValue = objectValue.toString();
-            String newValue = oldValue;
-
-            if (MySqlTypeUtils.isSetType(MySqlTypeUtils.getShortType(mySqlType))) {
-                newValue = CanalFieldParser.convertSet(newValue, mySqlType);
-            } else if (MySqlTypeUtils.isEnumType(MySqlTypeUtils.getShortType(mySqlType))) {
-                newValue = CanalFieldParser.convertEnum(newValue, mySqlType);
-            } else if (MySqlTypeUtils.isGeoType(MySqlTypeUtils.getShortType(mySqlType))) {
-                try {
-                    byte[] wkb =
-                            CanalFieldParser.convertGeoType2WkbArray(
-                                    oldValue.getBytes(StandardCharsets.ISO_8859_1));
-                    newValue = MySqlTypeUtils.convertWkbArray(wkb);
-                } catch (Exception e) {
-                    throw new IllegalArgumentException(
-                            String.format("Failed to convert %s to geometry JSON.", oldValue), e);
-                }
-            }
-            resultMap.put(fieldName, newValue);
-        }
+        Map<String, String> resultMap =
+                mySqlFieldTypes.entrySet().stream()
+                        .filter(
+                                entry ->
+                                        jsonMap.containsKey(entry.getKey())
+                                                && jsonMap.get(entry.getKey()) != null)

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] MonsterChenzhuo commented on a diff in pull request #1880: [flink]Optimization of Canal parsing logic

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1880:
URL: https://github.com/apache/incubator-paimon/pull/1880#discussion_r1304048068


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -27,27 +27,42 @@
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.JsonSerdeUtil;
 
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.NullNode;
 
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
-/** Convert canal-json format string to list of {@link RichCdcMultiplexRecord}s. */
+/**
+ * The {@code CanalRecordParser} class is responsible for parsing records from the Canal-JSON
+ * format. Canal is a database binlog multi-platform consumer, which is used to synchronize data
+ * across databases. This parser extracts relevant information from the Canal-JSON format and
+ * transforms it into a list of {@link RichCdcMultiplexRecord} objects, which represent the changes
+ * captured in the database.
+ *
+ * <p>The class handles different types of database operations such as INSERT, UPDATE, and DELETE,
+ * and generates corresponding {@link RichCdcMultiplexRecord} objects for each operation.
+ *
+ * <p>Additionally, the parser supports schema extraction, which can be used to understand the
+ * structure of the incoming data and its corresponding field types.
+ */
 public class CanalRecordParser extends RecordParser {
 
+    private static final Charset CHARSET_ISO_8859_1 = StandardCharsets.ISO_8859_1;

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] yuzelin commented on a diff in pull request #1880: [flink]Optimization of Canal parsing logic

Posted by "yuzelin (via GitHub)" <gi...@apache.org>.
yuzelin commented on code in PR #1880:
URL: https://github.com/apache/incubator-paimon/pull/1880#discussion_r1303855185


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -27,27 +27,42 @@
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.JsonSerdeUtil;
 
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.NullNode;
 
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
-/** Convert canal-json format string to list of {@link RichCdcMultiplexRecord}s. */
+/**
+ * The {@code CanalRecordParser} class is responsible for parsing records from the Canal-JSON
+ * format. Canal is a database binlog multi-platform consumer, which is used to synchronize data
+ * across databases. This parser extracts relevant information from the Canal-JSON format and
+ * transforms it into a list of {@link RichCdcMultiplexRecord} objects, which represent the changes
+ * captured in the database.
+ *
+ * <p>The class handles different types of database operations such as INSERT, UPDATE, and DELETE,
+ * and generates corresponding {@link RichCdcMultiplexRecord} objects for each operation.
+ *
+ * <p>Additionally, the parser supports schema extraction, which can be used to understand the
+ * structure of the incoming data and its corresponding field types.
+ */
 public class CanalRecordParser extends RecordParser {
 
+    private static final Charset CHARSET_ISO_8859_1 = StandardCharsets.ISO_8859_1;

Review Comment:
   I think we doesn't need to define another constant for JDK constant.



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -27,27 +27,42 @@
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.JsonSerdeUtil;
 
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.NullNode;
 
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
-/** Convert canal-json format string to list of {@link RichCdcMultiplexRecord}s. */
+/**
+ * The {@code CanalRecordParser} class is responsible for parsing records from the Canal-JSON
+ * format. Canal is a database binlog multi-platform consumer, which is used to synchronize data
+ * across databases. This parser extracts relevant information from the Canal-JSON format and
+ * transforms it into a list of {@link RichCdcMultiplexRecord} objects, which represent the changes
+ * captured in the database.
+ *
+ * <p>The class handles different types of database operations such as INSERT, UPDATE, and DELETE,
+ * and generates corresponding {@link RichCdcMultiplexRecord} objects for each operation.
+ *
+ * <p>Additionally, the parser supports schema extraction, which can be used to understand the
+ * structure of the incoming data and its corresponding field types.
+ */
 public class CanalRecordParser extends RecordParser {
 
+    private static final Charset CHARSET_ISO_8859_1 = StandardCharsets.ISO_8859_1;
+    private static final String IS_DDL_FIELD = "isDdl";
     private static final String FIELD_SQL = "sql";

Review Comment:
   I find that `FIELD_SQL` is checked but never used, please remove it.



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -27,27 +27,42 @@
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.JsonSerdeUtil;
 
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.NullNode;
 
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
-/** Convert canal-json format string to list of {@link RichCdcMultiplexRecord}s. */
+/**
+ * The {@code CanalRecordParser} class is responsible for parsing records from the Canal-JSON
+ * format. Canal is a database binlog multi-platform consumer, which is used to synchronize data
+ * across databases. This parser extracts relevant information from the Canal-JSON format and
+ * transforms it into a list of {@link RichCdcMultiplexRecord} objects, which represent the changes
+ * captured in the database.
+ *
+ * <p>The class handles different types of database operations such as INSERT, UPDATE, and DELETE,
+ * and generates corresponding {@link RichCdcMultiplexRecord} objects for each operation.
+ *
+ * <p>Additionally, the parser supports schema extraction, which can be used to understand the
+ * structure of the incoming data and its corresponding field types.
+ */
 public class CanalRecordParser extends RecordParser {
 
+    private static final Charset CHARSET_ISO_8859_1 = StandardCharsets.ISO_8859_1;
+    private static final String IS_DDL_FIELD = "isDdl";

Review Comment:
   Use `FIELD_IS_DDL` to keep the name style.



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -273,4 +251,99 @@ private Map<String, String> extractRow(
 
         return resultMap;
     }
+
+    private String transformValue(String oldValue, String mySqlType) {
+        String newValue = oldValue;
+        String shortType = MySqlTypeUtils.getShortType(mySqlType);
+
+        if (MySqlTypeUtils.isSetType(shortType)) {
+            newValue = CanalFieldParser.convertSet(newValue, mySqlType);
+        } else if (MySqlTypeUtils.isEnumType(shortType)) {
+            newValue = CanalFieldParser.convertEnum(newValue, mySqlType);
+        } else if (MySqlTypeUtils.isGeoType(shortType)) {
+            try {
+                byte[] wkb =
+                        CanalFieldParser.convertGeoType2WkbArray(
+                                oldValue.getBytes(CHARSET_ISO_8859_1));
+                newValue = MySqlTypeUtils.convertWkbArray(wkb);
+            } catch (Exception e) {
+                throw new IllegalArgumentException(
+                        String.format("Failed to convert %s to geometry JSON.", oldValue), e);
+            }
+        }
+        return newValue;
+    }

Review Comment:
   Doesn't need to use `newValue`, just return value in each branch.



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -199,69 +179,67 @@ protected String extractString(String key) {
     }
 
     private boolean isDdl() {
-        return root.get("isDdl") != null && root.get("isDdl").asBoolean();
+        JsonNode isDdlNode = root.get(IS_DDL_FIELD);
+        if (isDdlNode == null) {
+            throw new IllegalArgumentException("Expected 'isDdl' field in the JSON but not found.");
+        }
+        return isDdlNode.asBoolean();
     }
 
     private List<String> extractPrimaryKeys() {
-        List<String> primaryKeys = new ArrayList<>();
-        ArrayNode pkNames = (ArrayNode) root.get(FIELD_PRIMARY_KEYS);
-        pkNames.iterator().forEachRemaining(pk -> primaryKeys.add(toFieldName(pk.asText())));
-        return primaryKeys;
+        ArrayNode pkNames = JsonSerdeUtil.getNodeAs(root, FIELD_PRIMARY_KEYS, ArrayNode.class);
+        return StreamSupport.stream(pkNames.spliterator(), false)
+                .map(pk -> toFieldName(pk.asText()))
+                .collect(Collectors.toList());
     }
 
     private LinkedHashMap<String, String> extractFieldTypesFromMySqlType() {
+        JsonNode schema = JsonSerdeUtil.getNodeAs(root, FIELD_MYSQL_TYPE, JsonNode.class);
         LinkedHashMap<String, String> fieldTypes = new LinkedHashMap<>();
 
-        JsonNode schema = root.get(FIELD_MYSQL_TYPE);
-        Iterator<String> iterator = schema.fieldNames();
-        while (iterator.hasNext()) {
-            String fieldName = iterator.next();
-            String fieldType = schema.get(fieldName).asText();
-            fieldTypes.put(toFieldName(fieldName), fieldType);
-        }
+        schema.fieldNames()
+                .forEachRemaining(
+                        fieldName -> {
+                            String fieldType = schema.get(fieldName).asText();
+                            fieldTypes.put(toFieldName(fieldName), fieldType);
+                        });
 
         return fieldTypes;
     }
 
-    private Map<String, String> extractRow(
+    /**
+     * Extracts data from a given JSON node and transforms it based on provided MySQL and Paimon
+     * field types.
+     *
+     * @param record The JSON node containing the data.
+     * @param mySqlFieldTypes A map of MySQL field types.
+     * @param paimonFieldTypes A map of Paimon field types.
+     * @return A map of extracted and transformed data.
+     */
+    private Map<String, String> extractRowFromJson(
             JsonNode record,
             Map<String, String> mySqlFieldTypes,
             LinkedHashMap<String, DataType> paimonFieldTypes) {
+

Review Comment:
   We should unify the code style, don't need to add a line after `{` in methods.



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -199,69 +179,67 @@ protected String extractString(String key) {
     }
 
     private boolean isDdl() {
-        return root.get("isDdl") != null && root.get("isDdl").asBoolean();
+        JsonNode isDdlNode = root.get(IS_DDL_FIELD);
+        if (isDdlNode == null) {
+            throw new IllegalArgumentException("Expected 'isDdl' field in the JSON but not found.");

Review Comment:
   According to the [doc](https://docs.pingcap.com/zh/tidb/stable/ticdc-canal-json), this field must be there. I think we can add check in `validateFormat` and remove check here.



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -199,69 +179,67 @@ protected String extractString(String key) {
     }
 
     private boolean isDdl() {
-        return root.get("isDdl") != null && root.get("isDdl").asBoolean();
+        JsonNode isDdlNode = root.get(IS_DDL_FIELD);
+        if (isDdlNode == null) {
+            throw new IllegalArgumentException("Expected 'isDdl' field in the JSON but not found.");
+        }
+        return isDdlNode.asBoolean();
     }
 
     private List<String> extractPrimaryKeys() {
-        List<String> primaryKeys = new ArrayList<>();
-        ArrayNode pkNames = (ArrayNode) root.get(FIELD_PRIMARY_KEYS);
-        pkNames.iterator().forEachRemaining(pk -> primaryKeys.add(toFieldName(pk.asText())));
-        return primaryKeys;
+        ArrayNode pkNames = JsonSerdeUtil.getNodeAs(root, FIELD_PRIMARY_KEYS, ArrayNode.class);
+        return StreamSupport.stream(pkNames.spliterator(), false)
+                .map(pk -> toFieldName(pk.asText()))
+                .collect(Collectors.toList());
     }
 
     private LinkedHashMap<String, String> extractFieldTypesFromMySqlType() {
+        JsonNode schema = JsonSerdeUtil.getNodeAs(root, FIELD_MYSQL_TYPE, JsonNode.class);
         LinkedHashMap<String, String> fieldTypes = new LinkedHashMap<>();
 
-        JsonNode schema = root.get(FIELD_MYSQL_TYPE);
-        Iterator<String> iterator = schema.fieldNames();
-        while (iterator.hasNext()) {
-            String fieldName = iterator.next();
-            String fieldType = schema.get(fieldName).asText();
-            fieldTypes.put(toFieldName(fieldName), fieldType);
-        }
+        schema.fieldNames()
+                .forEachRemaining(
+                        fieldName -> {
+                            String fieldType = schema.get(fieldName).asText();
+                            fieldTypes.put(toFieldName(fieldName), fieldType);
+                        });
 
         return fieldTypes;
     }
 
-    private Map<String, String> extractRow(
+    /**
+     * Extracts data from a given JSON node and transforms it based on provided MySQL and Paimon
+     * field types.
+     *
+     * @param record The JSON node containing the data.
+     * @param mySqlFieldTypes A map of MySQL field types.
+     * @param paimonFieldTypes A map of Paimon field types.
+     * @return A map of extracted and transformed data.
+     */
+    private Map<String, String> extractRowFromJson(
             JsonNode record,
             Map<String, String> mySqlFieldTypes,
             LinkedHashMap<String, DataType> paimonFieldTypes) {
+
         Map<String, Object> jsonMap =
                 OBJECT_MAPPER.convertValue(record, new TypeReference<Map<String, Object>>() {});
         if (jsonMap == null) {
             return new HashMap<>();
         }
 
-        Map<String, String> resultMap = new HashMap<>();
-        for (Map.Entry<String, String> field : mySqlFieldTypes.entrySet()) {
-            String fieldName = field.getKey();
-            String mySqlType = field.getValue();
-            Object objectValue = jsonMap.get(fieldName);
-            if (objectValue == null) {
-                continue;
-            }
-
-            String oldValue = objectValue.toString();
-            String newValue = oldValue;
-
-            if (MySqlTypeUtils.isSetType(MySqlTypeUtils.getShortType(mySqlType))) {
-                newValue = CanalFieldParser.convertSet(newValue, mySqlType);
-            } else if (MySqlTypeUtils.isEnumType(MySqlTypeUtils.getShortType(mySqlType))) {
-                newValue = CanalFieldParser.convertEnum(newValue, mySqlType);
-            } else if (MySqlTypeUtils.isGeoType(MySqlTypeUtils.getShortType(mySqlType))) {
-                try {
-                    byte[] wkb =
-                            CanalFieldParser.convertGeoType2WkbArray(
-                                    oldValue.getBytes(StandardCharsets.ISO_8859_1));
-                    newValue = MySqlTypeUtils.convertWkbArray(wkb);
-                } catch (Exception e) {
-                    throw new IllegalArgumentException(
-                            String.format("Failed to convert %s to geometry JSON.", oldValue), e);
-                }
-            }
-            resultMap.put(fieldName, newValue);
-        }
+        Map<String, String> resultMap =
+                mySqlFieldTypes.entrySet().stream()
+                        .filter(
+                                entry ->
+                                        jsonMap.containsKey(entry.getKey())
+                                                && jsonMap.get(entry.getKey()) != null)

Review Comment:
   Equals to `jsonMap.get(entry.getKey()) != null`.



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -199,69 +179,67 @@ protected String extractString(String key) {
     }
 
     private boolean isDdl() {
-        return root.get("isDdl") != null && root.get("isDdl").asBoolean();
+        JsonNode isDdlNode = root.get(IS_DDL_FIELD);
+        if (isDdlNode == null) {
+            throw new IllegalArgumentException("Expected 'isDdl' field in the JSON but not found.");
+        }
+        return isDdlNode.asBoolean();
     }
 
     private List<String> extractPrimaryKeys() {
-        List<String> primaryKeys = new ArrayList<>();
-        ArrayNode pkNames = (ArrayNode) root.get(FIELD_PRIMARY_KEYS);
-        pkNames.iterator().forEachRemaining(pk -> primaryKeys.add(toFieldName(pk.asText())));
-        return primaryKeys;
+        ArrayNode pkNames = JsonSerdeUtil.getNodeAs(root, FIELD_PRIMARY_KEYS, ArrayNode.class);
+        return StreamSupport.stream(pkNames.spliterator(), false)
+                .map(pk -> toFieldName(pk.asText()))
+                .collect(Collectors.toList());
     }
 
     private LinkedHashMap<String, String> extractFieldTypesFromMySqlType() {
+        JsonNode schema = JsonSerdeUtil.getNodeAs(root, FIELD_MYSQL_TYPE, JsonNode.class);

Review Comment:
   If we don't need to cast the return JsonNode, we don't have to use this method. It's OK to use `root.get()`. 



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -70,77 +85,42 @@ public CanalRecordParser(
 
     @Override
     public List<RichCdcMultiplexRecord> extractRecords() {
+        List<RichCdcMultiplexRecord> records = new ArrayList<>();
+
         if (isDdl()) {
-            return Collections.emptyList();
+            return records;
         }
 
         List<String> primaryKeys = extractPrimaryKeys();
-
-        // extract field types
         LinkedHashMap<String, String> mySqlFieldTypes = extractFieldTypesFromMySqlType();
-        LinkedHashMap<String, DataType> paimonFieldTypes = new LinkedHashMap<>();
-        mySqlFieldTypes.forEach(
-                (name, type) -> paimonFieldTypes.put(name, MySqlTypeUtils.toDataType(type)));
+        LinkedHashMap<String, DataType> paimonFieldTypes =
+                convertToPaimonFieldTypes(mySqlFieldTypes);
 
-        // extract row kind and field values
-        List<RichCdcMultiplexRecord> records = new ArrayList<>();
         String type = extractString(FIELD_TYPE);
-        ArrayNode data = (ArrayNode) root.get(FIELD_DATA);
+        ArrayNode data = JsonSerdeUtil.getNodeAs(root, FIELD_DATA, ArrayNode.class);
+
         switch (type) {
             case OP_UPDATE:
-                ArrayNode old =
-                        root.get(FIELD_OLD) instanceof NullNode
-                                ? null
-                                : (ArrayNode) root.get(FIELD_OLD);
-                for (int i = 0; i < data.size(); i++) {
-                    Map<String, String> after =
-                            extractRow(data.get(i), mySqlFieldTypes, paimonFieldTypes);
-                    if (old != null) {
-                        Map<String, String> before =
-                                extractRow(old.get(i), mySqlFieldTypes, paimonFieldTypes);
-                        // fields in "old" (before) means the fields are changed
-                        // fields not in "old" (before) means the fields are not changed,
-                        // so we just copy the not changed fields into before
-                        for (Map.Entry<String, String> entry : after.entrySet()) {
-                            if (!before.containsKey(entry.getKey())) {
-                                before.put(entry.getKey(), entry.getValue());
-                            }
-                        }
-                        before = caseSensitive ? before : keyCaseInsensitive(before);
-                        records.add(
-                                new RichCdcMultiplexRecord(
-                                        databaseName,
-                                        tableName,
-                                        paimonFieldTypes,
-                                        primaryKeys,
-                                        new CdcRecord(RowKind.DELETE, before)));
-                    }
-                    after = caseSensitive ? after : keyCaseInsensitive(after);
-                    records.add(
-                            new RichCdcMultiplexRecord(
-                                    databaseName,
-                                    tableName,
-                                    paimonFieldTypes,
-                                    primaryKeys,
-                                    new CdcRecord(RowKind.INSERT, after)));
-                }
+                handleUpdateOperation(

Review Comment:
   Let the `handleXXXOperation` return the records; then just return in each branch.



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java:
##########
@@ -163,7 +164,7 @@ protected String extractString(String key) {
     }
 
     private void extractDatabaseAndTableNames() {
-        JsonNode tableNode = root.get(FIELD_TABLE);
+        JsonNode tableNode = JsonSerdeUtil.getNodeAs(root, FIELD_TABLE, JsonNode.class);

Review Comment:
   As commented above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] MonsterChenzhuo commented on a diff in pull request #1880: [flink]Optimization of Canal parsing logic

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1880:
URL: https://github.com/apache/incubator-paimon/pull/1880#discussion_r1304218126


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -199,69 +179,67 @@ protected String extractString(String key) {
     }
 
     private boolean isDdl() {
-        return root.get("isDdl") != null && root.get("isDdl").asBoolean();
+        JsonNode isDdlNode = root.get(IS_DDL_FIELD);
+        if (isDdlNode == null) {
+            throw new IllegalArgumentException("Expected 'isDdl' field in the JSON but not found.");

Review Comment:
   It is used in ValidateFormat,
   if (isDdl()) { // Call the isDdl() method first, and there will be a validity of the ddl field
   // Verify the existence of sql fields
   checkNotNull(root.get(FIELD_SQL), errorMessageTemplate, FIELD_SQL);
   } else {
   checkNotNull(root.get(FIELD_MYSQL_TYPE), errorMessageTemplate, FIELD_MYSQL_TYPE);
   checkNotNull(root.get(FIELD_PRIMARY_KEYS), errorMessageTemplate, FIELD_PRIMARY_KEYS);
   }



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] yuzelin merged pull request #1880: [flink] Optimization of Canal parsing logic

Posted by "yuzelin (via GitHub)" <gi...@apache.org>.
yuzelin merged PR #1880:
URL: https://github.com/apache/incubator-paimon/pull/1880


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] MonsterChenzhuo commented on a diff in pull request #1880: [flink] Optimization of Canal parsing logic

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1880:
URL: https://github.com/apache/incubator-paimon/pull/1880#discussion_r1304289713


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -199,69 +179,67 @@ protected String extractString(String key) {
     }
 
     private boolean isDdl() {
-        return root.get("isDdl") != null && root.get("isDdl").asBoolean();
+        JsonNode isDdlNode = root.get(IS_DDL_FIELD);
+        if (isDdlNode == null) {
+            throw new IllegalArgumentException("Expected 'isDdl' field in the JSON but not found.");

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] MonsterChenzhuo commented on a diff in pull request #1880: [flink]Optimization of Canal parsing logic

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1880:
URL: https://github.com/apache/incubator-paimon/pull/1880#discussion_r1304048068


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -27,27 +27,42 @@
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.JsonSerdeUtil;
 
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.NullNode;
 
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
-/** Convert canal-json format string to list of {@link RichCdcMultiplexRecord}s. */
+/**
+ * The {@code CanalRecordParser} class is responsible for parsing records from the Canal-JSON
+ * format. Canal is a database binlog multi-platform consumer, which is used to synchronize data
+ * across databases. This parser extracts relevant information from the Canal-JSON format and
+ * transforms it into a list of {@link RichCdcMultiplexRecord} objects, which represent the changes
+ * captured in the database.
+ *
+ * <p>The class handles different types of database operations such as INSERT, UPDATE, and DELETE,
+ * and generates corresponding {@link RichCdcMultiplexRecord} objects for each operation.
+ *
+ * <p>Additionally, the parser supports schema extraction, which can be used to understand the
+ * structure of the incoming data and its corresponding field types.
+ */
 public class CanalRecordParser extends RecordParser {
 
+    private static final Charset CHARSET_ISO_8859_1 = StandardCharsets.ISO_8859_1;

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] MonsterChenzhuo commented on a diff in pull request #1880: [flink]Optimization of Canal parsing logic

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1880:
URL: https://github.com/apache/incubator-paimon/pull/1880#discussion_r1304222486


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -27,27 +27,42 @@
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.JsonSerdeUtil;
 
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.NullNode;
 
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
-/** Convert canal-json format string to list of {@link RichCdcMultiplexRecord}s. */
+/**
+ * The {@code CanalRecordParser} class is responsible for parsing records from the Canal-JSON
+ * format. Canal is a database binlog multi-platform consumer, which is used to synchronize data
+ * across databases. This parser extracts relevant information from the Canal-JSON format and
+ * transforms it into a list of {@link RichCdcMultiplexRecord} objects, which represent the changes
+ * captured in the database.
+ *
+ * <p>The class handles different types of database operations such as INSERT, UPDATE, and DELETE,
+ * and generates corresponding {@link RichCdcMultiplexRecord} objects for each operation.
+ *
+ * <p>Additionally, the parser supports schema extraction, which can be used to understand the
+ * structure of the incoming data and its corresponding field types.
+ */
 public class CanalRecordParser extends RecordParser {
 
+    private static final Charset CHARSET_ISO_8859_1 = StandardCharsets.ISO_8859_1;
+    private static final String IS_DDL_FIELD = "isDdl";
     private static final String FIELD_SQL = "sql";

Review Comment:
   ```java
   if (isDdl()) { 
   checkNotNull(root.get(FIELD_SQL), errorMessageTemplate, FIELD_SQL); //  FIELD_SQL is used here
   } else {
   checkNotNull(root.get(FIELD_MYSQL_TYPE), errorMessageTemplate, FIELD_MYSQL_TYPE);
   checkNotNull(root.get(FIELD_PRIMARY_KEYS), errorMessageTemplate, FIELD_PRIMARY_KEYS);
   }
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] MonsterChenzhuo commented on a diff in pull request #1880: [flink]Optimization of Canal parsing logic

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1880:
URL: https://github.com/apache/incubator-paimon/pull/1880#discussion_r1304227471


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -199,69 +179,67 @@ protected String extractString(String key) {
     }
 
     private boolean isDdl() {
-        return root.get("isDdl") != null && root.get("isDdl").asBoolean();
+        JsonNode isDdlNode = root.get(IS_DDL_FIELD);
+        if (isDdlNode == null) {
+            throw new IllegalArgumentException("Expected 'isDdl' field in the JSON but not found.");
+        }
+        return isDdlNode.asBoolean();
     }
 
     private List<String> extractPrimaryKeys() {
-        List<String> primaryKeys = new ArrayList<>();
-        ArrayNode pkNames = (ArrayNode) root.get(FIELD_PRIMARY_KEYS);
-        pkNames.iterator().forEachRemaining(pk -> primaryKeys.add(toFieldName(pk.asText())));
-        return primaryKeys;
+        ArrayNode pkNames = JsonSerdeUtil.getNodeAs(root, FIELD_PRIMARY_KEYS, ArrayNode.class);
+        return StreamSupport.stream(pkNames.spliterator(), false)
+                .map(pk -> toFieldName(pk.asText()))
+                .collect(Collectors.toList());
     }
 
     private LinkedHashMap<String, String> extractFieldTypesFromMySqlType() {
+        JsonNode schema = JsonSerdeUtil.getNodeAs(root, FIELD_MYSQL_TYPE, JsonNode.class);
         LinkedHashMap<String, String> fieldTypes = new LinkedHashMap<>();
 
-        JsonNode schema = root.get(FIELD_MYSQL_TYPE);
-        Iterator<String> iterator = schema.fieldNames();
-        while (iterator.hasNext()) {
-            String fieldName = iterator.next();
-            String fieldType = schema.get(fieldName).asText();
-            fieldTypes.put(toFieldName(fieldName), fieldType);
-        }
+        schema.fieldNames()
+                .forEachRemaining(
+                        fieldName -> {
+                            String fieldType = schema.get(fieldName).asText();
+                            fieldTypes.put(toFieldName(fieldName), fieldType);
+                        });
 
         return fieldTypes;
     }
 
-    private Map<String, String> extractRow(
+    /**
+     * Extracts data from a given JSON node and transforms it based on provided MySQL and Paimon
+     * field types.
+     *
+     * @param record The JSON node containing the data.
+     * @param mySqlFieldTypes A map of MySQL field types.
+     * @param paimonFieldTypes A map of Paimon field types.
+     * @return A map of extracted and transformed data.
+     */
+    private Map<String, String> extractRowFromJson(
             JsonNode record,
             Map<String, String> mySqlFieldTypes,
             LinkedHashMap<String, DataType> paimonFieldTypes) {
+

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] MonsterChenzhuo commented on a diff in pull request #1880: [flink]Optimization of Canal parsing logic

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1880:
URL: https://github.com/apache/incubator-paimon/pull/1880#discussion_r1304272431


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -70,77 +85,42 @@ public CanalRecordParser(
 
     @Override
     public List<RichCdcMultiplexRecord> extractRecords() {
+        List<RichCdcMultiplexRecord> records = new ArrayList<>();
+
         if (isDdl()) {
-            return Collections.emptyList();
+            return records;
         }
 
         List<String> primaryKeys = extractPrimaryKeys();
-
-        // extract field types
         LinkedHashMap<String, String> mySqlFieldTypes = extractFieldTypesFromMySqlType();
-        LinkedHashMap<String, DataType> paimonFieldTypes = new LinkedHashMap<>();
-        mySqlFieldTypes.forEach(
-                (name, type) -> paimonFieldTypes.put(name, MySqlTypeUtils.toDataType(type)));
+        LinkedHashMap<String, DataType> paimonFieldTypes =
+                convertToPaimonFieldTypes(mySqlFieldTypes);
 
-        // extract row kind and field values
-        List<RichCdcMultiplexRecord> records = new ArrayList<>();
         String type = extractString(FIELD_TYPE);
-        ArrayNode data = (ArrayNode) root.get(FIELD_DATA);
+        ArrayNode data = JsonSerdeUtil.getNodeAs(root, FIELD_DATA, ArrayNode.class);
+
         switch (type) {
             case OP_UPDATE:
-                ArrayNode old =
-                        root.get(FIELD_OLD) instanceof NullNode
-                                ? null
-                                : (ArrayNode) root.get(FIELD_OLD);
-                for (int i = 0; i < data.size(); i++) {
-                    Map<String, String> after =
-                            extractRow(data.get(i), mySqlFieldTypes, paimonFieldTypes);
-                    if (old != null) {
-                        Map<String, String> before =
-                                extractRow(old.get(i), mySqlFieldTypes, paimonFieldTypes);
-                        // fields in "old" (before) means the fields are changed
-                        // fields not in "old" (before) means the fields are not changed,
-                        // so we just copy the not changed fields into before
-                        for (Map.Entry<String, String> entry : after.entrySet()) {
-                            if (!before.containsKey(entry.getKey())) {
-                                before.put(entry.getKey(), entry.getValue());
-                            }
-                        }
-                        before = caseSensitive ? before : keyCaseInsensitive(before);
-                        records.add(
-                                new RichCdcMultiplexRecord(
-                                        databaseName,
-                                        tableName,
-                                        paimonFieldTypes,
-                                        primaryKeys,
-                                        new CdcRecord(RowKind.DELETE, before)));
-                    }
-                    after = caseSensitive ? after : keyCaseInsensitive(after);
-                    records.add(
-                            new RichCdcMultiplexRecord(
-                                    databaseName,
-                                    tableName,
-                                    paimonFieldTypes,
-                                    primaryKeys,
-                                    new CdcRecord(RowKind.INSERT, after)));
-                }
+                handleUpdateOperation(

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] MonsterChenzhuo commented on pull request #1880: [flink]Optimization of Canal parsing logic

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on PR #1880:
URL: https://github.com/apache/incubator-paimon/pull/1880#issuecomment-1691043689

   @yuzelin  PTAL,thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] MonsterChenzhuo commented on a diff in pull request #1880: [flink]Optimization of Canal parsing logic

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1880:
URL: https://github.com/apache/incubator-paimon/pull/1880#discussion_r1304009057


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -27,27 +27,42 @@
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.JsonSerdeUtil;
 
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.NullNode;
 
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
-/** Convert canal-json format string to list of {@link RichCdcMultiplexRecord}s. */
+/**
+ * The {@code CanalRecordParser} class is responsible for parsing records from the Canal-JSON
+ * format. Canal is a database binlog multi-platform consumer, which is used to synchronize data
+ * across databases. This parser extracts relevant information from the Canal-JSON format and
+ * transforms it into a list of {@link RichCdcMultiplexRecord} objects, which represent the changes
+ * captured in the database.
+ *
+ * <p>The class handles different types of database operations such as INSERT, UPDATE, and DELETE,
+ * and generates corresponding {@link RichCdcMultiplexRecord} objects for each operation.
+ *
+ * <p>Additionally, the parser supports schema extraction, which can be used to understand the
+ * structure of the incoming data and its corresponding field types.
+ */
 public class CanalRecordParser extends RecordParser {
 
+    private static final Charset CHARSET_ISO_8859_1 = StandardCharsets.ISO_8859_1;

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] MonsterChenzhuo commented on a diff in pull request #1880: [flink]Optimization of Canal parsing logic

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1880:
URL: https://github.com/apache/incubator-paimon/pull/1880#discussion_r1304048580


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -27,27 +27,42 @@
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.JsonSerdeUtil;
 
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.NullNode;
 
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
-/** Convert canal-json format string to list of {@link RichCdcMultiplexRecord}s. */
+/**
+ * The {@code CanalRecordParser} class is responsible for parsing records from the Canal-JSON
+ * format. Canal is a database binlog multi-platform consumer, which is used to synchronize data
+ * across databases. This parser extracts relevant information from the Canal-JSON format and
+ * transforms it into a list of {@link RichCdcMultiplexRecord} objects, which represent the changes
+ * captured in the database.
+ *
+ * <p>The class handles different types of database operations such as INSERT, UPDATE, and DELETE,
+ * and generates corresponding {@link RichCdcMultiplexRecord} objects for each operation.
+ *
+ * <p>Additionally, the parser supports schema extraction, which can be used to understand the
+ * structure of the incoming data and its corresponding field types.
+ */
 public class CanalRecordParser extends RecordParser {
 
+    private static final Charset CHARSET_ISO_8859_1 = StandardCharsets.ISO_8859_1;
+    private static final String IS_DDL_FIELD = "isDdl";

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] MonsterChenzhuo commented on a diff in pull request #1880: [flink]Optimization of Canal parsing logic

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1880:
URL: https://github.com/apache/incubator-paimon/pull/1880#discussion_r1304218126


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -199,69 +179,67 @@ protected String extractString(String key) {
     }
 
     private boolean isDdl() {
-        return root.get("isDdl") != null && root.get("isDdl").asBoolean();
+        JsonNode isDdlNode = root.get(IS_DDL_FIELD);
+        if (isDdlNode == null) {
+            throw new IllegalArgumentException("Expected 'isDdl' field in the JSON but not found.");

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] MonsterChenzhuo commented on a diff in pull request #1880: [flink]Optimization of Canal parsing logic

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1880:
URL: https://github.com/apache/incubator-paimon/pull/1880#discussion_r1304241678


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -273,4 +251,99 @@ private Map<String, String> extractRow(
 
         return resultMap;
     }
+
+    private String transformValue(String oldValue, String mySqlType) {
+        String newValue = oldValue;
+        String shortType = MySqlTypeUtils.getShortType(mySqlType);
+
+        if (MySqlTypeUtils.isSetType(shortType)) {
+            newValue = CanalFieldParser.convertSet(newValue, mySqlType);
+        } else if (MySqlTypeUtils.isEnumType(shortType)) {
+            newValue = CanalFieldParser.convertEnum(newValue, mySqlType);
+        } else if (MySqlTypeUtils.isGeoType(shortType)) {
+            try {
+                byte[] wkb =
+                        CanalFieldParser.convertGeoType2WkbArray(
+                                oldValue.getBytes(CHARSET_ISO_8859_1));
+                newValue = MySqlTypeUtils.convertWkbArray(wkb);
+            } catch (Exception e) {
+                throw new IllegalArgumentException(
+                        String.format("Failed to convert %s to geometry JSON.", oldValue), e);
+            }
+        }
+        return newValue;
+    }

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] MonsterChenzhuo commented on a diff in pull request #1880: [flink]Optimization of Canal parsing logic

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1880:
URL: https://github.com/apache/incubator-paimon/pull/1880#discussion_r1304232336


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -199,69 +179,67 @@ protected String extractString(String key) {
     }
 
     private boolean isDdl() {
-        return root.get("isDdl") != null && root.get("isDdl").asBoolean();
+        JsonNode isDdlNode = root.get(IS_DDL_FIELD);
+        if (isDdlNode == null) {
+            throw new IllegalArgumentException("Expected 'isDdl' field in the JSON but not found.");
+        }
+        return isDdlNode.asBoolean();
     }
 
     private List<String> extractPrimaryKeys() {
-        List<String> primaryKeys = new ArrayList<>();
-        ArrayNode pkNames = (ArrayNode) root.get(FIELD_PRIMARY_KEYS);
-        pkNames.iterator().forEachRemaining(pk -> primaryKeys.add(toFieldName(pk.asText())));
-        return primaryKeys;
+        ArrayNode pkNames = JsonSerdeUtil.getNodeAs(root, FIELD_PRIMARY_KEYS, ArrayNode.class);
+        return StreamSupport.stream(pkNames.spliterator(), false)
+                .map(pk -> toFieldName(pk.asText()))
+                .collect(Collectors.toList());
     }
 
     private LinkedHashMap<String, String> extractFieldTypesFromMySqlType() {
+        JsonNode schema = JsonSerdeUtil.getNodeAs(root, FIELD_MYSQL_TYPE, JsonNode.class);

Review Comment:
   1.Direct Retrieval Method:
   ```java
   JsonNode tableNode = root.get(FIELD_TABLE);
   ```
   This is a straightforward method that directly retrieves the child node named FIELD_TABLE from the root node. If the child node doesn't exist, tableNode will be null.
   2.Generic Method:
   ```java
   public static <T extends JsonNode> T getNodeAs(
            JsonNode root, String fieldName, Class<T> clazz) {
        ...
   }
   ```
   This method is more complex and aims to ensure that the retrieved child node is not only present but also of the expected type. This is achieved through the following steps:
   
   Using Optional to handle potential null values.
   Using the map method to retrieve the child node from the root.
   Using the filter method to ensure the child node is of the expected type.
   Using the map method to cast the child node to the expected type.
   If the child node is absent or not of the expected type, an exception is thrown using orElseThrow.
   
   
   Differences:
   
   Simplicity: The direct retrieval method is simpler but doesn't perform type checks.
   Type Safety: The generic method offers type safety, ensuring the retrieved node is of the expected type.
   Exception Handling: The generic method throws an exception if the node is absent or not of the expected type, whereas the direct retrieval method returns null if the node is absent.
   
   In summary, the generic method offers stronger type safety and error handling but is also more complex. If you frequently perform such operations in your application and want to reduce potential errors, using the generic method might be a good choice.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] MonsterChenzhuo commented on a diff in pull request #1880: [flink] Optimization of Canal parsing logic

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1880:
URL: https://github.com/apache/incubator-paimon/pull/1880#discussion_r1304232336


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java:
##########
@@ -199,69 +179,67 @@ protected String extractString(String key) {
     }
 
     private boolean isDdl() {
-        return root.get("isDdl") != null && root.get("isDdl").asBoolean();
+        JsonNode isDdlNode = root.get(IS_DDL_FIELD);
+        if (isDdlNode == null) {
+            throw new IllegalArgumentException("Expected 'isDdl' field in the JSON but not found.");
+        }
+        return isDdlNode.asBoolean();
     }
 
     private List<String> extractPrimaryKeys() {
-        List<String> primaryKeys = new ArrayList<>();
-        ArrayNode pkNames = (ArrayNode) root.get(FIELD_PRIMARY_KEYS);
-        pkNames.iterator().forEachRemaining(pk -> primaryKeys.add(toFieldName(pk.asText())));
-        return primaryKeys;
+        ArrayNode pkNames = JsonSerdeUtil.getNodeAs(root, FIELD_PRIMARY_KEYS, ArrayNode.class);
+        return StreamSupport.stream(pkNames.spliterator(), false)
+                .map(pk -> toFieldName(pk.asText()))
+                .collect(Collectors.toList());
     }
 
     private LinkedHashMap<String, String> extractFieldTypesFromMySqlType() {
+        JsonNode schema = JsonSerdeUtil.getNodeAs(root, FIELD_MYSQL_TYPE, JsonNode.class);

Review Comment:
   done.



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java:
##########
@@ -163,7 +164,7 @@ protected String extractString(String key) {
     }
 
     private void extractDatabaseAndTableNames() {
-        JsonNode tableNode = root.get(FIELD_TABLE);
+        JsonNode tableNode = JsonSerdeUtil.getNodeAs(root, FIELD_TABLE, JsonNode.class);

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org