You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/04/19 04:47:40 UTC
[inlong] branch master updated: [INLONG-7862][Sort] MongoCDC supports disabling the Changelog Normalize operator (#7870)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9038d9e61 [INLONG-7862][Sort] MongoCDC supports disabling the Changelog Normalize operator (#7870)
9038d9e61 is described below
commit 9038d9e61d0fc801938165f41b5a463c8703f081
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Wed Apr 19 12:47:34 2023 +0800
[INLONG-7862][Sort] MongoCDC supports disabling the Changelog Normalize operator (#7870)
---
.../inlong/sort/cdc/mongodb/table/MongoDBTableSource.java | 10 +++++++---
.../sort/cdc/mongodb/table/MongoDBTableSourceFactory.java | 14 +++++++++++++-
2 files changed, 20 insertions(+), 4 deletions(-)
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
index 1770884c6..66f1ec1a6 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
@@ -81,6 +81,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
private final String inlongAudit;
private final String rowValidator;
private final boolean sourceMultipleEnable;
+ private final boolean changelogNormalizeEnabled;
// --------------------------------------------------------------------------------------------
// Mutable attributes
@@ -113,7 +114,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
String inlongMetric,
String inlongAudit,
String rowFilter,
- Boolean sourceMultipleEnable) {
+ Boolean sourceMultipleEnable,
+ Boolean changelogNormalizeEnabled) {
this.physicalSchema = physicalSchema;
this.hosts = checkNotNull(hosts);
this.username = username;
@@ -137,11 +139,12 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
this.inlongAudit = inlongAudit;
this.rowValidator = rowFilter;
this.sourceMultipleEnable = sourceMultipleEnable;
+ this.changelogNormalizeEnabled = changelogNormalizeEnabled;
}
@Override
public ChangelogMode getChangelogMode() {
- if (this.sourceMultipleEnable) {
+ if (this.sourceMultipleEnable || !changelogNormalizeEnabled) {
return ChangelogMode.all();
} else {
return ChangelogMode.newBuilder()
@@ -286,7 +289,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
inlongMetric,
inlongAudit,
rowValidator,
- sourceMultipleEnable);
+ sourceMultipleEnable,
+ changelogNormalizeEnabled);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
index 1808bc9f7..53674840a 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
@@ -73,6 +73,15 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
+ "\"+U\" represents UPDATE_AFTER.\n"
+ "\"-D\" represents DELETE.");
+ public static final ConfigOption<Boolean> CHANGELOG_NORMALIZE_ENABLED =
+ ConfigOptions.key("changelog.normalize.enabled")
+ .booleanType()
+ .defaultValue(Boolean.TRUE)
+ .withDescription("MongoDB's Change Stream lacks the -U message, "
+ + "so it needs to be converted to Flink UPSERT changelog using "
+ + "the Changelog Normalize operator. The default value is true. (For scenarios that do not "
+ + "require the -U message, this operator can be disabled.) \n");
+
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
@@ -121,6 +130,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
final String rowKindFiltered = config.get(ROW_KINDS_FILTERED).isEmpty()
? ROW_KINDS_FILTERED.defaultValue()
: config.get(ROW_KINDS_FILTERED);
+ Boolean changelogNormalizeEnabled = config.get(CHANGELOG_NORMALIZE_ENABLED);
return new MongoDBTableSource(
physicalSchema,
@@ -143,7 +153,8 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
inlongMetric,
inlongAudit,
rowKindFiltered,
- sourceMultipleEnable);
+ sourceMultipleEnable,
+ changelogNormalizeEnabled);
}
private void checkPrimaryKey(UniqueConstraint pk, String message) {
@@ -186,6 +197,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
options.add(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB);
options.add(CHUNK_META_GROUP_SIZE);
+ options.add(CHANGELOG_NORMALIZE_ENABLED);
return options;
}
}