Posted to commits@inlong.apache.org by "e-mhui (via GitHub)" <gi...@apache.org> on 2023/03/31 08:31:02 UTC

[GitHub] [inlong] e-mhui opened a new pull request, #7750: [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records

e-mhui opened a new pull request, #7750:
URL: https://github.com/apache/inlong/pull/7750

   ### Prepare a Pull Request
   
   [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records
   
   - Fixes #7554 
   
   ### Motivation
   
   [gh-ost](https://github.com/github/gh-ost) is a triggerless online schema migration solution for MySQL.
   
   When the `gh-ost` tool is used, it generates multiple DDL statements. For example, adding a column `c` to table `tb1` with `gh-ost` produces the following statements, which show how gh-ost works:
   
   ```sql
   DROP TABLE IF EXISTS `menghuiyu`.`_tb1_gho`
   DROP TABLE IF EXISTS `menghuiyu`.`_tb1_del`
   DROP TABLE IF EXISTS `menghuiyu`.`_tb1_ghc`
   create /* gh-ost */ table `menghuiyu`.`_tb1_ghc` (\n\t\t\tid bigint auto_increment,\n\t\t\tlast_update timestamp not null DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,\n\t\t\thint varchar(64) charset ascii not null,\n\t\t\tvalue varchar(4096) charset ascii not null,\n\t\t\tprimary key(id),\n\t\t\tunique key hint_uidx(hint)\n\t\t) auto_increment=256
   create /* gh-ost */ table `menghuiyu`.`_tb1_gho` like `menghuiyu`.`tb1`
   alter /* gh-ost */ table `menghuiyu`.`_tb1_gho` add column c varchar(255)
   create /* gh-ost */ table `menghuiyu`.`_tb1_del` (\n\t\t\tid int auto_increment primary key\n\t\t) engine=InnoDB comment='ghost-cut-over-sentry'
   DROP TABLE IF EXISTS `menghuiyu`.`_tb1_del`
   rename /* gh-ost */ table `menghuiyu`.`tb1` to `menghuiyu`.`_tb1_del`
   rename /* gh-ost */ table `menghuiyu`.`_tb1_gho` to `menghuiyu`.`tb1`
   DROP TABLE IF EXISTS `menghuiyu`.`_tb1_ghc`
   DROP TABLE IF EXISTS `menghuiyu`.`_tb1_del`
   ```
   
   When MySQL CDC captures these DDL statements and synchronizes them to the sink, the sink cannot recognize the gh-ost tables (`_tb1_gho`, `_tb1_ghc`, `_tb1_del`) referenced in them. Therefore, these gh-ost tables need to be mapped back to the original table `tb1`.
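A minimal sketch of this name mapping, assuming a hypothetical helper-table pattern `^_(.*)_(gho|ghc|del)$` whose first group captures the original table name. The class and method names are illustrative, and the connector's actual default regex may differ:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GhostTableNames {
    // Hypothetical pattern: gh-ost helper tables are named _<table>_gho, _<table>_ghc and _<table>_del.
    private static final Pattern GHOST_TABLE = Pattern.compile("^_(.*)_(gho|ghc|del)$");

    /** Returns the original table name for a gh-ost helper table, or the input unchanged otherwise. */
    public static String originalTableName(String tableName) {
        Matcher matcher = GHOST_TABLE.matcher(tableName);
        return matcher.find() ? matcher.group(1) : tableName;
    }

    public static void main(String[] args) {
        for (String name : new String[] {"_tb1_gho", "_tb1_ghc", "_tb1_del", "tb1"}) {
            // Every name in this list maps to "tb1".
            System.out.println(name + " -> " + originalTableName(name));
        }
    }
}
```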
   
   
   ### Modifications
   
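   With `gh-ost.ddl.change=true`, the MySQL CDC source captures the `ALTER` statements that gh-ost runs against its helper tables, rewrites them to target the original table, and stores them in the binlog split state (serialized by `writeTableDdls`/`readTableDdls` in `MySqlSplitSerializer`). When gh-ost renames the helper table back to the original table, the stored DDL is retrieved from the state and emitted for the original table.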
   
   ### Verifying this change
   
   Run `AllMigrateTest.java`
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
     - docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] EMsnap commented on a diff in pull request #7750: [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records

Posted by "EMsnap (via GitHub)" <gi...@apache.org>.
EMsnap commented on code in PR #7750:
URL: https://github.com/apache/inlong/pull/7750#discussion_r1174493772


##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java:
##########
@@ -219,12 +239,73 @@ private boolean shouldOutputRenameDdl(SourceRecord element, TableId tableId) {
                     }
                 }
             }
-        } catch (Exception e) {
-            LOG.error("parse ddl error {}", element, e);
+        } catch (JSQLParserException e) {
+            LOG.error("parse ddl error {}", historyRecord, e);
         }
         return false;
     }
 
+    /**
+     * after setting `gh-ost.ddl.change=true`, the alter ddl statements generated by gh-ost
+     * will be captured and stored in the state.
+     */
+    private void collectGhostDdl(SourceRecord element, MySqlSplitState splitState, HistoryRecord historyRecord) {
+        String ddl = historyRecord.document().getString(Fields.DDL_STATEMENTS);
+        if (StringUtils.isBlank(ddl)) {
+            return;
+        }
+        String tableName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
+        Pattern compile = Pattern.compile(this.ghostTableRegex);
+        Matcher matcher = compile.matcher(tableName);
+        if (matcher.find()) {
+            tableName = matcher.group(1);
+            String dbName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getDbName(element);
+            TableId tableId = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableId(
+                    dbName,
+                    tableName);
+            MySqlBinlogSplitState mySqlBinlogSplitState = splitState.asBinlogSplitState();
+            if (ddl.toUpperCase().startsWith(DDL_OP_ALTER)
+                    && mySqlBinlogSplitState.getTableSchemas().containsKey(tableId)) {
+                String matchTableInSqlRegex = ghostTableRegex;
+                if (matchTableInSqlRegex.startsWith(CARET) && matchTableInSqlRegex.endsWith(DOLLAR)) {
+                    matchTableInSqlRegex = matchTableInSqlRegex.substring(1, matchTableInSqlRegex.length() - 1);
+                }
+                mySqlBinlogSplitState.recordTableDdl(

Review Comment:
   ok sure, LGTM then 





[GitHub] [inlong] e-mhui commented on a diff in pull request #7750: [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records

Posted by "e-mhui (via GitHub)" <gi...@apache.org>.
e-mhui commented on code in PR #7750:
URL: https://github.com/apache/inlong/pull/7750#discussion_r1155494458


##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java:
##########
@@ -767,6 +786,33 @@ private GenericRowData extractBeforeRow(Struct value, Schema valueSchema) throws
         return (GenericRowData) physicalConverter.convert(before, beforeSchema);
     }
 
+    /**
+     * Extract ghost ddl record
+     *
+     * @param data
+     * @return

Review Comment:
   done.





[GitHub] [inlong] e-mhui commented on a diff in pull request #7750: [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records

Posted by "e-mhui (via GitHub)" <gi...@apache.org>.
e-mhui commented on code in PR #7750:
URL: https://github.com/apache/inlong/pull/7750#discussion_r1173553333


##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java:
##########
@@ -219,12 +239,73 @@ private boolean shouldOutputRenameDdl(SourceRecord element, TableId tableId) {
                     }
                 }
             }
-        } catch (Exception e) {
-            LOG.error("parse ddl error {}", element, e);
+        } catch (JSQLParserException e) {
+            LOG.error("parse ddl error {}", historyRecord, e);
         }
         return false;
     }
 
+    /**
+     * after setting `gh-ost.ddl.change=true`, the alter ddl statements generated by gh-ost
+     * will be captured and stored in the state.
+     */
+    private void collectGhostDdl(SourceRecord element, MySqlSplitState splitState, HistoryRecord historyRecord) {
+        String ddl = historyRecord.document().getString(Fields.DDL_STATEMENTS);
+        if (StringUtils.isBlank(ddl)) {
+            return;
+        }
+        String tableName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
+        Pattern compile = Pattern.compile(this.ghostTableRegex);
+        Matcher matcher = compile.matcher(tableName);
+        if (matcher.find()) {
+            tableName = matcher.group(1);
+            String dbName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getDbName(element);
+            TableId tableId = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableId(
+                    dbName,
+                    tableName);
+            MySqlBinlogSplitState mySqlBinlogSplitState = splitState.asBinlogSplitState();
+            if (ddl.toUpperCase().startsWith(DDL_OP_ALTER)
+                    && mySqlBinlogSplitState.getTableSchemas().containsKey(tableId)) {
+                String matchTableInSqlRegex = ghostTableRegex;
+                if (matchTableInSqlRegex.startsWith(CARET) && matchTableInSqlRegex.endsWith(DOLLAR)) {
+                    matchTableInSqlRegex = matchTableInSqlRegex.substring(1, matchTableInSqlRegex.length() - 1);
+                }
+                mySqlBinlogSplitState.recordTableDdl(

Review Comment:
   When gh-ost is used for multiple DDL operations on the same table, it outputs them in order.
   
   ![image](https://user-images.githubusercontent.com/111486498/233599879-d9f44ad5-5b26-4e7c-bca6-8c4fa74a5f68.png)
   





[GitHub] [inlong] EMsnap commented on a diff in pull request #7750: [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records

Posted by "EMsnap (via GitHub)" <gi...@apache.org>.
EMsnap commented on code in PR #7750:
URL: https://github.com/apache/inlong/pull/7750#discussion_r1155438581


##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java:
##########
@@ -767,6 +786,33 @@ private GenericRowData extractBeforeRow(Struct value, Schema valueSchema) throws
         return (GenericRowData) physicalConverter.convert(before, beforeSchema);
     }
 
+    /**
+     * Extract ghost ddl record
+     *
+     * @param data
+     * @return

Review Comment:
   There is no description here, please fix.





[GitHub] [inlong] gong commented on a diff in pull request #7750: [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records

Posted by "gong (via GitHub)" <gi...@apache.org>.
gong commented on code in PR #7750:
URL: https://github.com/apache/inlong/pull/7750#discussion_r1173258301


##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java:
##########
@@ -226,6 +258,7 @@ public byte[] serialize(MySqlSplit split) throws IOException {
             writeBinlogPosition(binlogSplit.getEndingOffset(), out);
             writeFinishedSplitsInfo(binlogSplit.getFinishedSnapshotSplitInfos(), out);
             writeTableSchemas(binlogSplit.getTableSchemas(), out);
+            writeTableDdls(binlogSplit.getTableDdls(), out);
             out.writeInt(binlogSplit.getTotalFinishedSplitSize());

Review Comment:
   The order in which `writeTableDdls` and `readTableDdls` are invoked could cause compatibility issues,
   because `readTableDdls` will fail when restoring state from an old Flink job.





[GitHub] [inlong] EMsnap merged pull request #7750: [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records

Posted by "EMsnap (via GitHub)" <gi...@apache.org>.
EMsnap merged PR #7750:
URL: https://github.com/apache/inlong/pull/7750




[GitHub] [inlong] EMsnap commented on a diff in pull request #7750: [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records

Posted by "EMsnap (via GitHub)" <gi...@apache.org>.
EMsnap commented on code in PR #7750:
URL: https://github.com/apache/inlong/pull/7750#discussion_r1173380929


##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java:
##########
@@ -219,12 +239,73 @@ private boolean shouldOutputRenameDdl(SourceRecord element, TableId tableId) {
                     }
                 }
             }
-        } catch (Exception e) {
-            LOG.error("parse ddl error {}", element, e);
+        } catch (JSQLParserException e) {
+            LOG.error("parse ddl error {}", historyRecord, e);
         }
         return false;
     }
 
+    /**
+     * after setting `gh-ost.ddl.change=true`, the alter ddl statements generated by gh-ost
+     * will be captured and stored in the state.
+     */
+    private void collectGhostDdl(SourceRecord element, MySqlSplitState splitState, HistoryRecord historyRecord) {
+        String ddl = historyRecord.document().getString(Fields.DDL_STATEMENTS);
+        if (StringUtils.isBlank(ddl)) {
+            return;
+        }
+        String tableName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
+        Pattern compile = Pattern.compile(this.ghostTableRegex);
+        Matcher matcher = compile.matcher(tableName);
+        if (matcher.find()) {
+            tableName = matcher.group(1);
+            String dbName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getDbName(element);
+            TableId tableId = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableId(
+                    dbName,
+                    tableName);
+            MySqlBinlogSplitState mySqlBinlogSplitState = splitState.asBinlogSplitState();
+            if (ddl.toUpperCase().startsWith(DDL_OP_ALTER)
+                    && mySqlBinlogSplitState.getTableSchemas().containsKey(tableId)) {
+                String matchTableInSqlRegex = ghostTableRegex;
+                if (matchTableInSqlRegex.startsWith(CARET) && matchTableInSqlRegex.endsWith(DOLLAR)) {
+                    matchTableInSqlRegex = matchTableInSqlRegex.substring(1, matchTableInSqlRegex.length() - 1);
+                }
+                mySqlBinlogSplitState.recordTableDdl(

Review Comment:
   I guess overwriting means the data is lost, then?





[GitHub] [inlong] EMsnap commented on a diff in pull request #7750: [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records

Posted by "EMsnap (via GitHub)" <gi...@apache.org>.
EMsnap commented on code in PR #7750:
URL: https://github.com/apache/inlong/pull/7750#discussion_r1173272895


##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java:
##########
@@ -296,10 +329,12 @@ public MySqlSplit deserializeSplit(int version, byte[] serialized) throws IOExce
             Map<TableId, TableChange> tableChangeMap = readTableSchemas(version, in);
             int totalFinishedSplitSize = finishedSplitsInfo.size();
             boolean isSuspended = false;
+            Map<TableId, String> tableDdls = null;
             if (version >= 3) {
                 totalFinishedSplitSize = in.readInt();
                 if (version > 3) {
                     isSuspended = in.readBoolean();
+                    tableDdls = readTableDdls(version, in);

Review Comment:
   What if the job is restored from an old job that doesn't have the DDL field? Will it throw an `EOFException`?





[GitHub] [inlong] e-mhui commented on a diff in pull request #7750: [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records

Posted by "e-mhui (via GitHub)" <gi...@apache.org>.
e-mhui commented on code in PR #7750:
URL: https://github.com/apache/inlong/pull/7750#discussion_r1173549371


##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java:
##########
@@ -104,6 +104,38 @@ public static Map<TableId, TableChange> readTableSchemas(int version, DataInputD
         return tableSchemas;
     }
 
+    public static void writeTableDdls(
+            Map<TableId, String> tableDdls, DataOutputSerializer out) throws IOException {
+        final int size = tableDdls.size();
+        out.writeInt(size);
+        for (Map.Entry<TableId, String> entry : tableDdls.entrySet()) {
+            out.writeUTF(entry.getKey().toString());
+            out.writeUTF(entry.getValue());
+        }
+    }
+
+    public static Map<TableId, String> readTableDdls(int version, DataInputDeserializer in)
+            throws IOException {
+        Map<TableId, String> tableDdls = new HashMap<>();
+        final int size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            TableId tableId = TableId.parse(in.readUTF());

Review Comment:
   Thanks, it has been fixed.
   





[GitHub] [inlong] EMsnap commented on a diff in pull request #7750: [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records

Posted by "EMsnap (via GitHub)" <gi...@apache.org>.
EMsnap commented on code in PR #7750:
URL: https://github.com/apache/inlong/pull/7750#discussion_r1173269181


##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java:
##########
@@ -219,12 +239,73 @@ private boolean shouldOutputRenameDdl(SourceRecord element, TableId tableId) {
                     }
                 }
             }
-        } catch (Exception e) {
-            LOG.error("parse ddl error {}", element, e);
+        } catch (JSQLParserException e) {
+            LOG.error("parse ddl error {}", historyRecord, e);
         }
         return false;
     }
 
+    /**
+     * after setting `gh-ost.ddl.change=true`, the alter ddl statements generated by gh-ost
+     * will be captured and stored in the state.
+     */
+    private void collectGhostDdl(SourceRecord element, MySqlSplitState splitState, HistoryRecord historyRecord) {
+        String ddl = historyRecord.document().getString(Fields.DDL_STATEMENTS);
+        if (StringUtils.isBlank(ddl)) {
+            return;
+        }
+        String tableName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
+        Pattern compile = Pattern.compile(this.ghostTableRegex);
+        Matcher matcher = compile.matcher(tableName);
+        if (matcher.find()) {
+            tableName = matcher.group(1);
+            String dbName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getDbName(element);
+            TableId tableId = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableId(
+                    dbName,
+                    tableName);
+            MySqlBinlogSplitState mySqlBinlogSplitState = splitState.asBinlogSplitState();
+            if (ddl.toUpperCase().startsWith(DDL_OP_ALTER)
+                    && mySqlBinlogSplitState.getTableSchemas().containsKey(tableId)) {
+                String matchTableInSqlRegex = ghostTableRegex;
+                if (matchTableInSqlRegex.startsWith(CARET) && matchTableInSqlRegex.endsWith(DOLLAR)) {
+                    matchTableInSqlRegex = matchTableInSqlRegex.substring(1, matchTableInSqlRegex.length() - 1);
+                }
+                mySqlBinlogSplitState.recordTableDdl(

Review Comment:
   IIUC, `tableDdls` uses a `Map<TableId, String>` to store the DDL from gh-ost. What if two DDLs are added to the map for the same table, so that the first one is overwritten? Is that acceptable?
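To illustrate the concern: `Map.put` replaces any previous value for the same key, so a second DDL recorded for the same table replaces the first. A minimal sketch contrasting that with a hypothetical store that appends DDLs per table in arrival order; table IDs are simplified to strings and `GhostDdlStore` is an illustrative name, not part of the PR:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GhostDdlStore {
    // Hypothetical per-table store that appends instead of overwriting.
    private final Map<String, List<String>> ddlsByTable = new LinkedHashMap<>();

    public void record(String tableId, String ddl) {
        ddlsByTable.computeIfAbsent(tableId, k -> new ArrayList<>()).add(ddl);
    }

    public List<String> get(String tableId) {
        return ddlsByTable.getOrDefault(tableId, List.of());
    }

    public static void main(String[] args) {
        // A plain Map<String, String> keeps only the last DDL per table.
        Map<String, String> single = new HashMap<>();
        single.put("menghuiyu.tb1", "alter table `menghuiyu`.`tb1` add column c varchar(255)");
        single.put("menghuiyu.tb1", "alter table `menghuiyu`.`tb1` add column d int");
        System.out.println(single.get("menghuiyu.tb1")); // only the second DDL survives

        GhostDdlStore store = new GhostDdlStore();
        store.record("menghuiyu.tb1", "alter table `menghuiyu`.`tb1` add column c varchar(255)");
        store.record("menghuiyu.tb1", "alter table `menghuiyu`.`tb1` add column d int");
        System.out.println(store.get("menghuiyu.tb1").size()); // both DDLs are kept
    }
}
```

Whether keeping every statement is needed depends on how gh-ost emits DDLs for one table; the sketch only shows the difference between the two map shapes.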





[GitHub] [inlong] EMsnap commented on a diff in pull request #7750: [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records

Posted by "EMsnap (via GitHub)" <gi...@apache.org>.
EMsnap commented on code in PR #7750:
URL: https://github.com/apache/inlong/pull/7750#discussion_r1155431301


##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java:
##########
@@ -767,6 +786,33 @@ private GenericRowData extractBeforeRow(Struct value, Schema valueSchema) throws
         return (GenericRowData) physicalConverter.convert(before, beforeSchema);
     }
 
+    /**
+     * Extract ghost ddl record
+     *
+     * @param data
+     * @return
+     * @throws Exception
+     */
+    private GenericRowData extractGhostRecord(GenericRowData data) throws Exception {
+        String ddl = ((Map<String, String>)data.getField(0)).get(DDL_FIELD_NAME);
+        if (this.ghostTableRegex.startsWith(CARET) && this.ghostTableRegex.endsWith(DOLLAR)) {
+            this.ghostTableRegex = this.ghostTableRegex.substring(1, this.ghostTableRegex.length() - 1);

Review Comment:
   Why remove `$` and `^`?
   BTW, what if a regex only has `^` at the beginning and no `$` at the end?
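A small sketch of why the anchors matter when a table-name regex is reused to search inside a full DDL statement, assuming the hypothetical pattern `^_(.*)_(gho|ghc|del)$`. Without `Pattern.MULTILINE`, `^` and `$` only match at the start and end of the whole input, so they fit a bare table name but never match inside a statement. The diff strips them only when both are present; `stripAnchors` below is a hypothetical variant that strips each one independently:

```java
import java.util.regex.Pattern;

public class AnchorDemo {
    /** Hypothetical helper: remove a leading '^' and a trailing '$' independently. */
    static String stripAnchors(String regex) {
        String result = regex;
        if (result.startsWith("^")) {
            result = result.substring(1);
        }
        // Keep a trailing "\$" (an escaped, literal dollar sign); strip a bare trailing '$'.
        if (result.endsWith("$") && !result.endsWith("\\$")) {
            result = result.substring(0, result.length() - 1);
        }
        return result;
    }

    static boolean foundIn(String regex, String input) {
        return Pattern.compile(regex).matcher(input).find();
    }

    public static void main(String[] args) {
        String ddl = "alter table `menghuiyu`.`_tb1_gho` add column c varchar(255)";
        System.out.println(foundIn("^_(.*)_(gho|ghc|del)$", "_tb1_gho")); // true: whole input is a table name
        System.out.println(foundIn("^_(.*)_(gho|ghc|del)$", ddl));        // false: anchors block the match
        System.out.println(foundIn("^_(.*)_(gho|ghc|del)", ddl));         // false: '^' alone still blocks it
        System.out.println(foundIn(stripAnchors("^_(.*)_(gho|ghc|del)"), ddl)); // true
    }
}
```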



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] gong commented on a diff in pull request #7750: [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records

Posted by "gong (via GitHub)" <gi...@apache.org>.
gong commented on code in PR #7750:
URL: https://github.com/apache/inlong/pull/7750#discussion_r1173443327


##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java:
##########
@@ -104,6 +104,38 @@ public static Map<TableId, TableChange> readTableSchemas(int version, DataInputD
         return tableSchemas;
     }
 
+    public static void writeTableDdls(
+            Map<TableId, String> tableDdls, DataOutputSerializer out) throws IOException {
+        final int size = tableDdls.size();
+        out.writeInt(size);
+        for (Map.Entry<TableId, String> entry : tableDdls.entrySet()) {
+            out.writeUTF(entry.getKey().toString());
+            out.writeUTF(entry.getValue());
+        }
+    }
+
+    public static Map<TableId, String> readTableDdls(int version, DataInputDeserializer in)
+            throws IOException {
+        Map<TableId, String> tableDdls = new HashMap<>();
+        final int size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            TableId tableId = TableId.parse(in.readUTF());

Review Comment:
   You can refer to the `readReadPhaseMetric` method and add an `in.available() > 0` check.
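A minimal sketch of the suggested guard, assuming the DDL map is written as the last field of the split. It uses plain `java.io` streams in place of Flink's `DataOutputSerializer`/`DataInputDeserializer` and strings in place of Debezium's `TableId` (both simplifications); `serialize`/`deserialize` are hypothetical stand-ins for the split serializer. State written by an older job simply ends before the DDL section, so the reader falls back to an empty map instead of hitting an `EOFException`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class TableDdlSerde {
    static void writeTableDdls(Map<String, String> tableDdls, DataOutputStream out) throws IOException {
        out.writeInt(tableDdls.size());
        for (Map.Entry<String, String> entry : tableDdls.entrySet()) {
            out.writeUTF(entry.getKey());
            out.writeUTF(entry.getValue());
        }
    }

    static Map<String, String> readTableDdls(DataInputStream in) throws IOException {
        Map<String, String> tableDdls = new HashMap<>();
        // State written by an older job has no trailing DDL section: nothing left to read.
        if (in.available() <= 0) {
            return tableDdls;
        }
        int size = in.readInt();
        for (int i = 0; i < size; i++) {
            String tableId = in.readUTF();
            tableDdls.put(tableId, in.readUTF());
        }
        return tableDdls;
    }

    /** Serializes one pre-existing field, optionally followed by the new DDL section. */
    static byte[] serialize(boolean withDdls, Map<String, String> ddls) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(42); // stand-in for a field that both old and new versions write
            if (withDdls) {
                writeTableDdls(ddls, out);
            }
        }
        return bytes.toByteArray();
    }

    static Map<String, String> deserialize(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            in.readInt(); // the pre-existing field
            return readTableDdls(in);
        }
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> ddls = Map.of("menghuiyu.tb1", "alter table `menghuiyu`.`tb1` add column c varchar(255)");
        System.out.println(deserialize(serialize(true, ddls)));  // the restored DDL map
        System.out.println(deserialize(serialize(false, ddls))); // empty map for old-format state
    }
}
```

For an in-memory `ByteArrayInputStream`, `available()` reports exactly the number of unread bytes, which is what makes this check reliable in the sketch.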





[GitHub] [inlong] e-mhui commented on a diff in pull request #7750: [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records

Posted by "e-mhui (via GitHub)" <gi...@apache.org>.
e-mhui commented on code in PR #7750:
URL: https://github.com/apache/inlong/pull/7750#discussion_r1173288400


##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java:
##########
@@ -219,12 +239,73 @@ private boolean shouldOutputRenameDdl(SourceRecord element, TableId tableId) {
                     }
                 }
             }
-        } catch (Exception e) {
-            LOG.error("parse ddl error {}", element, e);
+        } catch (JSQLParserException e) {
+            LOG.error("parse ddl error {}", historyRecord, e);
         }
         return false;
     }
 
+    /**
+     * after setting `gh-ost.ddl.change=true`, the alter ddl statements generated by gh-ost
+     * will be captured and stored in the state.
+     */
+    private void collectGhostDdl(SourceRecord element, MySqlSplitState splitState, HistoryRecord historyRecord) {
+        String ddl = historyRecord.document().getString(Fields.DDL_STATEMENTS);
+        if (StringUtils.isBlank(ddl)) {
+            return;
+        }
+        String tableName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
+        Pattern compile = Pattern.compile(this.ghostTableRegex);
+        Matcher matcher = compile.matcher(tableName);
+        if (matcher.find()) {
+            tableName = matcher.group(1);
+            String dbName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getDbName(element);
+            TableId tableId = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableId(
+                    dbName,
+                    tableName);
+            MySqlBinlogSplitState mySqlBinlogSplitState = splitState.asBinlogSplitState();
+            if (ddl.toUpperCase().startsWith(DDL_OP_ALTER)
+                    && mySqlBinlogSplitState.getTableSchemas().containsKey(tableId)) {
+                String matchTableInSqlRegex = ghostTableRegex;
+                if (matchTableInSqlRegex.startsWith(CARET) && matchTableInSqlRegex.endsWith(DOLLAR)) {
+                    matchTableInSqlRegex = matchTableInSqlRegex.substring(1, matchTableInSqlRegex.length() - 1);
+                }
+                mySqlBinlogSplitState.recordTableDdl(
+                        tableId,
+                        ddl.replace(GHOST_TAG, "")
+                                .replaceAll("\\s+", " ")
+                                .replaceAll(matchTableInSqlRegex, tableName));
+            }
+        }
+    }
+
+    /**
+     * if gh-ost output the 'rename table`_a_gho` to `a`' ddl statement (where `a` is the captured table
+     * and `_a_gho` is the gh-ost table), its tableChanges won't be empty, and the tableName will contain
+     * both the captured table and the gh-ost generated table.
+     * We should retrieve the alter statements generated by gh-ost from the state and update the `source.table`
+     * and `historyRecord` values of the element.
+     */
+    private void updateGhostDdlElement(SourceRecord element, MySqlSplitState splitState, HistoryRecord historyRecord) {
+        String tableNames = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);

Review Comment:
   done.
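A minimal sketch of the DDL rewrite chain in `collectGhostDdl` (remove the gh-ost tag, collapse whitespace, replace the helper table name), applied to the `alter` statement from the PR description. It assumes `GHOST_TAG` is the literal comment `/* gh-ost */` and uses the unanchored form of the hypothetical pattern `_(.*)_(gho|ghc|del)`; `Matcher.quoteReplacement` is an addition not shown in the diff. Because the pattern is applied to the whole statement, a schema name containing `_` could widen the match, which this sketch does not guard against:

```java
import java.util.regex.Matcher;

public class GhostDdlRewrite {
    // Assumed value of GHOST_TAG: the comment gh-ost embeds in the statements it issues.
    private static final String GHOST_TAG = "/* gh-ost */";

    /** Rewrites a gh-ost ALTER so that it targets the original table. */
    public static String rewrite(String ddl, String unanchoredGhostRegex, String originalTable) {
        return ddl.replace(GHOST_TAG, "")
                .replaceAll("\\s+", " ")
                // quoteReplacement stops '$' or '\' in a table name from being read as group references
                .replaceAll(unanchoredGhostRegex, Matcher.quoteReplacement(originalTable));
    }

    public static void main(String[] args) {
        String ddl = "alter /* gh-ost */ table `menghuiyu`.`_tb1_gho` add column c varchar(255)";
        // prints: alter table `menghuiyu`.`tb1` add column c varchar(255)
        System.out.println(rewrite(ddl, "_(.*)_(gho|ghc|del)", "tb1"));
    }
}
```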



##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java:
##########
@@ -219,12 +239,73 @@ private boolean shouldOutputRenameDdl(SourceRecord element, TableId tableId) {
                     }
                 }
             }
-        } catch (Exception e) {
-            LOG.error("parse ddl error {}", element, e);
+        } catch (JSQLParserException e) {
+            LOG.error("parse ddl error {}", historyRecord, e);
         }
         return false;
     }
 
+    /**
+     * after setting `gh-ost.ddl.change=true`, the alter ddl statements generated by gh-ost
+     * will be captured and stored in the state.
+     */
+    private void collectGhostDdl(SourceRecord element, MySqlSplitState splitState, HistoryRecord historyRecord) {
+        String ddl = historyRecord.document().getString(Fields.DDL_STATEMENTS);
+        if (StringUtils.isBlank(ddl)) {
+            return;
+        }
+        String tableName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
+        Pattern compile = Pattern.compile(this.ghostTableRegex);
+        Matcher matcher = compile.matcher(tableName);
+        if (matcher.find()) {
+            tableName = matcher.group(1);
+            String dbName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getDbName(element);
+            TableId tableId = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableId(
+                    dbName,
+                    tableName);
+            MySqlBinlogSplitState mySqlBinlogSplitState = splitState.asBinlogSplitState();
+            if (ddl.toUpperCase().startsWith(DDL_OP_ALTER)
+                    && mySqlBinlogSplitState.getTableSchemas().containsKey(tableId)) {
+                String matchTableInSqlRegex = ghostTableRegex;
+                if (matchTableInSqlRegex.startsWith(CARET) && matchTableInSqlRegex.endsWith(DOLLAR)) {
+                    matchTableInSqlRegex = matchTableInSqlRegex.substring(1, matchTableInSqlRegex.length() - 1);
+                }
+                mySqlBinlogSplitState.recordTableDdl(
+                        tableId,
+                        ddl.replace(GHOST_TAG, "")
+                                .replaceAll("\\s+", " ")
+                                .replaceAll(matchTableInSqlRegex, tableName));
+            }
+        }
+    }
+
+    /**
+     * if gh-ost output the 'rename table`_a_gho` to `a`' ddl statement (where `a` is the captured table
+     * and `_a_gho` is the gh-ost table), its tableChanges won't be empty, and the tableName will contain
+     * both the captured table and the gh-ost generated table.
+     * We should retrieve the alter statements generated by gh-ost from the state and update the `source.table`
+     * and `historyRecord` values of the element.
+     */
+    private void updateGhostDdlElement(SourceRecord element, MySqlSplitState splitState, HistoryRecord historyRecord) {
+        String tableNames = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
+        for (String tableName : tableNames.split(",")) {
+            Pattern compile = Pattern.compile(ghostTableRegex);
+            Matcher matcher = compile.matcher(tableName);
+            if (matcher.find()) {
+                tableName = matcher.group(1);
+                String dbName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getDbName(element);
+                TableId tableId = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableId(

Review Comment:
   done.
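The `updateGhostDdlElement` hunk quoted above splits the record's table names on `,` and uses group 1 of the gh-ost regex to recover the original table. Below is a minimal standalone sketch of just that lookup. It assumes the table names arrive as a plain comma-separated list such as `tb1,_tb1_gho`, and it uses the default regex quoted elsewhere in this thread. Unlike the hunk, it compiles the pattern once rather than on every loop iteration.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GhostTableNameResolver {

    // Default gh-ost table regex quoted in this PR discussion; compiled once, not per table name.
    private static final Pattern GHOST_TABLE = Pattern.compile("^_(.*)_(gho|ghc|del|new|old)$");

    /**
     * Takes the comma-separated table names of a rename DDL record, e.g. "tb1,_tb1_gho",
     * and returns the original table name recovered from the first gh-ost table found.
     */
    public static Optional<String> resolveOriginalTable(String tableNames) {
        for (String tableName : tableNames.split(",")) {
            Matcher matcher = GHOST_TABLE.matcher(tableName.trim());
            if (matcher.find()) {
                // Group 1 holds the original table name: "tb1" for "_tb1_gho".
                return Optional.of(matcher.group(1));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(resolveOriginalTable("tb1,_tb1_gho").orElse("<none>")); // tb1
        System.out.println(resolveOriginalTable("tb1").orElse("<none>"));          // <none>
    }
}
```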



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] e-mhui commented on a diff in pull request #7750: [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records

Posted by "e-mhui (via GitHub)" <gi...@apache.org>.
e-mhui commented on code in PR #7750:
URL: https://github.com/apache/inlong/pull/7750#discussion_r1173427057


##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java:
##########
@@ -219,12 +239,73 @@ private boolean shouldOutputRenameDdl(SourceRecord element, TableId tableId) {
                     }
                 }
             }
-        } catch (Exception e) {
-            LOG.error("parse ddl error {}", element, e);
+        } catch (JSQLParserException e) {
+            LOG.error("parse ddl error {}", historyRecord, e);
         }
         return false;
     }
 
+    /**
+     * after setting `gh-ost.ddl.change=true`, the alter ddl statements generated by gh-ost
+     * will be captured and stored in the state.
+     */
+    private void collectGhostDdl(SourceRecord element, MySqlSplitState splitState, HistoryRecord historyRecord) {
+        String ddl = historyRecord.document().getString(Fields.DDL_STATEMENTS);
+        if (StringUtils.isBlank(ddl)) {
+            return;
+        }
+        String tableName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
+        Pattern compile = Pattern.compile(this.ghostTableRegex);
+        Matcher matcher = compile.matcher(tableName);
+        if (matcher.find()) {
+            tableName = matcher.group(1);
+            String dbName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getDbName(element);
+            TableId tableId = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableId(
+                    dbName,
+                    tableName);
+            MySqlBinlogSplitState mySqlBinlogSplitState = splitState.asBinlogSplitState();
+            if (ddl.toUpperCase().startsWith(DDL_OP_ALTER)
+                    && mySqlBinlogSplitState.getTableSchemas().containsKey(tableId)) {
+                String matchTableInSqlRegex = ghostTableRegex;
+                if (matchTableInSqlRegex.startsWith(CARET) && matchTableInSqlRegex.endsWith(DOLLAR)) {
+                    matchTableInSqlRegex = matchTableInSqlRegex.substring(1, matchTableInSqlRegex.length() - 1);
+                }
+                mySqlBinlogSplitState.recordTableDdl(

Review Comment:
   The previous data has been sent downstream, so it is normal for it to be overwritten.
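Below is a minimal sketch of the behaviour discussed here. `collectGhostDdl` stores at most one normalized gh-ost ALTER per table, so a later ALTER replaces the earlier one. The map keyed by a `"db.table"` string stands in for the split state's `TableId` map. The values `"/* gh-ost */"` for `GHOST_TAG` and `"ALTER"` for `DDL_OP_ALTER` are assumptions taken from the DDLs in the PR description. The `tableIsCaptured` flag is a hypothetical stand-in for the `getTableSchemas().containsKey(tableId)` check.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class GhostDdlState {

    // Assumed constant values, for illustration only.
    private static final String GHOST_TAG = "/* gh-ost */";
    // The default gh-ost regex with its ^ and $ anchors already removed.
    private static final String GHOST_TABLE_IN_SQL = "_(.*)_(gho|ghc|del|new|old)";

    // Latest normalized gh-ost ALTER per original table, keyed by "db.table".
    private final Map<String, String> tableDdls = new HashMap<>();

    /** Records an ALTER issued against a gh-ost table, rewritten to target the original table. */
    public void collect(String db, String originalTable, String ddl, boolean tableIsCaptured) {
        if (!ddl.toUpperCase(Locale.ROOT).startsWith("ALTER") || !tableIsCaptured) {
            return; // only ALTERs on captured tables are kept, as in the quoted hunk
        }
        String normalized = ddl.replace(GHOST_TAG, "")
                .replaceAll("\\s+", " ")
                .replaceAll(GHOST_TABLE_IN_SQL, originalTable);
        // put() overwrites: only the most recent gh-ost ALTER for this table survives.
        tableDdls.put(db + "." + originalTable, normalized);
    }

    public String latest(String db, String table) {
        return tableDdls.get(db + "." + table);
    }

    public static void main(String[] args) {
        GhostDdlState state = new GhostDdlState();
        state.collect("menghuiyu", "tb1",
                "alter /* gh-ost */ table `menghuiyu`.`_tb1_gho` add column c varchar(255)", true);
        state.collect("menghuiyu", "tb1",
                "alter /* gh-ost */ table `menghuiyu`.`_tb1_gho` add column d int", true);
        System.out.println(state.latest("menghuiyu", "tb1"));
        // alter table `menghuiyu`.`tb1` add column d int
    }
}
```

Overwriting is only safe under the ordering argument made in this thread: each recorded ALTER is consumed when its rename arrives, before gh-ost issues the next one.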
   





[GitHub] [inlong] EMsnap commented on a diff in pull request #7750: [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records

Posted by "EMsnap (via GitHub)" <gi...@apache.org>.
EMsnap commented on code in PR #7750:
URL: https://github.com/apache/inlong/pull/7750#discussion_r1173266509


##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java:
##########
@@ -219,12 +239,73 @@ private boolean shouldOutputRenameDdl(SourceRecord element, TableId tableId) {
                     }
                 }
             }
-        } catch (Exception e) {
-            LOG.error("parse ddl error {}", element, e);
+        } catch (JSQLParserException e) {
+            LOG.error("parse ddl error {}", historyRecord, e);
         }
         return false;
     }
 
+    /**
+     * after setting `gh-ost.ddl.change=true`, the alter ddl statements generated by gh-ost
+     * will be captured and stored in the state.
+     */
+    private void collectGhostDdl(SourceRecord element, MySqlSplitState splitState, HistoryRecord historyRecord) {
+        String ddl = historyRecord.document().getString(Fields.DDL_STATEMENTS);
+        if (StringUtils.isBlank(ddl)) {
+            return;
+        }
+        String tableName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
+        Pattern compile = Pattern.compile(this.ghostTableRegex);
+        Matcher matcher = compile.matcher(tableName);
+        if (matcher.find()) {
+            tableName = matcher.group(1);
+            String dbName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getDbName(element);
+            TableId tableId = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableId(
+                    dbName,
+                    tableName);
+            MySqlBinlogSplitState mySqlBinlogSplitState = splitState.asBinlogSplitState();
+            if (ddl.toUpperCase().startsWith(DDL_OP_ALTER)
+                    && mySqlBinlogSplitState.getTableSchemas().containsKey(tableId)) {
+                String matchTableInSqlRegex = ghostTableRegex;
+                if (matchTableInSqlRegex.startsWith(CARET) && matchTableInSqlRegex.endsWith(DOLLAR)) {
+                    matchTableInSqlRegex = matchTableInSqlRegex.substring(1, matchTableInSqlRegex.length() - 1);
+                }
+                mySqlBinlogSplitState.recordTableDdl(
+                        tableId,
+                        ddl.replace(GHOST_TAG, "")
+                                .replaceAll("\\s+", " ")
+                                .replaceAll(matchTableInSqlRegex, tableName));
+            }
+        }
+    }
+
+    /**
+     * if gh-ost output the 'rename table`_a_gho` to `a`' ddl statement (where `a` is the captured table
+     * and `_a_gho` is the gh-ost table), its tableChanges won't be empty, and the tableName will contain
+     * both the captured table and the gh-ost generated table.
+     * We should retrieve the alter statements generated by gh-ost from the state and update the `source.table`
+     * and `historyRecord` values of the element.
+     */
+    private void updateGhostDdlElement(SourceRecord element, MySqlSplitState splitState, HistoryRecord historyRecord) {
+        String tableNames = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
+        for (String tableName : tableNames.split(",")) {
+            Pattern compile = Pattern.compile(ghostTableRegex);
+            Matcher matcher = compile.matcher(tableName);
+            if (matcher.find()) {
+                tableName = matcher.group(1);
+                String dbName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getDbName(element);
+                TableId tableId = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableId(

Review Comment:
   ditto



##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java:
##########
@@ -219,12 +239,73 @@ private boolean shouldOutputRenameDdl(SourceRecord element, TableId tableId) {
                     }
                 }
             }
-        } catch (Exception e) {
-            LOG.error("parse ddl error {}", element, e);
+        } catch (JSQLParserException e) {
+            LOG.error("parse ddl error {}", historyRecord, e);
         }
         return false;
     }
 
+    /**
+     * after setting `gh-ost.ddl.change=true`, the alter ddl statements generated by gh-ost
+     * will be captured and stored in the state.
+     */
+    private void collectGhostDdl(SourceRecord element, MySqlSplitState splitState, HistoryRecord historyRecord) {
+        String ddl = historyRecord.document().getString(Fields.DDL_STATEMENTS);
+        if (StringUtils.isBlank(ddl)) {
+            return;
+        }
+        String tableName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
+        Pattern compile = Pattern.compile(this.ghostTableRegex);
+        Matcher matcher = compile.matcher(tableName);
+        if (matcher.find()) {
+            tableName = matcher.group(1);
+            String dbName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getDbName(element);
+            TableId tableId = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableId(
+                    dbName,
+                    tableName);
+            MySqlBinlogSplitState mySqlBinlogSplitState = splitState.asBinlogSplitState();
+            if (ddl.toUpperCase().startsWith(DDL_OP_ALTER)
+                    && mySqlBinlogSplitState.getTableSchemas().containsKey(tableId)) {
+                String matchTableInSqlRegex = ghostTableRegex;
+                if (matchTableInSqlRegex.startsWith(CARET) && matchTableInSqlRegex.endsWith(DOLLAR)) {
+                    matchTableInSqlRegex = matchTableInSqlRegex.substring(1, matchTableInSqlRegex.length() - 1);
+                }
+                mySqlBinlogSplitState.recordTableDdl(
+                        tableId,
+                        ddl.replace(GHOST_TAG, "")
+                                .replaceAll("\\s+", " ")
+                                .replaceAll(matchTableInSqlRegex, tableName));
+            }
+        }
+    }
+
+    /**
+     * if gh-ost output the 'rename table`_a_gho` to `a`' ddl statement (where `a` is the captured table
+     * and `_a_gho` is the gh-ost table), its tableChanges won't be empty, and the tableName will contain
+     * both the captured table and the gh-ost generated table.
+     * We should retrieve the alter statements generated by gh-ost from the state and update the `source.table`
+     * and `historyRecord` values of the element.
+     */
+    private void updateGhostDdlElement(SourceRecord element, MySqlSplitState splitState, HistoryRecord historyRecord) {
+        String tableNames = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
+        for (String tableName : tableNames.split(",")) {
+            Pattern compile = Pattern.compile(ghostTableRegex);
+            Matcher matcher = compile.matcher(tableName);
+            if (matcher.find()) {
+                tableName = matcher.group(1);
+                String dbName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getDbName(element);

Review Comment:
   ditto



##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java:
##########
@@ -219,12 +239,73 @@ private boolean shouldOutputRenameDdl(SourceRecord element, TableId tableId) {
                     }
                 }
             }
-        } catch (Exception e) {
-            LOG.error("parse ddl error {}", element, e);
+        } catch (JSQLParserException e) {
+            LOG.error("parse ddl error {}", historyRecord, e);
         }
         return false;
     }
 
+    /**
+     * after setting `gh-ost.ddl.change=true`, the alter ddl statements generated by gh-ost
+     * will be captured and stored in the state.
+     */
+    private void collectGhostDdl(SourceRecord element, MySqlSplitState splitState, HistoryRecord historyRecord) {
+        String ddl = historyRecord.document().getString(Fields.DDL_STATEMENTS);
+        if (StringUtils.isBlank(ddl)) {
+            return;
+        }
+        String tableName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
+        Pattern compile = Pattern.compile(this.ghostTableRegex);
+        Matcher matcher = compile.matcher(tableName);
+        if (matcher.find()) {
+            tableName = matcher.group(1);
+            String dbName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getDbName(element);
+            TableId tableId = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableId(
+                    dbName,
+                    tableName);
+            MySqlBinlogSplitState mySqlBinlogSplitState = splitState.asBinlogSplitState();
+            if (ddl.toUpperCase().startsWith(DDL_OP_ALTER)
+                    && mySqlBinlogSplitState.getTableSchemas().containsKey(tableId)) {
+                String matchTableInSqlRegex = ghostTableRegex;
+                if (matchTableInSqlRegex.startsWith(CARET) && matchTableInSqlRegex.endsWith(DOLLAR)) {
+                    matchTableInSqlRegex = matchTableInSqlRegex.substring(1, matchTableInSqlRegex.length() - 1);
+                }
+                mySqlBinlogSplitState.recordTableDdl(
+                        tableId,
+                        ddl.replace(GHOST_TAG, "")
+                                .replaceAll("\\s+", " ")
+                                .replaceAll(matchTableInSqlRegex, tableName));
+            }
+        }
+    }
+
+    /**
+     * if gh-ost output the 'rename table`_a_gho` to `a`' ddl statement (where `a` is the captured table
+     * and `_a_gho` is the gh-ost table), its tableChanges won't be empty, and the tableName will contain
+     * both the captured table and the gh-ost generated table.
+     * We should retrieve the alter statements generated by gh-ost from the state and update the `source.table`
+     * and `historyRecord` values of the element.
+     */
+    private void updateGhostDdlElement(SourceRecord element, MySqlSplitState splitState, HistoryRecord historyRecord) {
+        String tableNames = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);

Review Comment:
   The method can be imported instead of being referenced by its fully qualified name.





[GitHub] [inlong] e-mhui commented on a diff in pull request #7750: [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records

Posted by "e-mhui (via GitHub)" <gi...@apache.org>.
e-mhui commented on code in PR #7750:
URL: https://github.com/apache/inlong/pull/7750#discussion_r1173280928


##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java:
##########
@@ -219,12 +239,73 @@ private boolean shouldOutputRenameDdl(SourceRecord element, TableId tableId) {
                     }
                 }
             }
-        } catch (Exception e) {
-            LOG.error("parse ddl error {}", element, e);
+        } catch (JSQLParserException e) {
+            LOG.error("parse ddl error {}", historyRecord, e);
         }
         return false;
     }
 
+    /**
+     * after setting `gh-ost.ddl.change=true`, the alter ddl statements generated by gh-ost
+     * will be captured and stored in the state.
+     */
+    private void collectGhostDdl(SourceRecord element, MySqlSplitState splitState, HistoryRecord historyRecord) {
+        String ddl = historyRecord.document().getString(Fields.DDL_STATEMENTS);
+        if (StringUtils.isBlank(ddl)) {
+            return;
+        }
+        String tableName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
+        Pattern compile = Pattern.compile(this.ghostTableRegex);
+        Matcher matcher = compile.matcher(tableName);
+        if (matcher.find()) {
+            tableName = matcher.group(1);
+            String dbName = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getDbName(element);
+            TableId tableId = org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableId(
+                    dbName,
+                    tableName);
+            MySqlBinlogSplitState mySqlBinlogSplitState = splitState.asBinlogSplitState();
+            if (ddl.toUpperCase().startsWith(DDL_OP_ALTER)
+                    && mySqlBinlogSplitState.getTableSchemas().containsKey(tableId)) {
+                String matchTableInSqlRegex = ghostTableRegex;
+                if (matchTableInSqlRegex.startsWith(CARET) && matchTableInSqlRegex.endsWith(DOLLAR)) {
+                    matchTableInSqlRegex = matchTableInSqlRegex.substring(1, matchTableInSqlRegex.length() - 1);
+                }
+                mySqlBinlogSplitState.recordTableDdl(

Review Comment:
   The DDL statements generated by gh-ost should be ordered, and we also output them in the same order, so it is safe to overwrite the previous one.
   





[GitHub] [inlong] gong commented on a diff in pull request #7750: [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records

Posted by "gong (via GitHub)" <gi...@apache.org>.
gong commented on code in PR #7750:
URL: https://github.com/apache/inlong/pull/7750#discussion_r1173443327


##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java:
##########
@@ -104,6 +104,38 @@ public static Map<TableId, TableChange> readTableSchemas(int version, DataInputD
         return tableSchemas;
     }
 
+    public static void writeTableDdls(
+            Map<TableId, String> tableDdls, DataOutputSerializer out) throws IOException {
+        final int size = tableDdls.size();
+        out.writeInt(size);
+        for (Map.Entry<TableId, String> entry : tableDdls.entrySet()) {
+            out.writeUTF(entry.getKey().toString());
+            out.writeUTF(entry.getValue());
+        }
+    }
+
+    public static Map<TableId, String> readTableDdls(int version, DataInputDeserializer in)
+            throws IOException {
+        Map<TableId, String> tableDdls = new HashMap<>();
+        final int size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            TableId tableId = TableId.parse(in.readUTF());

Review Comment:
   You can refer to the `readReadPhaseMetric` method and add an `in.available() > 0` check.
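Below is a minimal sketch of the suggested guard, so that state written before the table-DDL section existed still deserializes. It swaps Flink's `DataOutputSerializer`/`DataInputDeserializer` for `java.io.DataOutputStream`/`DataInputStream` over byte arrays. It represents `TableId` as a plain string such as `"menghuiyu.tb1"`. Both are assumptions made to keep the example self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

public class TableDdlsSerde {

    public static void writeTableDdls(Map<String, String> tableDdls, DataOutputStream out) throws IOException {
        out.writeInt(tableDdls.size());
        for (Map.Entry<String, String> entry : tableDdls.entrySet()) {
            out.writeUTF(entry.getKey());   // table id as a string, e.g. "menghuiyu.tb1"
            out.writeUTF(entry.getValue()); // latest gh-ost DDL recorded for that table
        }
    }

    // Returns an empty map when no bytes remain, i.e. the state predates the DDL section.
    public static Map<String, String> readTableDdls(DataInputStream in) throws IOException {
        Map<String, String> tableDdls = new HashMap<>();
        if (in.available() <= 0) {
            return tableDdls;
        }
        int size = in.readInt();
        for (int i = 0; i < size; i++) {
            String tableId = in.readUTF();
            String ddl = in.readUTF();
            tableDdls.put(tableId, ddl);
        }
        return tableDdls;
    }

    // Writes one boolean (standing in for the earlier split fields), optionally the DDLs, then reads back.
    public static Map<String, String> roundTrip(Map<String, String> ddls, boolean writeDdls) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeBoolean(false);
            if (writeDdls) {
                writeTableDdls(ddls, out);
            }
            out.flush();
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            in.readBoolean();
            return readTableDdls(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> ddls = new HashMap<>();
        ddls.put("menghuiyu.tb1", "alter table `menghuiyu`.`tb1` add column c varchar(255)");
        System.out.println(roundTrip(ddls, true));  // new-format state: DDLs restored
        System.out.println(roundTrip(ddls, false)); // old-format state: {}
    }
}
```

The guard only works when the DDL section is the last thing written. If another field, such as the finished-split count, follows it, `available()` will be positive even for old state, and the reader will misinterpret those bytes.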





[GitHub] [inlong] e-mhui commented on a diff in pull request #7750: [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records

Posted by "e-mhui (via GitHub)" <gi...@apache.org>.
e-mhui commented on code in PR #7750:
URL: https://github.com/apache/inlong/pull/7750#discussion_r1173268238


##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java:
##########
@@ -226,6 +258,7 @@ public byte[] serialize(MySqlSplit split) throws IOException {
             writeBinlogPosition(binlogSplit.getEndingOffset(), out);
             writeFinishedSplitsInfo(binlogSplit.getFinishedSnapshotSplitInfos(), out);
             writeTableSchemas(binlogSplit.getTableSchemas(), out);
+            writeTableDdls(binlogSplit.getTableDdls(), out);
             out.writeInt(binlogSplit.getTotalFinishedSplitSize());

Review Comment:
   done.





[GitHub] [inlong] e-mhui commented on a diff in pull request #7750: [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records

Posted by "e-mhui (via GitHub)" <gi...@apache.org>.
e-mhui commented on code in PR #7750:
URL: https://github.com/apache/inlong/pull/7750#discussion_r1173549084


##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java:
##########
@@ -296,10 +329,12 @@ public MySqlSplit deserializeSplit(int version, byte[] serialized) throws IOExce
             Map<TableId, TableChange> tableChangeMap = readTableSchemas(version, in);
             int totalFinishedSplitSize = finishedSplitsInfo.size();
             boolean isSuspended = false;
+            Map<TableId, String> tableDdls = null;
             if (version >= 3) {
                 totalFinishedSplitSize = in.readInt();
                 if (version > 3) {
                     isSuspended = in.readBoolean();
+                    tableDdls = readTableDdls(version, in);

Review Comment:
   Thanks, it has been fixed.





[GitHub] [inlong] e-mhui commented on a diff in pull request #7750: [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records

Posted by "e-mhui (via GitHub)" <gi...@apache.org>.
e-mhui commented on code in PR #7750:
URL: https://github.com/apache/inlong/pull/7750#discussion_r1155442137


##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java:
##########
@@ -767,6 +786,33 @@ private GenericRowData extractBeforeRow(Struct value, Schema valueSchema) throws
         return (GenericRowData) physicalConverter.convert(before, beforeSchema);
     }
 
+    /**
+     * Extract ghost ddl record
+     *
+     * @param data
+     * @return
+     * @throws Exception
+     */
+    private GenericRowData extractGhostRecord(GenericRowData data) throws Exception {
+        String ddl = ((Map<String, String>)data.getField(0)).get(DDL_FIELD_NAME);
+        if (this.ghostTableRegex.startsWith(CARET) && this.ghostTableRegex.endsWith(DOLLAR)) {
+            this.ghostTableRegex = this.ghostTableRegex.substring(1, this.ghostTableRegex.length() - 1);

Review Comment:
   1. We need to extract the table name from the SQL statement, but the table name usually appears in the middle of the statement. `^_(.*)_(gho|ghc|del|new|old)$` only matches a bare name such as `_tb1_gho`; it cannot match inside `alter menghuiyu_tb1_gho add column c varchar(255)`. Therefore, we need to remove the `^` and `$` anchors.
   2. In general, the default regular expression is suitable for most cases.
   3. Do you have any better suggestions for extracting the gh-ost table names?
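Below is a small sketch of point 1. With `^` and `$`, the default regex matches only a bare gh-ost table name. Once the anchors are stripped, as the `CARET`/`DOLLAR` check in the PR does, `find()` locates the table inside a full DDL statement. The DDL is the `alter` statement from the PR description with the gh-ost comment already removed. Passing the replacement through `Matcher.quoteReplacement` is an addition not present in the PR. It keeps a table name containing `$` or `\` from being read as a group reference.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GhostRegexAnchors {

    // Default gh-ost table regex quoted in this comment.
    public static final String DEFAULT_REGEX = "^_(.*)_(gho|ghc|del|new|old)$";

    // Drop the anchors only when both are present, mirroring the CARET/DOLLAR check.
    public static String stripAnchors(String regex) {
        if (regex.startsWith("^") && regex.endsWith("$")) {
            return regex.substring(1, regex.length() - 1);
        }
        return regex;
    }

    public static boolean containsGhostTable(String regex, String text) {
        return Pattern.compile(regex).matcher(text).find();
    }

    // Replace the gh-ost table name with the original one; quoteReplacement escapes '$' and '\'.
    public static String rewrite(String regex, String ddl, String originalTable) {
        return Pattern.compile(regex).matcher(ddl).replaceAll(Matcher.quoteReplacement(originalTable));
    }

    public static void main(String[] args) {
        String ddl = "alter table `menghuiyu`.`_tb1_gho` add column c varchar(255)";
        String unanchored = stripAnchors(DEFAULT_REGEX);
        System.out.println(containsGhostTable(DEFAULT_REGEX, "_tb1_gho")); // true: bare name
        System.out.println(containsGhostTable(DEFAULT_REGEX, ddl));        // false: anchors pin the whole string
        System.out.println(containsGhostTable(unanchored, ddl));           // true
        System.out.println(rewrite(unanchored, ddl, "tb1"));
        // alter table `menghuiyu`.`tb1` add column c varchar(255)
    }
}
```

Without the anchors, the greedy `(.*)` starts at the first `_` in the whole statement. A schema name that contains an underscore, such as my_db, would therefore make the match begin inside the schema name. Excluding backticks and dots from the captured group is one possible refinement; it is not part of this PR.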


