You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/08/17 06:33:14 UTC
[inlong] 01/02: [INLONG-5461][Sort] Add audit for MongoDB extract node (#5548)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 4de795b3303ca146a7c7a56fd30ba84992573440
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Wed Aug 17 10:54:28 2022 +0800
[INLONG-5461][Sort] Add audit for MongoDB extract node (#5548)
---
inlong-sort/sort-connectors/mongodb-cdc/pom.xml | 11 +++++
.../sort/cdc/mongodb/DebeziumSourceFunction.java | 22 +++++++---
.../inlong/sort/cdc/mongodb/MongoDBSource.java | 14 ++++--
.../sort/cdc/mongodb/table/MongoDBTableSource.java | 22 ++++++----
.../mongodb/table/MongoDBTableSourceFactory.java | 51 +++++++++++-----------
5 files changed, 78 insertions(+), 42 deletions(-)
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/pom.xml b/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
index f4d3e6036..077595230 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
+++ b/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
@@ -41,6 +41,11 @@
<artifactId>sort-connector-base</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>audit-sdk</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
@@ -98,6 +103,12 @@
</filter>
</filters>
<relocations>
+ <relocation>
+ <pattern>org.apache.inlong.sort.base</pattern>
+ <shadedPattern>
+ org.apache.inlong.sort.cdc.mongodb.shaded.org.apache.inlong.sort.base
+ </shadedPattern>
+ </relocation>
<relocation>
<pattern>org.apache.kafka</pattern>
<shadedPattern>
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
index db41472ed..450990eb4 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
@@ -58,6 +58,7 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.audit.AuditImp;
import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.kafka.connect.source.SourceRecord;
@@ -68,7 +69,9 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -76,8 +79,10 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
/**
* The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
@@ -222,6 +227,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
private String inlongMetric;
+ private String inlongAudit;
+
private SourceMetricData metricData;
// ---------------------------------------------------------------------------------------
@@ -230,12 +237,13 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
DebeziumDeserializationSchema<T> deserializer,
Properties properties,
@Nullable DebeziumOffset specificOffset,
- Validator validator, String inlongMetric) {
+ Validator validator, String inlongMetric, String inlongAudit) {
this.deserializer = deserializer;
this.properties = properties;
this.specificOffset = specificOffset;
this.validator = validator;
this.inlongMetric = inlongMetric;
+ this.inlongAudit = inlongAudit;
}
@Override
@@ -414,7 +422,12 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
String groupId = inlongMetricArray[0];
String streamId = inlongMetricArray[1];
String nodeId = inlongMetricArray[2];
- metricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup);
+ AuditImp auditImp = null;
+ if (inlongAudit != null) {
+ AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
+ auditImp = AuditImp.getInstance();
+ }
+ metricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp);
metricData.registerMetricsForNumRecordsIn();
metricData.registerMetricsForNumBytesIn();
metricData.registerMetricsForNumBytesInPerSecond();
@@ -458,9 +471,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
@Override
public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
if (metricData != null) {
- metricData.getNumRecordsIn().inc(1L);
- metricData.getNumBytesIn()
- .inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+ metricData.outputMetrics(1L,
+ record.value().toString().getBytes(StandardCharsets.UTF_8).length);
}
deserializer.deserialize(record, out);
}
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
index ebe9bf4b5..dad8581d7 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
@@ -139,7 +139,8 @@ public class MongoDBSource {
private String errorsTolerance;
private Integer heartbeatIntervalMillis;
private DebeziumDeserializationSchema<T> deserializer;
- private String inLongMetric;
+ private String inlongMetric;
+ private String inlongAudit;
/** The comma-separated list of hostname and port pairs of mongodb servers. */
public Builder<T> hosts(String hosts) {
@@ -317,8 +318,13 @@ public class MongoDBSource {
return this;
}
- public Builder<T> inLongMetric(String inLongMetric) {
- this.inLongMetric = inLongMetric;
+ public Builder<T> inlongMetric(String inlongMetric) {
+ this.inlongMetric = inlongMetric;
+ return this;
+ }
+
+ public Builder<T> inlongAudit(String inlongAudit) {
+ this.inlongAudit = inlongAudit;
return this;
}
@@ -441,7 +447,7 @@ public class MongoDBSource {
Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), HEARTBEAT_TOPIC_NAME_DEFAULT);
return new DebeziumSourceFunction<>(
- deserializer, props, null, Validator.getDefaultValidator(), inLongMetric);
+ deserializer, props, null, Validator.getDefaultValidator(), inlongMetric, inlongAudit);
}
}
}
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
index e037ef55a..612f8b48d 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
@@ -76,7 +76,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
private final Integer heartbeatIntervalMillis;
private final ZoneId localTimeZone;
- private final String inLongMetric;
+ private final String inlongMetric;
+ private final String inlongAudit;
// --------------------------------------------------------------------------------------------
// Mutable attributes
@@ -106,7 +107,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
@Nullable Integer pollAwaitTimeMillis,
@Nullable Integer heartbeatIntervalMillis,
ZoneId localTimeZone,
- String inLongMetric) {
+ String inlongMetric,
+ String inlongAudit) {
this.physicalSchema = physicalSchema;
this.hosts = checkNotNull(hosts);
this.username = username;
@@ -126,7 +128,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
this.localTimeZone = localTimeZone;
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.metadataKeys = Collections.emptyList();
- this.inLongMetric = inLongMetric;
+ this.inlongMetric = inlongMetric;
+ this.inlongAudit = inlongAudit;
}
@Override
@@ -184,8 +187,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
Optional.ofNullable(pollMaxBatchSize).ifPresent(builder::pollMaxBatchSize);
Optional.ofNullable(pollAwaitTimeMillis).ifPresent(builder::pollAwaitTimeMillis);
Optional.ofNullable(heartbeatIntervalMillis).ifPresent(builder::heartbeatIntervalMillis);
- Optional.ofNullable(inLongMetric).ifPresent(builder::inLongMetric);
-
+ Optional.ofNullable(inlongMetric).ifPresent(builder::inlongMetric);
+ Optional.ofNullable(inlongAudit).ifPresent(builder::inlongAudit);
DebeziumSourceFunction<RowData> sourceFunction = builder.build();
return SourceFunctionProvider.of(sourceFunction, false);
@@ -243,7 +246,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
pollAwaitTimeMillis,
heartbeatIntervalMillis,
localTimeZone,
- inLongMetric);
+ inlongMetric,
+ inlongAudit);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
@@ -277,7 +281,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
&& Objects.equals(localTimeZone, that.localTimeZone)
&& Objects.equals(producedDataType, that.producedDataType)
&& Objects.equals(metadataKeys, that.metadataKeys)
- && Objects.equals(inLongMetric, that.inLongMetric);
+ && Objects.equals(inlongMetric, that.inlongMetric)
+ && Objects.equals(inlongAudit, that.inlongAudit);
}
@Override
@@ -302,7 +307,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
localTimeZone,
producedDataType,
metadataKeys,
- inLongMetric);
+ inlongMetric,
+ inlongAudit);
}
@Override
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
index d7299cf19..d235bc860 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
import java.time.ZoneId;
import java.util.HashSet;
@@ -36,6 +37,8 @@ import static com.ververica.cdc.connectors.mongodb.MongoDBSource.ERROR_TOLERANCE
import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_AWAIT_TIME_MILLIS_DEFAULT;
import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_MAX_BATCH_SIZE_DEFAULT;
import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
/**
* Factory for creating configured instance of {@link MongoDBTableSource}.
@@ -46,12 +49,6 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
private static final String DOCUMENT_ID_FIELD = "_id";
- public static final ConfigOption<String> INLONG_METRIC =
- ConfigOptions.key("inlong.metric")
- .stringType()
- .defaultValue("")
- .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
-
private static final ConfigOption<String> HOSTS =
ConfigOptions.key("hosts")
.stringType()
@@ -199,35 +196,37 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
final ReadableConfig config = helper.getOptions();
- String hosts = config.get(HOSTS);
- String connectionOptions = config.getOptional(CONNECTION_OPTIONS).orElse(null);
+ final String hosts = config.get(HOSTS);
+ final String connectionOptions = config.getOptional(CONNECTION_OPTIONS).orElse(null);
- String username = config.getOptional(USERNAME).orElse(null);
- String password = config.getOptional(PASSWORD).orElse(null);
+ final String username = config.getOptional(USERNAME).orElse(null);
+ final String password = config.getOptional(PASSWORD).orElse(null);
- String database = config.getOptional(DATABASE).orElse(null);
- String collection = config.getOptional(COLLECTION).orElse(null);
+ final String database = config.getOptional(DATABASE).orElse(null);
+ final String collection = config.getOptional(COLLECTION).orElse(null);
- String errorsTolerance = config.get(ERRORS_TOLERANCE);
- Boolean errorsLogEnable = config.get(ERRORS_LOG_ENABLE);
+ final String errorsTolerance = config.get(ERRORS_TOLERANCE);
+ final Boolean errorsLogEnable = config.get(ERRORS_LOG_ENABLE);
- Integer pollMaxBatchSize = config.get(POLL_MAX_BATCH_SIZE);
- Integer pollAwaitTimeMillis = config.get(POLL_AWAIT_TIME_MILLIS);
+ final Integer pollMaxBatchSize = config.get(POLL_MAX_BATCH_SIZE);
+ final Integer pollAwaitTimeMillis = config.get(POLL_AWAIT_TIME_MILLIS);
- Integer heartbeatIntervalMillis =
+ final Integer heartbeatIntervalMillis =
config.getOptional(HEARTBEAT_INTERVAL_MILLIS).orElse(null);
- Boolean copyExisting = config.get(COPY_EXISTING);
- String copyExistingPipeline = config.getOptional(COPY_EXISTING_PIPELINE).orElse(null);
- Integer copyExistingMaxThreads = config.getOptional(COPY_EXISTING_MAX_THREADS).orElse(null);
- Integer copyExistingQueueSize = config.getOptional(COPY_EXISTING_QUEUE_SIZE).orElse(null);
+ final Boolean copyExisting = config.get(COPY_EXISTING);
+ final String copyExistingPipeline = config.getOptional(COPY_EXISTING_PIPELINE).orElse(null);
+ final Integer copyExistingMaxThreads = config.getOptional(COPY_EXISTING_MAX_THREADS).orElse(null);
+ final Integer copyExistingQueueSize = config.getOptional(COPY_EXISTING_QUEUE_SIZE).orElse(null);
- String zoneId = context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE);
- ZoneId localTimeZone =
+ final String zoneId = context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE);
+ final ZoneId localTimeZone =
TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zoneId)
? ZoneId.systemDefault()
: ZoneId.of(zoneId);
- String inLongMetric = config.get(INLONG_METRIC);
+ final String inlongMetric = config.get(INLONG_METRIC);
+ final String inlongAudit = config.get(INLONG_AUDIT);
+ ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
checkArgument(physicalSchema.getPrimaryKey().isPresent(), "Primary key must be present");
@@ -251,7 +250,8 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
pollAwaitTimeMillis,
heartbeatIntervalMillis,
localTimeZone,
- inLongMetric);
+ inlongMetric,
+ inlongAudit);
}
private void checkPrimaryKey(UniqueConstraint pk, String message) {
@@ -290,6 +290,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
options.add(POLL_AWAIT_TIME_MILLIS);
options.add(HEARTBEAT_INTERVAL_MILLIS);
options.add(INLONG_METRIC);
+ options.add(INLONG_AUDIT);
return options;
}
}