Posted to commits@inlong.apache.org by "EMsnap (via GitHub)" <gi...@apache.org> on 2023/04/13 11:54:28 UTC

[GitHub] [inlong] EMsnap opened a new pull request, #7846: [INLONG-7660][Sort] Support Ddl model in mysql connector when running in all migrate mode

EMsnap opened a new pull request, #7846:
URL: https://github.com/apache/inlong/pull/7846

   ### Prepare a Pull Request
   - Fixes #7661 
   
   ### Motivation
   - Fixes #7661 
   
   ### Modifications
   
   Support the DDL model in the MySQL connector when running in all-migrate mode.
   Add an Operation to Canal JSON and Debezium JSON so that the downstream can perform DDL operations.
   
   ### Verifying this change
   
   Run CanalJsonSerializationTest 
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] gong commented on a diff in pull request #7846: [INLONG-7660][Sort] Support DDL model for MySQL connector when running in all migrate mode

Posted by "gong (via GitHub)" <gi...@apache.org>.
gong commented on code in PR #7846:
URL: https://github.com/apache/inlong/pull/7846#discussion_r1170785078


##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/MetaDataUtils.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mysql.utils;
+
+import static org.apache.inlong.sort.base.Constants.DDL_FIELD_NAME;
+import static org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.isSnapshotRecord;
+import static org.apache.inlong.sort.cdc.mysql.utils.OperationUtils.generateOperation;
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.data.Envelope;
+import io.debezium.data.Envelope.FieldName;
+import io.debezium.relational.Table;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.inlong.sort.cdc.base.util.RecordUtils;
+import org.apache.inlong.sort.formats.json.canal.CanalJson;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJson;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJson.Source;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utils for generating metadata in mysql cdc.
+ */
+public class MetaDataUtils {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private static final Logger LOG = LoggerFactory.getLogger(MetaDataUtils.class);
+
+    /**
+     * get sql type from table schema, represents the jdbc data type
+     *
+     * @param tableSchema table schema
+     */
+    public static Map<String, Integer> getSqlType(@Nullable TableChanges.TableChange tableSchema) {
+        if (tableSchema == null) {
+            return null;
+        }
+        Map<String, Integer> sqlType = new LinkedHashMap<>();
+        final Table table = tableSchema.getTable();
+        table.columns().forEach(
+                column -> sqlType.put(column.name(), column.jdbcType()));
+        return sqlType;
+    }
+
+    public static String getMetaData(SourceRecord record, String tableNameKey) {
+        Struct messageStruct = (Struct) record.value();
+        Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
+        return sourceStruct.getString(tableNameKey);
+    }
+
+    public static String getOpType(SourceRecord record) {
+        String opType;
+        final Envelope.Operation op = Envelope.operationFor(record);
+        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
+            opType = "INSERT";
+        } else if (op == Envelope.Operation.DELETE) {
+            opType = "DELETE";
+        } else {
+            opType = "UPDATE";
+        }
+        return opType;
+    }
+
+    public static String getCanalOpType(GenericRowData record) {
+        String opType;
+        switch (record.getRowKind()) {
+            case DELETE:
+            case UPDATE_BEFORE:
+                opType = "DELETE";
+                break;
+            case INSERT:
+            case UPDATE_AFTER:
+                opType = "INSERT";
+                break;
+            default:
+                throw new IllegalStateException("the record only have states in DELETE, "
+                        + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
+        }
+        return opType;
+    }
+
+    public static String getDebeziumOpType(GenericRowData record) {
+        String opType;
+        switch (record.getRowKind()) {
+            case DELETE:
+            case UPDATE_BEFORE:
+                opType = "d";
+                break;
+            case INSERT:
+            case UPDATE_AFTER:
+                opType = "c";
+                break;
+            default:
+                throw new IllegalStateException("the record only have states in DELETE, "
+                        + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
+        }
+        return opType;
+    }
+
+    public static List<String> getPkNames(@Nullable TableChanges.TableChange tableSchema) {
+        if (tableSchema == null) {
+            return null;
+        }
+        return tableSchema.getTable().primaryKeyColumnNames();
+    }
+
+    public static Map<String, String> getMysqlType(@Nullable TableChanges.TableChange tableSchema) {
+        if (tableSchema == null) {
+            return null;
+        }
+        Map<String, String> mysqlType = new LinkedHashMap<>();
+        final Table table = tableSchema.getTable();
+        table.columns()
+                .forEach(
+                        column -> {
+                            mysqlType.put(
+                                    column.name(),
+                                    String.format(
+                                            "%s(%d)",
+                                            column.typeName(),
+                                            column.length()));
+                        });

Review Comment:
   > yes since type name is string and length is int
   
   @EMsnap `text`, `longtext`, and some other types don't have a length.
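
A minimal sketch of one way to handle this concern, not the PR's actual fix. It assumes that a length of -1 means "no length set" (a hypothetical convention here; verify what `column.length()` really returns for `text` columns) and that the scale is supplied separately. `MysqlTypeFormatter`, `UNSET_LENGTH`, and `format` are illustrative names that do not appear in the PR.

```java
import java.util.Optional;

// Hypothetical helper: formats a MySQL type name with an optional length and scale.
final class MysqlTypeFormatter {

    // Assumption: -1 marks a column whose type carries no length (e.g. TEXT).
    static final int UNSET_LENGTH = -1;

    static String format(String typeName, int length, Optional<Integer> scale) {
        if (length == UNSET_LENGTH) {
            // No length: emit the bare type name instead of something like "TEXT(-1)".
            return typeName;
        }
        // With a scale, emit "TYPE(length,scale)"; otherwise "TYPE(length)".
        return scale
                .map(s -> String.format("%s(%d,%d)", typeName, length, s))
                .orElseGet(() -> String.format("%s(%d)", typeName, length));
    }

    public static void main(String[] args) {
        System.out.println(format("TEXT", UNSET_LENGTH, Optional.empty())); // TEXT
        System.out.println(format("VARCHAR", 255, Optional.empty()));       // VARCHAR(255)
        System.out.println(format("DECIMAL", 10, Optional.of(2)));          // DECIMAL(10,2)
    }
}
```

Branching on the sentinel keeps the common `%s(%d)` case unchanged while avoiding a meaningless length for types that have none.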





[GitHub] [inlong] EMsnap commented on a diff in pull request #7846: [INLONG-7660][Sort] Support DDL model for MySQL connector when running in all migrate mode

Posted by "EMsnap (via GitHub)" <gi...@apache.org>.
EMsnap commented on code in PR #7846:
URL: https://github.com/apache/inlong/pull/7846#discussion_r1170990593


##########
licenses/inlong-sort-connectors/LICENSE:
##########
@@ -874,6 +874,7 @@ The text of each license is the standard Apache 2.0 license.
   com.cedarsoftware:json-io:2.5.1 - Java JSON serialization (https://github.com/jdereg/json-io), (Apache License, Version 2.0)
   net.minidev:json-smart:2.3 - JSON Small and Fast Parser (https://github.com/netplex/json-smart-v2/tree/v2.3), (The Apache Software License, Version 2.0)
   com.github.jsqlparser:jsqlparser:2.1 - JSQLParser library (https://github.com/JSQLParser/JSqlParser), (The Apache Software License, Version 2.0;  GNU Library or Lesser General Public License (LGPL) V2.1)

Review Comment:
   removed





[GitHub] [inlong] EMsnap commented on pull request #7846: [INLONG-7660][Sort] Support Ddl model in mysql connector when running in all migrate mode

Posted by "EMsnap (via GitHub)" <gi...@apache.org>.
EMsnap commented on PR #7846:
URL: https://github.com/apache/inlong/pull/7846#issuecomment-1506862430

   > @EMsnap, please add a design document for this PR, and it's better to split it into multiple tasks.
   
   Added, and noted that this PR is a one-task PR.




[GitHub] [inlong] EMsnap commented on a diff in pull request #7846: [INLONG-7660][Sort] Support DDL model for MySQL connector when running in all migrate mode

Posted by "EMsnap (via GitHub)" <gi...@apache.org>.
EMsnap commented on code in PR #7846:
URL: https://github.com/apache/inlong/pull/7846#discussion_r1171091179


##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/MetaDataUtils.java:
##########
@@ -0,0 +1,233 @@
+    public static Map<String, String> getMysqlType(@Nullable TableChanges.TableChange tableSchema) {
+        if (tableSchema == null) {
+            return null;
+        }
+        Map<String, String> mysqlType = new LinkedHashMap<>();
+        final Table table = tableSchema.getTable();
+        table.columns()
+                .forEach(
+                        column -> {
+                            mysqlType.put(
+                                    column.name(),
+                                    String.format(
+                                            "%s(%d)",
+                                            column.typeName(),
+                                            column.length()));
+                        });

Review Comment:
   `column.length()` returns an int.





[GitHub] [inlong] e-mhui commented on pull request #7846: [INLONG-7660][Sort] Support Ddl model in mysql connector when running in all migrate mode

Posted by "e-mhui (via GitHub)" <gi...@apache.org>.
e-mhui commented on PR #7846:
URL: https://github.com/apache/inlong/pull/7846#issuecomment-1507858830

   Can you add unit tests for parsing SQL statements?
   




[GitHub] [inlong] EMsnap commented on pull request #7846: [INLONG-7660][Sort] Support Ddl model in mysql connector when running in all migrate mode

Posted by "EMsnap (via GitHub)" <gi...@apache.org>.
EMsnap commented on PR #7846:
URL: https://github.com/apache/inlong/pull/7846#issuecomment-1507888498

   > Can you add unit tests for parsing SQL statements?
   
   Sure, will do, thx




[GitHub] [inlong] EMsnap commented on a diff in pull request #7846: [INLONG-7660][Sort] Support DDL model for MySQL connector when running in all migrate mode

Posted by "EMsnap (via GitHub)" <gi...@apache.org>.
EMsnap commented on code in PR #7846:
URL: https://github.com/apache/inlong/pull/7846#discussion_r1169763802


##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/MetaDataUtils.java:
##########
@@ -0,0 +1,233 @@
+    public static Map<String, String> getMysqlType(@Nullable TableChanges.TableChange tableSchema) {
+        if (tableSchema == null) {
+            return null;
+        }
+        Map<String, String> mysqlType = new LinkedHashMap<>();
+        final Table table = tableSchema.getTable();
+        table.columns()
+                .forEach(
+                        column -> {
+                            mysqlType.put(
+                                    column.name(),
+                                    String.format(
+                                            "%s(%d)",
+                                            column.typeName(),
+                                            column.length()));
+                        });

Review Comment:
   yes since type name is string and length is int 





[GitHub] [inlong] gong commented on a diff in pull request #7846: [INLONG-7660][Sort] Support Ddl model in mysql connector when running in all migrate mode

Posted by "gong (via GitHub)" <gi...@apache.org>.
gong commented on code in PR #7846:
URL: https://github.com/apache/inlong/pull/7846#discussion_r1168135289


##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/MetaDataUtils.java:
##########
@@ -0,0 +1,233 @@
+    public static Map<String, String> getMysqlType(@Nullable TableChanges.TableChange tableSchema) {
+        if (tableSchema == null) {
+            return null;
+        }
+        Map<String, String> mysqlType = new LinkedHashMap<>();
+        final Table table = tableSchema.getTable();
+        table.columns()
+                .forEach(
+                        column -> {
+                            mysqlType.put(
+                                    column.name(),
+                                    String.format(
+                                            "%s(%d)",
+                                            column.typeName(),
+                                            column.length()));
+                        });

Review Comment:
   Do all column types have a `column.length()`? Can the format `%s(%d)` represent all cases?
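
To make the question concrete, here is a small sketch of what the `%s(%d)` format yields for a few inputs. It is not taken from the PR: `NaiveTypeFormatDemo` and `naive` are hypothetical names, and the value -1 is only an assumed stand-in for "no length"; the real return value of `column.length()` for such columns should be checked.

```java
// Hypothetical demo of the "%s(%d)" format applied to different column types.
final class NaiveTypeFormatDemo {

    // Same format string as in the diff, applied to plain values.
    static String naive(String typeName, int length) {
        return String.format("%s(%d)", typeName, length);
    }

    public static void main(String[] args) {
        // A type with a real length formats as expected.
        System.out.println(naive("VARCHAR", 255)); // VARCHAR(255)
        // Assumption: -1 stands in for "length not set"; the output is not a valid type.
        System.out.println(naive("TEXT", -1));     // TEXT(-1)
        // A type with precision and scale loses its scale.
        System.out.println(naive("DECIMAL", 10));  // DECIMAL(10)
    }
}
```

Under these assumptions the single-integer format covers length-only types but misrepresents types without a length and drops the scale of decimal types.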





[GitHub] [inlong] dockerzhang commented on a diff in pull request #7846: [INLONG-7660][Sort] Support Ddl model in mysql connector when running in all migrate mode

Posted by "dockerzhang (via GitHub)" <gi...@apache.org>.
dockerzhang commented on code in PR #7846:
URL: https://github.com/apache/inlong/pull/7846#discussion_r1166207185


##########
inlong-common/pom.xml:
##########
@@ -84,6 +84,10 @@
             <groupId>commons-collections</groupId>
             <artifactId>commons-collections</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.github.jsqlparser</groupId>
+            <artifactId>jsqlparser</artifactId>
+        </dependency>

Review Comment:
   Please add the related license for the new dependency.





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #7846: [INLONG-7660][Sort] Support Ddl model in mysql connector when running in all migrate mode

Posted by "yunqingmoswu (via GitHub)" <gi...@apache.org>.
yunqingmoswu commented on code in PR #7846:
URL: https://github.com/apache/inlong/pull/7846#discussion_r1166212277


##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.enums;
+
+/**
+ * Alter type for alter column operation
+ */
+public enum AlterType {
+
+    RENAME_COLUMN,
+    ADD_COLUMN,
+    DROP_COLUMN,
+    MODIFY_COLUMN,

Review Comment:
   Can MODIFY_COLUMN be merged with CHANGE_COLUMN?



##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/Column.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.expressions;
+
+import java.util.List;
+import lombok.Builder;
+import lombok.Data;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+@Data
+@Builder
+@JsonInclude(Include.NON_NULL)
+public class Column {

Review Comment:
   Please add some comments to this class.



##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/Operation.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.operations;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
+
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
+@JsonSubTypes({
+        @JsonSubTypes.Type(value = AlterOperation.class, name = "alterOperation"),
+        @JsonSubTypes.Type(value = CreateTableOperation.class, name = "createTableOperation"),
+        @JsonSubTypes.Type(value = DropTableOperation.class, name = "dropTableOperation"),
+        @JsonSubTypes.Type(value = TruncateTableOperation.class, name = "truncateTableOperation"),
+        @JsonSubTypes.Type(value = RenameTableOperation.class, name = "renameTableOperation")
+})
+@Data
+@NoArgsConstructor
+public abstract class Operation {

Review Comment:
   Please add some comments to this class.



##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/TruncateTableOperation.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.operations;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
+
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("truncateTableOperation")
+@JsonInclude(Include.NON_NULL)
+@Data
+public class TruncateTableOperation extends Operation {

Review Comment:
   Please add some comments to this class.



##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/DropTableOperation.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.operations;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
+
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("dropTableOperation")
+@JsonInclude(Include.NON_NULL)
+@Data
+public class DropTableOperation extends Operation {

Review Comment:
   Please add some comments to this class.



##########
inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/DebeziumJsonSerializationTest.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJson;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.expressions.Position;
+import org.apache.inlong.sort.protocol.ddl.enums.AlterType;
+import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DebeziumJsonSerializationTest {

Review Comment:
   Please add some comments to this class.



##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/RenameTableOperation.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.operations;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
+
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("renameTableOperation")
+@JsonInclude(Include.NON_NULL)
+@Data
+public class RenameTableOperation extends Operation {

Review Comment:
   Please add some comments to this class.



##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/operations/CreateTableOperation.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.operations;
+
+import java.util.List;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
+import org.apache.inlong.sort.protocol.ddl.indexes.Index;
+
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("createTableOperation")
+@JsonInclude(Include.NON_NULL)
+@Data
+public class CreateTableOperation extends Operation {

Review Comment:
   Please add some comments to this class.



##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/Position.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.expressions;
+
+import lombok.Data;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
+
+@JsonInclude(Include.NON_NULL)
+@Data
+public class Position {

Review Comment:
   Please add some comments to this class.





[GitHub] [inlong] dockerzhang commented on pull request #7846: [INLONG-7660][Sort] Support Ddl model in mysql connector when running in all migrate mode

Posted by "dockerzhang (via GitHub)" <gi...@apache.org>.
dockerzhang commented on PR #7846:
URL: https://github.com/apache/inlong/pull/7846#issuecomment-1506850992

   @EMsnap, please add a design document for this PR; it would also be better to split it into multiple tasks.




[GitHub] [inlong] EMsnap commented on a diff in pull request #7846: [INLONG-7660][Sort] Support Ddl model in mysql connector when running in all migrate mode

Posted by "EMsnap (via GitHub)" <gi...@apache.org>.
EMsnap commented on code in PR #7846:
URL: https://github.com/apache/inlong/pull/7846#discussion_r1168392154


##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.enums;
+
+/**
+ * Alter type for alter column operation
+ */
+public enum AlterType {
+
+    RENAME_COLUMN,
+    ADD_COLUMN,
+    DROP_COLUMN,
+    MODIFY_COLUMN,

Review Comment:
   We are keeping MODIFY_COLUMN for possible future use.





[GitHub] [inlong] dockerzhang merged pull request #7846: [INLONG-7660][Sort] Support DDL model for MySQL connector when running in all migrate mode

Posted by "dockerzhang (via GitHub)" <gi...@apache.org>.
dockerzhang merged PR #7846:
URL: https://github.com/apache/inlong/pull/7846




[GitHub] [inlong] dockerzhang commented on a diff in pull request #7846: [INLONG-7660][Sort] Support DDL model for MySQL connector when running in all migrate mode

Posted by "dockerzhang (via GitHub)" <gi...@apache.org>.
dockerzhang commented on code in PR #7846:
URL: https://github.com/apache/inlong/pull/7846#discussion_r1170895984


##########
licenses/inlong-sort-connectors/LICENSE:
##########
@@ -874,6 +874,7 @@ The text of each license is the standard Apache 2.0 license.
   com.cedarsoftware:json-io:2.5.1 - Java JSON serialization (https://github.com/jdereg/json-io), (Apache License, Version 2.0)
   net.minidev:json-smart:2.3 - JSON Small and Fast Parser (https://github.com/netplex/json-smart-v2/tree/v2.3), (The Apache Software License, Version 2.0)
   com.github.jsqlparser:jsqlparser:2.1 - JSQLParser library (https://github.com/JSQLParser/JSqlParser), (The Apache Software License, Version 2.0;  GNU Library or Lesser General Public License (LGPL) V2.1)

Review Comment:
   Do we need to remove the old version from the license?





[GitHub] [inlong] EMsnap commented on a diff in pull request #7846: [INLONG-7660][Sort] Support Ddl model in mysql connector when running in all migrate mode

Posted by "EMsnap (via GitHub)" <gi...@apache.org>.
EMsnap commented on code in PR #7846:
URL: https://github.com/apache/inlong/pull/7846#discussion_r1168391628


##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.enums;
+
+/**
+ * Alter type for alter column operation
+ */
+public enum AlterType {
+
+    RENAME_COLUMN,
+    ADD_COLUMN,
+    DROP_COLUMN,
+    MODIFY_COLUMN,

Review Comment:
   https://dev.mysql.com/doc/refman/8.0/en/alter-table.html
   The column operation types here correspond to the alter types in the MySQL documentation.
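
   As a rough illustration of that correspondence, the sketch below maps each alter type to the MySQL `ALTER TABLE` clause it would stand for. `AlterTypeDemo` and `toMysqlClause` are hypothetical names, not code from this PR, and the presence of `CHANGE_COLUMN` in the enum is an assumption taken from the review question (the quoted diff is cut off after `MODIFY_COLUMN`).

```java
// Hypothetical demo class; the enum mirrors the constants quoted in the diff,
// plus CHANGE_COLUMN, which is assumed from the review discussion.
final class AlterTypeDemo {

    enum AlterType {
        RENAME_COLUMN, ADD_COLUMN, DROP_COLUMN, MODIFY_COLUMN, CHANGE_COLUMN
    }

    private AlterTypeDemo() {
    }

    // Builds the clause that follows "ALTER TABLE <table>" in MySQL.
    static String toMysqlClause(AlterType type, String oldName, String newName, String definition) {
        switch (type) {
            case RENAME_COLUMN:
                // Renames only; the column definition is untouched.
                return "RENAME COLUMN " + oldName + " TO " + newName;
            case ADD_COLUMN:
                return "ADD COLUMN " + newName + " " + definition;
            case DROP_COLUMN:
                return "DROP COLUMN " + oldName;
            case MODIFY_COLUMN:
                // Changes the definition but keeps the name.
                return "MODIFY COLUMN " + oldName + " " + definition;
            case CHANGE_COLUMN:
                // Can rename and change the definition in one clause.
                return "CHANGE COLUMN " + oldName + " " + newName + " " + definition;
            default:
                throw new IllegalArgumentException("Unsupported alter type: " + type);
        }
    }

    public static void main(String[] args) {
        System.out.println(toMysqlClause(AlterType.RENAME_COLUMN, "a", "b", null));
        System.out.println(toMysqlClause(AlterType.MODIFY_COLUMN, "a", null, "BIGINT NOT NULL"));
        System.out.println(toMysqlClause(AlterType.CHANGE_COLUMN, "a", "b", "BIGINT NOT NULL"));
    }
}
```

   Under this reading, `RENAME_COLUMN` changes only the name, `MODIFY_COLUMN` changes only the definition, and `CHANGE_COLUMN` can do both, which is why the three are kept as separate values.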





[GitHub] [inlong] wangpeix commented on a diff in pull request #7846: [INLONG-7660][Sort] Support Ddl model in mysql connector when running in all migrate mode

Posted by "wangpeix (via GitHub)" <gi...@apache.org>.
wangpeix commented on code in PR #7846:
URL: https://github.com/apache/inlong/pull/7846#discussion_r1166226732


##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.ddl.enums;
+
+/**
+ * Alter type for alter column operation
+ */
+public enum AlterType {
+
+    RENAME_COLUMN,
+    ADD_COLUMN,
+    DROP_COLUMN,
+    MODIFY_COLUMN,

Review Comment:
   What is the difference between `RENAME_COLUMN` and `MODIFY_COLUMN`?


