You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/15 03:21:50 UTC

[inlong] branch master updated: [INLONG-5352][Sort] Add audit report for Pulsar source (#5525)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ea8bede37 [INLONG-5352][Sort] Add audit report for Pulsar source (#5525)
ea8bede37 is described below

commit ea8bede37cd2c14c66d4d7d40c79f48b18c6f51b
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Mon Aug 15 11:21:46 2022 +0800

    [INLONG-5352][Sort] Add audit report for Pulsar source (#5525)
---
 inlong-sort/sort-connectors/pulsar/pom.xml         |  5 ++
 .../table/DynamicPulsarDeserializationSchema.java  | 58 ++++++++++++++++------
 .../pulsar/table/PulsarDynamicTableFactory.java    | 10 ++--
 .../pulsar/table/PulsarDynamicTableSource.java     | 16 +++---
 .../table/UpsertPulsarDynamicTableFactory.java     |  4 +-
 5 files changed, 67 insertions(+), 26 deletions(-)

diff --git a/inlong-sort/sort-connectors/pulsar/pom.xml b/inlong-sort/sort-connectors/pulsar/pom.xml
index 395dcd0a3..71c84e859 100644
--- a/inlong-sort/sort-connectors/pulsar/pom.xml
+++ b/inlong-sort/sort-connectors/pulsar/pom.xml
@@ -59,6 +59,11 @@
             <artifactId>sort-connector-base</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>audit-sdk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
index ea0c995c8..38a97bf5c 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -18,6 +18,8 @@
 
 package org.apache.inlong.sort.pulsar.table;
 
+import java.util.Arrays;
+import java.util.HashSet;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource;
@@ -30,6 +32,8 @@ import org.apache.flink.types.DeserializationException;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
@@ -63,6 +67,13 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
     private final boolean upsertMode;
     private SourceMetricData sourceMetricData;
     private String inlongMetric;
+    private String auditHostAndPorts;
+
+    private AuditImp auditImp;
+
+    private String inlongGroupId;
+
+    private String inlongStreamId;
 
     DynamicPulsarDeserializationSchema(
             int physicalArity,
@@ -74,7 +85,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
             MetadataConverter[] metadataConverters,
             TypeInformation<RowData> producedTypeInfo,
             boolean upsertMode,
-            String inlongMetric) {
+            String inlongMetric, String auditHostAndPorts) {
         if (upsertMode) {
             Preconditions.checkArgument(
                     keyDeserialization != null && keyProjection.length > 0,
@@ -92,6 +103,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
         this.producedTypeInfo = producedTypeInfo;
         this.upsertMode = upsertMode;
         this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
     }
 
     @Override
@@ -102,17 +114,22 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
         valueDeserialization.open(context);
 
         if (inlongMetric != null && !inlongMetric.isEmpty()) {
-            String[] inLongMetricArray = inlongMetric.split(DELIMITER);
-            String groupId = inLongMetricArray[0];
-            String streamId = inLongMetricArray[1];
-            String nodeId = inLongMetricArray[2];
-            sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, context.getMetricGroup());
+            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+            inlongGroupId = inlongMetricArray[0];
+            inlongStreamId = inlongMetricArray[1];
+            String nodeId = inlongMetricArray[2];
+            sourceMetricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, context.getMetricGroup());
             sourceMetricData.registerMetricsForNumBytesIn();
             sourceMetricData.registerMetricsForNumBytesInPerSecond();
             sourceMetricData.registerMetricsForNumRecordsIn();
             sourceMetricData.registerMetricsForNumRecordsInPerSecond();
         }
 
+        if (auditHostAndPorts != null) {
+            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
+            auditImp = AuditImp.getInstance();
+        }
+
     }
 
     @Override
@@ -133,11 +150,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
         // also not for a cartesian product with the keys
         if (keyDeserialization == null && !hasMetadata) {
             valueDeserialization.deserialize(message.getData(), collector);
-            if (sourceMetricData != null) {
-                sourceMetricData.getNumRecordsIn().inc(1L);
-                sourceMetricData.getNumBytesIn()
-                        .inc(message.getData().length);
-            }
+            outputMetrics(message);
             return;
         }
         BufferingCollector keyCollector = new BufferingCollector();
@@ -156,16 +169,29 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
             outputCollector.collect(null);
         } else {
             valueDeserialization.deserialize(message.getData(), outputCollector);
-            if (sourceMetricData != null) {
-                sourceMetricData.getNumRecordsIn().inc(1L);
-                sourceMetricData.getNumBytesIn()
-                        .inc(message.getData().length);
-            }
+            outputMetrics(message);
         }
 
         keyCollector.buffer.clear();
     }
 
+    private void outputMetrics(Message<RowData> message) {
+        if (sourceMetricData != null) {
+            sourceMetricData.getNumRecordsIn().inc(1L);
+            sourceMetricData.getNumBytesIn()
+                    .inc(message.getData().length);
+        }
+        if (auditImp != null) {
+            auditImp.add(
+                Constants.AUDIT_SORT_INPUT,
+                inlongGroupId,
+                inlongStreamId,
+                System.currentTimeMillis(),
+                1,
+                message.getData().length);
+        }
+    }
+
     @Override
     public TypeInformation<RowData> getProducedType() {
         return producedTypeInfo;
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
index 321d3f830..e5587db48 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
@@ -77,6 +77,7 @@ import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOpti
 import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.validateTableSourceOptions;
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
 import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.pulsar.table.Constants.INLONG_METRIC;
 
 /**
@@ -275,6 +276,8 @@ public class PulsarDynamicTableFactory implements
 
         String inlongMetric = tableOptions.get(INLONG_METRIC);
 
+        String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
+
         return createPulsarTableSource(
                 physicalDataType,
                 keyDecodingFormat.orElse(null),
@@ -288,7 +291,7 @@ public class PulsarDynamicTableFactory implements
                 adminUrl,
                 properties,
                 startupOptions,
-            inlongMetric);
+            inlongMetric, auditHostAndPorts);
     }
 
     @Override
@@ -327,6 +330,7 @@ public class PulsarDynamicTableFactory implements
         options.add(SINK_PARALLELISM);
         options.add(PROPERTIES);
         options.add(INLONG_METRIC);
+        options.add(INLONG_AUDIT);
 
         return options;
     }
@@ -358,7 +362,7 @@ public class PulsarDynamicTableFactory implements
             String adminUrl,
             Properties properties,
             PulsarTableOptions.StartupOptions startupOptions,
-        String inLongMetric) {
+        String inLongMetric, String auditHostAndPorts) {
         return new PulsarDynamicTableSource(
                 physicalDataType,
                 keyDecodingFormat,
@@ -373,6 +377,6 @@ public class PulsarDynamicTableFactory implements
                 properties,
                 startupOptions,
                 false,
-            inLongMetric);
+            inLongMetric, auditHostAndPorts);
     }
 }
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
index 6f75555a2..0d5075007 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
@@ -146,7 +146,9 @@ public class PulsarDynamicTableSource implements ScanTableSource, SupportsReadin
     /** Flag to determine source mode. In upsert mode, it will keep the tombstone message. **/
     protected final boolean upsertMode;
 
-    protected String inLongMetric;
+    protected String inlongMetric;
+
+    protected String auditHostAndPorts;
 
     public PulsarDynamicTableSource(
             DataType physicalDataType,
@@ -162,7 +164,8 @@ public class PulsarDynamicTableSource implements ScanTableSource, SupportsReadin
             Properties properties,
             PulsarTableOptions.StartupOptions startupOptions,
             boolean upsertMode,
-            String inlongMetric) {
+            String inlongMetric,
+            String auditHostAndPorts) {
         this.producedDataType = physicalDataType;
         setTopicInfo(properties, topics, topicPattern);
 
@@ -189,8 +192,8 @@ public class PulsarDynamicTableSource implements ScanTableSource, SupportsReadin
         this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
         this.startupOptions = startupOptions;
         this.upsertMode = upsertMode;
-        this.inLongMetric = inlongMetric;
-
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
     }
 
     private void setTopicInfo(Properties properties, List<String> topics, String topicPattern) {
@@ -295,7 +298,8 @@ public class PulsarDynamicTableSource implements ScanTableSource, SupportsReadin
                 metadataConverters,
                 producedTypeInfo,
                 upsertMode,
-            inLongMetric);
+            inlongMetric,
+            auditHostAndPorts);
     }
 
     @Override
@@ -313,7 +317,7 @@ public class PulsarDynamicTableSource implements ScanTableSource, SupportsReadin
                 adminUrl,
                 properties,
                 startupOptions,
-                false, inLongMetric);
+                false, inlongMetric, auditHostAndPorts);
         copy.producedDataType = producedDataType;
         copy.metadataKeys = metadataKeys;
         copy.watermarkStrategy = watermarkStrategy;
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
index bf539d703..d691a676f 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
@@ -67,6 +67,7 @@ import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOpti
 import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.createValueFormatProjection;
 import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.getPulsarProperties;
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.pulsar.table.Constants.INLONG_METRIC;
 
 /**
@@ -187,6 +188,7 @@ public class UpsertPulsarDynamicTableFactory implements DynamicTableSourceFactor
         List<String> topics = tableOptions.get(TOPIC);
         String topicPattern = tableOptions.get(TOPIC_PATTERN);
         String inlongMetric = tableOptions.get(INLONG_METRIC);
+        String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
 
         return new PulsarDynamicTableSource(
                 schema.toPhysicalRowDataType(),
@@ -201,7 +203,7 @@ public class UpsertPulsarDynamicTableFactory implements DynamicTableSourceFactor
                 adminUrl,
                 properties,
                 startupOptions,
-                true, inlongMetric);
+                true, inlongMetric, auditHostAndPorts);
     }
 
     @Override