You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/04/08 06:48:13 UTC
[inlong] branch master updated: [INLONG-7787][Sort] Fix cannot output drop statement (#7788)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e68baebb9 [INLONG-7787][Sort] Fix cannot output drop statement (#7788)
e68baebb9 is described below
commit e68baebb96eafe25a64deb7b4ed5511e7d10d366
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Sat Apr 8 14:48:07 2023 +0800
[INLONG-7787][Sort] Fix cannot output drop statement (#7788)
---
.../main/java/org/apache/inlong/sort/base/Constants.java | 2 ++
.../sort/cdc/mysql/source/reader/MySqlRecordEmitter.java | 14 ++++++++++++++
2 files changed, 16 insertions(+)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 702329e6c..99fc91610 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -136,6 +136,8 @@ public final class Constants {
public static final String DDL_FIELD_NAME = "ddl";
+ public static final String DDL_OP_DROP = "DROP";
+
public static final ConfigOption<String> INLONG_METRIC =
ConfigOptions.key("inlong.metric.labels")
.stringType()
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index 2d1eab0f8..a8b6eb830 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -23,6 +23,7 @@ import io.debezium.data.Envelope;
import io.debezium.document.Array;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecord.Fields;
import io.debezium.relational.history.TableChanges;
import io.debezium.relational.history.TableChanges.TableChange;
import org.apache.flink.api.connector.source.SourceOutput;
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
+import static org.apache.inlong.sort.base.Constants.DDL_OP_DROP;
import static org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getBinlogPosition;
import static org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getFetchTimestamp;
import static org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getHistoryRecord;
@@ -116,6 +118,18 @@ public final class MySqlRecordEmitter<T>
}
}
+ // for drop table ddl, there are no table change events
+ if (tableChanges.isEmpty()) {
+ String ddl = historyRecord.document().getString(Fields.DDL_STATEMENTS);
+ if (ddl.toUpperCase().startsWith(DDL_OP_DROP)) {
+ TableId tableId = RecordUtils.getTableId(element);
+ // if this table is one of the captured tables, output the ddl element
+ if (splitState.getMySQLSplit().getTableSchemas().containsKey(tableId)) {
+ outputDdlElement(element, output, splitState, null);
+ }
+ }
+ }
+
} else if (isDataChangeRecord(element)) {
if (splitState.isBinlogSplitState()) {
BinlogOffset position = getBinlogPosition(element);