You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/04/19 04:47:40 UTC

[inlong] branch master updated: [INLONG-7862][Sort] MongoCDC supports disabling the Changelog Normalize operator (#7870)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9038d9e61 [INLONG-7862][Sort] MongoCDC supports disabling the Changelog Normalize operator (#7870)
9038d9e61 is described below

commit 9038d9e61d0fc801938165f41b5a463c8703f081
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Wed Apr 19 12:47:34 2023 +0800

    [INLONG-7862][Sort] MongoCDC supports disabling the Changelog Normalize operator (#7870)
---
 .../inlong/sort/cdc/mongodb/table/MongoDBTableSource.java  | 10 +++++++---
 .../sort/cdc/mongodb/table/MongoDBTableSourceFactory.java  | 14 +++++++++++++-
 2 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
index 1770884c6..66f1ec1a6 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
@@ -81,6 +81,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
     private final String inlongAudit;
     private final String rowValidator;
     private final boolean sourceMultipleEnable;
+    private final boolean changelogNormalizeEnabled;
 
     // --------------------------------------------------------------------------------------------
     // Mutable attributes
@@ -113,7 +114,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
             String inlongMetric,
             String inlongAudit,
             String rowFilter,
-            Boolean sourceMultipleEnable) {
+            Boolean sourceMultipleEnable,
+            Boolean changelogNormalizeEnabled) {
         this.physicalSchema = physicalSchema;
         this.hosts = checkNotNull(hosts);
         this.username = username;
@@ -137,11 +139,12 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
         this.inlongAudit = inlongAudit;
         this.rowValidator = rowFilter;
         this.sourceMultipleEnable = sourceMultipleEnable;
+        this.changelogNormalizeEnabled = changelogNormalizeEnabled;
     }
 
     @Override
     public ChangelogMode getChangelogMode() {
-        if (this.sourceMultipleEnable) {
+        if (this.sourceMultipleEnable || !changelogNormalizeEnabled) {
             return ChangelogMode.all();
         } else {
             return ChangelogMode.newBuilder()
@@ -286,7 +289,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
                         inlongMetric,
                         inlongAudit,
                         rowValidator,
-                        sourceMultipleEnable);
+                        sourceMultipleEnable,
+                        changelogNormalizeEnabled);
         source.metadataKeys = metadataKeys;
         source.producedDataType = producedDataType;
         return source;
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
index 1808bc9f7..53674840a 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
@@ -73,6 +73,15 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
                             + "\"+U\" represents UPDATE_AFTER.\n"
                             + "\"-D\" represents DELETE.");
 
+    public static final ConfigOption<Boolean> CHANGELOG_NORMALIZE_ENABLED =
+            ConfigOptions.key("changelog.normalize.enabled")
+                    .booleanType()
+                    .defaultValue(Boolean.TRUE)
+                    .withDescription("MongoDB's Change Stream lacks the -U message, "
+                            + "so it needs to be converted to Flink UPSERT changelog using "
+                            + "the Changelog Normalize operator. The default value is true. (For scenarios that do not "
+                            + "require the -U message, this operator can be disabled.) \n");
+
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
         final FactoryUtil.TableFactoryHelper helper =
@@ -121,6 +130,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
         final String rowKindFiltered = config.get(ROW_KINDS_FILTERED).isEmpty()
                 ? ROW_KINDS_FILTERED.defaultValue()
                 : config.get(ROW_KINDS_FILTERED);
+        Boolean changelogNormalizeEnabled = config.get(CHANGELOG_NORMALIZE_ENABLED);
 
         return new MongoDBTableSource(
                 physicalSchema,
@@ -143,7 +153,8 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
                 inlongMetric,
                 inlongAudit,
                 rowKindFiltered,
-                sourceMultipleEnable);
+                sourceMultipleEnable,
+                changelogNormalizeEnabled);
     }
 
     private void checkPrimaryKey(UniqueConstraint pk, String message) {
@@ -186,6 +197,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
         options.add(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
         options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB);
         options.add(CHUNK_META_GROUP_SIZE);
+        options.add(CHANGELOG_NORMALIZE_ENABLED);
         return options;
     }
 }