You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/04/08 06:48:13 UTC

[inlong] branch master updated: [INLONG-7787][Sort] Fix cannot output drop statement (#7788)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e68baebb9 [INLONG-7787][Sort] Fix cannot output drop statement (#7788)
e68baebb9 is described below

commit e68baebb96eafe25a64deb7b4ed5511e7d10d366
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Sat Apr 8 14:48:07 2023 +0800

    [INLONG-7787][Sort] Fix cannot output drop statement (#7788)
---
 .../main/java/org/apache/inlong/sort/base/Constants.java   |  2 ++
 .../sort/cdc/mysql/source/reader/MySqlRecordEmitter.java   | 14 ++++++++++++++
 2 files changed, 16 insertions(+)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 702329e6c..99fc91610 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -136,6 +136,8 @@ public final class Constants {
 
     public static final String DDL_FIELD_NAME = "ddl";
 
+    public static final String DDL_OP_DROP = "DROP";
+
     public static final ConfigOption<String> INLONG_METRIC =
             ConfigOptions.key("inlong.metric.labels")
                     .stringType()
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index 2d1eab0f8..a8b6eb830 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -23,6 +23,7 @@ import io.debezium.data.Envelope;
 import io.debezium.document.Array;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecord.Fields;
 import io.debezium.relational.history.TableChanges;
 import io.debezium.relational.history.TableChanges.TableChange;
 import org.apache.flink.api.connector.source.SourceOutput;
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
+import static org.apache.inlong.sort.base.Constants.DDL_OP_DROP;
 import static org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getBinlogPosition;
 import static org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getFetchTimestamp;
 import static org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getHistoryRecord;
@@ -116,6 +118,18 @@ public final class MySqlRecordEmitter<T>
                 }
             }
 
+            // for a DROP TABLE ddl, there are no table change events
+            if (tableChanges.isEmpty()) {
+                String ddl = historyRecord.document().getString(Fields.DDL_STATEMENTS);
+                if (ddl.toUpperCase().startsWith(DDL_OP_DROP)) {
+                    TableId tableId = RecordUtils.getTableId(element);
+                    // if this table is one of the captured tables, output the ddl element
+                    if (splitState.getMySQLSplit().getTableSchemas().containsKey(tableId)) {
+                        outputDdlElement(element, output, splitState, null);
+                    }
+                }
+            }
+
         } else if (isDataChangeRecord(element)) {
             if (splitState.isBinlogSplitState()) {
                 BinlogOffset position = getBinlogPosition(element);