You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/15 03:21:50 UTC
[inlong] branch master updated: [INLONG-5352][Sort] Add audit report for Pulsar source (#5525)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ea8bede37 [INLONG-5352][Sort] Add audit report for Pulsar source (#5525)
ea8bede37 is described below
commit ea8bede37cd2c14c66d4d7d40c79f48b18c6f51b
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Mon Aug 15 11:21:46 2022 +0800
[INLONG-5352][Sort] Add audit report for Pulsar source (#5525)
---
inlong-sort/sort-connectors/pulsar/pom.xml | 5 ++
.../table/DynamicPulsarDeserializationSchema.java | 58 ++++++++++++++++------
.../pulsar/table/PulsarDynamicTableFactory.java | 10 ++--
.../pulsar/table/PulsarDynamicTableSource.java | 16 +++---
.../table/UpsertPulsarDynamicTableFactory.java | 4 +-
5 files changed, 67 insertions(+), 26 deletions(-)
diff --git a/inlong-sort/sort-connectors/pulsar/pom.xml b/inlong-sort/sort-connectors/pulsar/pom.xml
index 395dcd0a3..71c84e859 100644
--- a/inlong-sort/sort-connectors/pulsar/pom.xml
+++ b/inlong-sort/sort-connectors/pulsar/pom.xml
@@ -59,6 +59,11 @@
<artifactId>sort-connector-base</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>audit-sdk</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
index ea0c995c8..38a97bf5c 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -18,6 +18,8 @@
package org.apache.inlong.sort.pulsar.table;
+import java.util.Arrays;
+import java.util.HashSet;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource;
@@ -30,6 +32,8 @@ import org.apache.flink.types.DeserializationException;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
@@ -63,6 +67,13 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
private final boolean upsertMode;
private SourceMetricData sourceMetricData;
private String inlongMetric;
+ private String auditHostAndPorts;
+
+ private AuditImp auditImp;
+
+ private String inlongGroupId;
+
+ private String inlongStreamId;
DynamicPulsarDeserializationSchema(
int physicalArity,
@@ -74,7 +85,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
MetadataConverter[] metadataConverters,
TypeInformation<RowData> producedTypeInfo,
boolean upsertMode,
- String inlongMetric) {
+ String inlongMetric, String auditHostAndPorts) {
if (upsertMode) {
Preconditions.checkArgument(
keyDeserialization != null && keyProjection.length > 0,
@@ -92,6 +103,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
this.producedTypeInfo = producedTypeInfo;
this.upsertMode = upsertMode;
this.inlongMetric = inlongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
}
@Override
@@ -102,17 +114,22 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
valueDeserialization.open(context);
if (inlongMetric != null && !inlongMetric.isEmpty()) {
- String[] inLongMetricArray = inlongMetric.split(DELIMITER);
- String groupId = inLongMetricArray[0];
- String streamId = inLongMetricArray[1];
- String nodeId = inLongMetricArray[2];
- sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, context.getMetricGroup());
+ String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+ inlongGroupId = inlongMetricArray[0];
+ inlongStreamId = inlongMetricArray[1];
+ String nodeId = inlongMetricArray[2];
+ sourceMetricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, context.getMetricGroup());
sourceMetricData.registerMetricsForNumBytesIn();
sourceMetricData.registerMetricsForNumBytesInPerSecond();
sourceMetricData.registerMetricsForNumRecordsIn();
sourceMetricData.registerMetricsForNumRecordsInPerSecond();
}
+ if (auditHostAndPorts != null) {
+ AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
+ auditImp = AuditImp.getInstance();
+ }
+
}
@Override
@@ -133,11 +150,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
// also not for a cartesian product with the keys
if (keyDeserialization == null && !hasMetadata) {
valueDeserialization.deserialize(message.getData(), collector);
- if (sourceMetricData != null) {
- sourceMetricData.getNumRecordsIn().inc(1L);
- sourceMetricData.getNumBytesIn()
- .inc(message.getData().length);
- }
+ outputMetrics(message);
return;
}
BufferingCollector keyCollector = new BufferingCollector();
@@ -156,16 +169,29 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
outputCollector.collect(null);
} else {
valueDeserialization.deserialize(message.getData(), outputCollector);
- if (sourceMetricData != null) {
- sourceMetricData.getNumRecordsIn().inc(1L);
- sourceMetricData.getNumBytesIn()
- .inc(message.getData().length);
- }
+ outputMetrics(message);
}
keyCollector.buffer.clear();
}
+ /**
+ * Records one consumed Pulsar message in the Flink source metrics and, if configured, in the InLong audit.
+ *
+ * <p>Counting is per message: callers invoke this once after the value payload has been handed to
+ * the value deserializer, so the record count grows by exactly 1 per message. NOTE(review): if the
+ * value format emits several rows (or none) for a single message, the counts will not match rows.
+ *
+ * @param message the Pulsar message just processed; its raw payload length is used as the byte count
+ */
+ private void outputMetrics(Message<RowData> message) {
+ // sourceMetricData is only created when the inlongMetric option is non-empty.
+ if (sourceMetricData != null) {
+ sourceMetricData.getNumRecordsIn().inc(1L);
+ sourceMetricData.getNumBytesIn()
+ .inc(message.getData().length);
+ }
+ // auditImp is set whenever auditHostAndPorts is non-null, independently of inlongMetric.
+ // NOTE(review): inlongGroupId/inlongStreamId are only assigned when inlongMetric is non-empty,
+ // so audit records may carry null group/stream ids if audit is enabled without inlongMetric.
+ if (auditImp != null) {
+ auditImp.add(
+ Constants.AUDIT_SORT_INPUT,
+ inlongGroupId,
+ inlongStreamId,
+ // wall-clock processing time; the message's own publish/event time is not used
+ System.currentTimeMillis(),
+ 1,
+ message.getData().length);
+ }
+ }
+
@Override
public TypeInformation<RowData> getProducedType() {
return producedTypeInfo;
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
index 321d3f830..e5587db48 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
@@ -77,6 +77,7 @@ import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOpti
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.validateTableSourceOptions;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.pulsar.table.Constants.INLONG_METRIC;
/**
@@ -275,6 +276,8 @@ public class PulsarDynamicTableFactory implements
String inlongMetric = tableOptions.get(INLONG_METRIC);
+ String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
+
return createPulsarTableSource(
physicalDataType,
keyDecodingFormat.orElse(null),
@@ -288,7 +291,7 @@ public class PulsarDynamicTableFactory implements
adminUrl,
properties,
startupOptions,
- inlongMetric);
+ inlongMetric, auditHostAndPorts);
}
@Override
@@ -327,6 +330,7 @@ public class PulsarDynamicTableFactory implements
options.add(SINK_PARALLELISM);
options.add(PROPERTIES);
options.add(INLONG_METRIC);
+ options.add(INLONG_AUDIT);
return options;
}
@@ -358,7 +362,7 @@ public class PulsarDynamicTableFactory implements
String adminUrl,
Properties properties,
PulsarTableOptions.StartupOptions startupOptions,
- String inLongMetric) {
+ String inLongMetric, String auditHostAndPorts) {
return new PulsarDynamicTableSource(
physicalDataType,
keyDecodingFormat,
@@ -373,6 +377,6 @@ public class PulsarDynamicTableFactory implements
properties,
startupOptions,
false,
- inLongMetric);
+ inLongMetric, auditHostAndPorts);
}
}
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
index 6f75555a2..0d5075007 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
@@ -146,7 +146,9 @@ public class PulsarDynamicTableSource implements ScanTableSource, SupportsReadin
/** Flag to determine source mode. In upsert mode, it will keep the tombstone message. **/
protected final boolean upsertMode;
- protected String inLongMetric;
+ protected String inlongMetric;
+
+ protected String auditHostAndPorts;
public PulsarDynamicTableSource(
DataType physicalDataType,
@@ -162,7 +164,8 @@ public class PulsarDynamicTableSource implements ScanTableSource, SupportsReadin
Properties properties,
PulsarTableOptions.StartupOptions startupOptions,
boolean upsertMode,
- String inlongMetric) {
+ String inlongMetric,
+ String auditHostAndPorts) {
this.producedDataType = physicalDataType;
setTopicInfo(properties, topics, topicPattern);
@@ -189,8 +192,8 @@ public class PulsarDynamicTableSource implements ScanTableSource, SupportsReadin
this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
this.startupOptions = startupOptions;
this.upsertMode = upsertMode;
- this.inLongMetric = inlongMetric;
-
+ this.inlongMetric = inlongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
}
private void setTopicInfo(Properties properties, List<String> topics, String topicPattern) {
@@ -295,7 +298,8 @@ public class PulsarDynamicTableSource implements ScanTableSource, SupportsReadin
metadataConverters,
producedTypeInfo,
upsertMode,
- inLongMetric);
+ inlongMetric,
+ auditHostAndPorts);
}
@Override
@@ -313,7 +317,7 @@ public class PulsarDynamicTableSource implements ScanTableSource, SupportsReadin
adminUrl,
properties,
startupOptions,
- false, inLongMetric);
+ false, inlongMetric, auditHostAndPorts);
copy.producedDataType = producedDataType;
copy.metadataKeys = metadataKeys;
copy.watermarkStrategy = watermarkStrategy;
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
index bf539d703..d691a676f 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
@@ -67,6 +67,7 @@ import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOpti
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.createValueFormatProjection;
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.getPulsarProperties;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.pulsar.table.Constants.INLONG_METRIC;
/**
@@ -187,6 +188,7 @@ public class UpsertPulsarDynamicTableFactory implements DynamicTableSourceFactor
List<String> topics = tableOptions.get(TOPIC);
String topicPattern = tableOptions.get(TOPIC_PATTERN);
String inlongMetric = tableOptions.get(INLONG_METRIC);
+ String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
return new PulsarDynamicTableSource(
schema.toPhysicalRowDataType(),
@@ -201,7 +203,7 @@ public class UpsertPulsarDynamicTableFactory implements DynamicTableSourceFactor
adminUrl,
properties,
startupOptions,
- true, inlongMetric);
+ true, inlongMetric, auditHostAndPorts);
}
@Override