You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/08/17 06:33:14 UTC

[inlong] 01/02: [INLONG-5461][Sort] Add audit for MongoDB extract node (#5548)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 4de795b3303ca146a7c7a56fd30ba84992573440
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Wed Aug 17 10:54:28 2022 +0800

    [INLONG-5461][Sort] Add audit for MongoDB extract node (#5548)
---
 inlong-sort/sort-connectors/mongodb-cdc/pom.xml    | 11 +++++
 .../sort/cdc/mongodb/DebeziumSourceFunction.java   | 22 +++++++---
 .../inlong/sort/cdc/mongodb/MongoDBSource.java     | 14 ++++--
 .../sort/cdc/mongodb/table/MongoDBTableSource.java | 22 ++++++----
 .../mongodb/table/MongoDBTableSourceFactory.java   | 51 +++++++++++-----------
 5 files changed, 78 insertions(+), 42 deletions(-)

diff --git a/inlong-sort/sort-connectors/mongodb-cdc/pom.xml b/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
index f4d3e6036..077595230 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
+++ b/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
@@ -41,6 +41,11 @@
             <artifactId>sort-connector-base</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>audit-sdk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
@@ -98,6 +103,12 @@
                                 </filter>
                             </filters>
                             <relocations>
+                                <relocation>
+                                    <pattern>org.apache.inlong.sort.base</pattern>
+                                    <shadedPattern>
+                                        org.apache.inlong.sort.cdc.mongodb.shaded.org.apache.inlong.sort.base
+                                    </shadedPattern>
+                                </relocation>
                                 <relocation>
                                     <pattern>org.apache.kafka</pattern>
                                     <shadedPattern>
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
index db41472ed..450990eb4 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
@@ -58,6 +58,7 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.audit.AuditImp;
 import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -68,7 +69,9 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -76,8 +79,10 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
 /**
  * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
@@ -222,6 +227,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
 
     private String inlongMetric;
 
+    private String inlongAudit;
+
     private SourceMetricData metricData;
 
     // ---------------------------------------------------------------------------------------
@@ -230,12 +237,13 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
             DebeziumDeserializationSchema<T> deserializer,
             Properties properties,
             @Nullable DebeziumOffset specificOffset,
-            Validator validator, String inlongMetric) {
+            Validator validator, String inlongMetric, String inlongAudit) {
         this.deserializer = deserializer;
         this.properties = properties;
         this.specificOffset = specificOffset;
         this.validator = validator;
         this.inlongMetric = inlongMetric;
+        this.inlongAudit = inlongAudit;
     }
 
     @Override
@@ -414,7 +422,12 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
             String groupId = inlongMetricArray[0];
             String streamId = inlongMetricArray[1];
             String nodeId = inlongMetricArray[2];
-            metricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup);
+            AuditImp auditImp = null;
+            if (inlongAudit != null) {
+                AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
+                auditImp = AuditImp.getInstance();
+            }
+            metricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp);
             metricData.registerMetricsForNumRecordsIn();
             metricData.registerMetricsForNumBytesIn();
             metricData.registerMetricsForNumBytesInPerSecond();
@@ -458,9 +471,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                             @Override
                             public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
                                 if (metricData != null) {
-                                    metricData.getNumRecordsIn().inc(1L);
-                                    metricData.getNumBytesIn()
-                                            .inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+                                    metricData.outputMetrics(1L,
+                                            record.value().toString().getBytes(StandardCharsets.UTF_8).length);
                                 }
                                 deserializer.deserialize(record, out);
                             }
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
index ebe9bf4b5..dad8581d7 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
@@ -139,7 +139,8 @@ public class MongoDBSource {
         private String errorsTolerance;
         private Integer heartbeatIntervalMillis;
         private DebeziumDeserializationSchema<T> deserializer;
-        private String inLongMetric;
+        private String inlongMetric;
+        private String inlongAudit;
 
         /** The comma-separated list of hostname and port pairs of mongodb servers. */
         public Builder<T> hosts(String hosts) {
@@ -317,8 +318,13 @@ public class MongoDBSource {
             return this;
         }
 
-        public Builder<T> inLongMetric(String inLongMetric) {
-            this.inLongMetric = inLongMetric;
+        public Builder<T> inlongMetric(String inlongMetric) {
+            this.inlongMetric = inlongMetric;
+            return this;
+        }
+
+        public Builder<T> inlongAudit(String inlongAudit) {
+            this.inlongAudit = inlongAudit;
             return this;
         }
 
@@ -441,7 +447,7 @@ public class MongoDBSource {
                     Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), HEARTBEAT_TOPIC_NAME_DEFAULT);
 
             return new DebeziumSourceFunction<>(
-                    deserializer, props, null, Validator.getDefaultValidator(), inLongMetric);
+                    deserializer, props, null, Validator.getDefaultValidator(), inlongMetric, inlongAudit);
         }
     }
 }
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
index e037ef55a..612f8b48d 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
@@ -76,7 +76,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
     private final Integer heartbeatIntervalMillis;
     private final ZoneId localTimeZone;
 
-    private final String inLongMetric;
+    private final String inlongMetric;
+    private final String inlongAudit;
 
     // --------------------------------------------------------------------------------------------
     // Mutable attributes
@@ -106,7 +107,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
             @Nullable Integer pollAwaitTimeMillis,
             @Nullable Integer heartbeatIntervalMillis,
             ZoneId localTimeZone,
-            String inLongMetric) {
+            String inlongMetric,
+            String inlongAudit) {
         this.physicalSchema = physicalSchema;
         this.hosts = checkNotNull(hosts);
         this.username = username;
@@ -126,7 +128,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
         this.localTimeZone = localTimeZone;
         this.producedDataType = physicalSchema.toPhysicalRowDataType();
         this.metadataKeys = Collections.emptyList();
-        this.inLongMetric = inLongMetric;
+        this.inlongMetric = inlongMetric;
+        this.inlongAudit = inlongAudit;
     }
 
     @Override
@@ -184,8 +187,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
         Optional.ofNullable(pollMaxBatchSize).ifPresent(builder::pollMaxBatchSize);
         Optional.ofNullable(pollAwaitTimeMillis).ifPresent(builder::pollAwaitTimeMillis);
         Optional.ofNullable(heartbeatIntervalMillis).ifPresent(builder::heartbeatIntervalMillis);
-        Optional.ofNullable(inLongMetric).ifPresent(builder::inLongMetric);
-
+        Optional.ofNullable(inlongMetric).ifPresent(builder::inlongMetric);
+        Optional.ofNullable(inlongAudit).ifPresent(builder::inlongAudit);
         DebeziumSourceFunction<RowData> sourceFunction = builder.build();
 
         return SourceFunctionProvider.of(sourceFunction, false);
@@ -243,7 +246,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
                         pollAwaitTimeMillis,
                         heartbeatIntervalMillis,
                         localTimeZone,
-                        inLongMetric);
+                        inlongMetric,
+                        inlongAudit);
         source.metadataKeys = metadataKeys;
         source.producedDataType = producedDataType;
         return source;
@@ -277,7 +281,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
                 && Objects.equals(localTimeZone, that.localTimeZone)
                 && Objects.equals(producedDataType, that.producedDataType)
                 && Objects.equals(metadataKeys, that.metadataKeys)
-                && Objects.equals(inLongMetric, that.inLongMetric);
+                && Objects.equals(inlongMetric, that.inlongMetric)
+                && Objects.equals(inlongAudit, that.inlongAudit);
     }
 
     @Override
@@ -302,7 +307,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
                 localTimeZone,
                 producedDataType,
                 metadataKeys,
-                inLongMetric);
+                inlongMetric,
+                inlongAudit);
     }
 
     @Override
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
index d7299cf19..d235bc860 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
 
 import java.time.ZoneId;
 import java.util.HashSet;
@@ -36,6 +37,8 @@ import static com.ververica.cdc.connectors.mongodb.MongoDBSource.ERROR_TOLERANCE
 import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_AWAIT_TIME_MILLIS_DEFAULT;
 import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_MAX_BATCH_SIZE_DEFAULT;
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 
 /**
  * Factory for creating configured instance of {@link MongoDBTableSource}.
@@ -46,12 +49,6 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
 
     private static final String DOCUMENT_ID_FIELD = "_id";
 
-    public static final ConfigOption<String> INLONG_METRIC =
-            ConfigOptions.key("inlong.metric")
-                    .stringType()
-                    .defaultValue("")
-                    .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
-
     private static final ConfigOption<String> HOSTS =
             ConfigOptions.key("hosts")
                     .stringType()
@@ -199,35 +196,37 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
 
         final ReadableConfig config = helper.getOptions();
 
-        String hosts = config.get(HOSTS);
-        String connectionOptions = config.getOptional(CONNECTION_OPTIONS).orElse(null);
+        final String hosts = config.get(HOSTS);
+        final String connectionOptions = config.getOptional(CONNECTION_OPTIONS).orElse(null);
 
-        String username = config.getOptional(USERNAME).orElse(null);
-        String password = config.getOptional(PASSWORD).orElse(null);
+        final String username = config.getOptional(USERNAME).orElse(null);
+        final String password = config.getOptional(PASSWORD).orElse(null);
 
-        String database = config.getOptional(DATABASE).orElse(null);
-        String collection = config.getOptional(COLLECTION).orElse(null);
+        final String database = config.getOptional(DATABASE).orElse(null);
+        final String collection = config.getOptional(COLLECTION).orElse(null);
 
-        String errorsTolerance = config.get(ERRORS_TOLERANCE);
-        Boolean errorsLogEnable = config.get(ERRORS_LOG_ENABLE);
+        final String errorsTolerance = config.get(ERRORS_TOLERANCE);
+        final Boolean errorsLogEnable = config.get(ERRORS_LOG_ENABLE);
 
-        Integer pollMaxBatchSize = config.get(POLL_MAX_BATCH_SIZE);
-        Integer pollAwaitTimeMillis = config.get(POLL_AWAIT_TIME_MILLIS);
+        final Integer pollMaxBatchSize = config.get(POLL_MAX_BATCH_SIZE);
+        final Integer pollAwaitTimeMillis = config.get(POLL_AWAIT_TIME_MILLIS);
 
-        Integer heartbeatIntervalMillis =
+        final Integer heartbeatIntervalMillis =
                 config.getOptional(HEARTBEAT_INTERVAL_MILLIS).orElse(null);
 
-        Boolean copyExisting = config.get(COPY_EXISTING);
-        String copyExistingPipeline = config.getOptional(COPY_EXISTING_PIPELINE).orElse(null);
-        Integer copyExistingMaxThreads = config.getOptional(COPY_EXISTING_MAX_THREADS).orElse(null);
-        Integer copyExistingQueueSize = config.getOptional(COPY_EXISTING_QUEUE_SIZE).orElse(null);
+        final Boolean copyExisting = config.get(COPY_EXISTING);
+        final String copyExistingPipeline = config.getOptional(COPY_EXISTING_PIPELINE).orElse(null);
+        final Integer copyExistingMaxThreads = config.getOptional(COPY_EXISTING_MAX_THREADS).orElse(null);
+        final Integer copyExistingQueueSize = config.getOptional(COPY_EXISTING_QUEUE_SIZE).orElse(null);
 
-        String zoneId = context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE);
-        ZoneId localTimeZone =
+        final String zoneId = context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE);
+        final ZoneId localTimeZone =
                 TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zoneId)
                         ? ZoneId.systemDefault()
                         : ZoneId.of(zoneId);
-        String inLongMetric = config.get(INLONG_METRIC);
+        final String inlongMetric = config.get(INLONG_METRIC);
+        final String inlongAudit = config.get(INLONG_AUDIT);
+        ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
 
         ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
         checkArgument(physicalSchema.getPrimaryKey().isPresent(), "Primary key must be present");
@@ -251,7 +250,8 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
                 pollAwaitTimeMillis,
                 heartbeatIntervalMillis,
                 localTimeZone,
-                inLongMetric);
+                inlongMetric,
+                inlongAudit);
     }
 
     private void checkPrimaryKey(UniqueConstraint pk, String message) {
@@ -290,6 +290,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
         options.add(POLL_AWAIT_TIME_MILLIS);
         options.add(HEARTBEAT_INTERVAL_MILLIS);
         options.add(INLONG_METRIC);
+        options.add(INLONG_AUDIT);
         return options;
     }
 }