You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/21 08:16:00 UTC

[incubator-inlong] branch master updated: [INLONG-2256] SortStandalone support audit sdk. (#2261)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new cb2229c  [INLONG-2256] SortStandalone support audit sdk. (#2261)
cb2229c is described below

commit cb2229c1c10f790d7b5faea8e7438d28334631f9
Author: 卢春亮 <94...@qq.com>
AuthorDate: Fri Jan 21 16:15:52 2022 +0800

    [INLONG-2256] SortStandalone support audit sdk. (#2261)
---
 inlong-sort-standalone/pom.xml                     |  14 +++
 .../sort/standalone/metrics/SortMetricItem.java    |  95 ++++++++++++++++-
 .../sort-standalone-source/pom.xml                 |  30 +-----
 .../sort/standalone/SortStandaloneApplication.java |   9 ++
 .../sort/standalone/metrics/audit/AuditUtils.java  | 116 +++++++++++++++++++++
 .../sort/standalone/sink/hive/HiveSinkContext.java |   6 +-
 .../sink/pulsar/PulsarProducerCluster.java         |   2 +
 .../source/sortsdk/SortSdkSourceContext.java       |   6 +-
 8 files changed, 246 insertions(+), 32 deletions(-)

diff --git a/inlong-sort-standalone/pom.xml b/inlong-sort-standalone/pom.xml
index 6a797b4..84db940 100644
--- a/inlong-sort-standalone/pom.xml
+++ b/inlong-sort-standalone/pom.xml
@@ -56,6 +56,8 @@
         <fastjson.version>1.2.79</fastjson.version>
         <simpleclient_httpserver.version>0.14.1</simpleclient_httpserver.version>
         <log4j.version>2.17.1</log4j.version>
+        <mockito.version>2.23.0</mockito.version>
+        <jakarta.validation.version>2.0.2</jakarta.validation.version>
     </properties>
 
     <dependencies>
@@ -118,6 +120,12 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <artifactId>mockito-core</artifactId>
+            <groupId>org.mockito</groupId>
+            <scope>test</scope>
+            <version>${mockito.version}</version>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <version>${junit.version}</version>
@@ -225,6 +233,12 @@
             <artifactId>log4j-slf4j-impl</artifactId>
             <version>${log4j.version}</version>
         </dependency>
+        <dependency>
+            <groupId>jakarta.validation</groupId>
+            <artifactId>jakarta.validation-api</artifactId>
+            <scope>compile</scope>
+            <version>${jakarta.validation.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
index ae0cf9f..7a744d0 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
@@ -20,11 +20,14 @@ package org.apache.inlong.sort.standalone.metrics;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.commons.lang.math.NumberUtils;
 import org.apache.flume.Event;
 import org.apache.inlong.commons.config.metrics.CountMetric;
 import org.apache.inlong.commons.config.metrics.Dimension;
 import org.apache.inlong.commons.config.metrics.MetricDomain;
 import org.apache.inlong.commons.config.metrics.MetricItem;
+import org.apache.inlong.commons.msg.AttributeConstants;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.utils.Constants;
 
 /**
@@ -109,15 +112,101 @@ public class SortMetricItem extends MetricItem {
 
     /**
      * fillInlongId
-     * 
+     *
      * @param event
      * @param dimensions
      */
     public static void fillInlongId(Event event, Map<String, String> dimensions) {
         Map<String, String> headers = event.getHeaders();
-        String inlongGroupId = headers.getOrDefault(Constants.INLONG_GROUP_ID, "-");
-        String inlongStreamId = headers.getOrDefault(Constants.INLONG_STREAM_ID, "-");
+        String inlongGroupId = getInlongGroupId(headers);
+        String inlongStreamId = getInlongStreamId(headers);
         dimensions.put(KEY_INLONG_GROUP_ID, inlongGroupId);
         dimensions.put(KEY_INLONG_STREAM_ID, inlongStreamId);
     }
+
+    /**
+     * Truncates the event's log time to a multiple of the audit format interval and stores it.
+     *
+     * @param event      event whose log time is read through getLogTime(Event)
+     * @param dimensions map that receives the truncated time, as a string, under KEY_MESSAGE_TIME
+     */
+    public static void fillAuditFormatTime(Event event, Map<String, String> dimensions) {
+        long msgTime = getLogTime(event);
+        long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+        dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
+    }
+
+    /**
+     * Truncates a message time to a multiple of CommonPropertiesHolder.getAuditFormatInterval().
+     *
+     * @param  msgTime message time to truncate (presumably milliseconds - confirm against callers)
+     * @return         msgTime minus its remainder modulo the audit format interval
+     */
+    public static long getAuditFormatTime(long msgTime) {
+        long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+        return auditFormatTime;
+    }
+
+    /**
+     * Reads the inlong group id from the headers, falling back to the Constants.TOPIC header.
+     *
+     * @param  headers event headers; dereferenced without a null check
+     * @return         the group id, else the topic header, else "" when neither key is present
+     */
+    public static String getInlongGroupId(Map<String, String> headers) {
+        String inlongGroupId = headers.get(Constants.INLONG_GROUP_ID);
+        if (inlongGroupId == null) {
+            inlongGroupId = headers.getOrDefault(Constants.TOPIC, "");
+        }
+        return inlongGroupId;
+    }
+
+    /**
+     * Reads the inlong stream id from the headers, falling back to AttributeConstants.INTERFACE_ID.
+     *
+     * @param  headers event headers; dereferenced without a null check
+     * @return         the stream id, else the interface id header, else "" when neither is present
+     */
+    public static String getInlongStreamId(Map<String, String> headers) {
+        String inlongStreamId = headers.get(Constants.INLONG_STREAM_ID);
+        if (inlongStreamId == null) {
+            inlongStreamId = headers.getOrDefault(AttributeConstants.INTERFACE_ID, "");
+        }
+        return inlongStreamId;
+    }
+
+    /**
+     * Resolves the log time from HEADER_KEY_MSG_TIME, then DATA_TIME, else the current time.
+     *
+     * @param  headers event headers; dereferenced without a null check
+     * @return         the parsed header value, or System.currentTimeMillis() if absent, invalid or 0
+     */
+    public static long getLogTime(Map<String, String> headers) {
+        String strLogTime = headers.get(Constants.HEADER_KEY_MSG_TIME);
+        if (strLogTime == null) {
+            strLogTime = headers.get(AttributeConstants.DATA_TIME);
+        }
+        if (strLogTime == null) {
+            return System.currentTimeMillis();
+        }
+        long logTime = NumberUtils.toLong(strLogTime, 0);
+        if (logTime == 0) {
+            logTime = System.currentTimeMillis();
+        }
+        return logTime;
+    }
+
+    /**
+     * Resolves the log time of an event by delegating to getLogTime(Map) on its headers.
+     *
+     * @param  event event to inspect; may be null
+     * @return       the log time from the headers, or System.currentTimeMillis() for a null event
+     */
+    public static long getLogTime(Event event) {
+        if (event != null) {
+            Map<String, String> headers = event.getHeaders();
+            return getLogTime(headers);
+        }
+        return System.currentTimeMillis();
+    }
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/pom.xml b/inlong-sort-standalone/sort-standalone-source/pom.xml
index 000f85c..5d17ac8 100644
--- a/inlong-sort-standalone/sort-standalone-source/pom.xml
+++ b/inlong-sort-standalone/sort-standalone-source/pom.xml
@@ -42,39 +42,15 @@
             <artifactId>sort-standalone-common</artifactId>
             <version>${project.version}</version>
         </dependency>
-
         <dependency>
             <groupId>org.apache.inlong</groupId>
             <artifactId>inlong-sort-sdk</artifactId>
             <version>${project.version}</version>
         </dependency>
-
         <dependency>
-            <groupId>jakarta.validation</groupId>
-            <artifactId>jakarta.validation-api</artifactId>
-            <version>2.0.2</version>
-            <scope>compile</scope>
-        </dependency>
-
-        <dependency>
-            <artifactId>powermock-module-junit4</artifactId>
-            <groupId>org.powermock</groupId>
-            <scope>test</scope>
-            <version>2.0.2</version>
-        </dependency>
-
-        <dependency>
-            <artifactId>powermock-api-mockito2</artifactId>
-            <groupId>org.powermock</groupId>
-            <scope>test</scope>
-            <version>2.0.2</version>
-        </dependency>
-
-        <dependency>
-            <artifactId>mockito-core</artifactId>
-            <groupId>org.mockito</groupId>
-            <scope>test</scope>
-            <version>2.23.0</version>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>audit-sdk</artifactId>
+            <version>${project.version}</version>
         </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java
index 48c0470..f5d0b3f 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.standalone;
 import org.apache.flume.node.Application;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.metrics.MetricObserver;
+import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 import org.slf4j.Logger;
 
@@ -44,6 +45,14 @@ public class SortStandaloneApplication {
             cluster.start();
             // metrics
             MetricObserver.init(CommonPropertiesHolder.get());
+            AuditUtils.initAudit();
+            Runtime.getRuntime().addShutdownHook(new Thread("sortstandalone-shutdown-hook") {
+
+                @Override
+                public void run() {
+                    AuditUtils.sendReport();
+                }
+            });
             Thread.sleep(5000);
         } catch (Exception e) {
             LOG.error("A fatal error occurred while running. Exception follows.", e);
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java
new file mode 100644
index 0000000..3a032b2
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics.audit;
+
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.commons.lang.math.NumberUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flume.Event;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.audit.util.AuditConfig;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
+
+/**
+ * Static helpers that configure the audit SDK (AuditImp) and record per-event audit entries.
+ * Settings are read from CommonPropertiesHolder using the AUDIT_KEY_* property names below.
+ */
+public class AuditUtils {
+
+    public static final String AUDIT_KEY_FILE_PATH = "audit.filePath";
+    public static final String AUDIT_DEFAULT_FILE_PATH = "/data/inlong/audit/";
+    public static final String AUDIT_KEY_MAX_CACHE_ROWS = "audit.maxCacheRows";
+    public static final int AUDIT_DEFAULT_MAX_CACHE_ROWS = 2000000;
+    public static final String AUDIT_KEY_PROXYS = "audit.proxys";
+    public static final String AUDIT_KEY_IS_AUDIT = "audit.isAudit";
+
+    public static final int AUDIT_ID_READ_SUCCESS = 7;
+    public static final int AUDIT_ID_SEND_SUCCESS = 8;
+
+    private static boolean IS_AUDIT = true;
+
+    /**
+     * Reads audit.isAudit and, when enabled, sets the audit proxies and AuditConfig on AuditImp.
+     */
+    public static void initAudit() {
+        // NOTE(review): toBoolean(null) is false; an unset property may override the true default.
+        IS_AUDIT = BooleanUtils.toBoolean(CommonPropertiesHolder.getString(AUDIT_KEY_IS_AUDIT));
+        if (IS_AUDIT) {
+            // Proxy addresses: whitespace-separated entries of audit.proxys (the set may be empty).
+            String strIpPorts = CommonPropertiesHolder.getString(AUDIT_KEY_PROXYS);
+            HashSet<String> proxys = new HashSet<>();
+            if (!StringUtils.isBlank(strIpPorts)) {
+                String[] ipPorts = strIpPorts.split("\\s+");
+                for (String ipPort : ipPorts) {
+                    proxys.add(ipPort);
+                }
+            }
+            AuditImp.getInstance().setAuditProxy(proxys);
+            // Cache file path and max cached rows, falling back to the AUDIT_DEFAULT_* values.
+            String filePath = CommonPropertiesHolder.getString(AUDIT_KEY_FILE_PATH,
+                    AUDIT_DEFAULT_FILE_PATH);
+            int maxCacheRow = NumberUtils.toInt(
+                    CommonPropertiesHolder.getString(AUDIT_KEY_MAX_CACHE_ROWS),
+                    AUDIT_DEFAULT_MAX_CACHE_ROWS);
+            AuditConfig auditConfig = new AuditConfig(filePath, maxCacheRow);
+            AuditImp.getInstance().setAuditConfig(auditConfig);
+        }
+    }
+
+    /**
+     * Records one audit entry for a ProfileEvent; does nothing when audit is off or event is null.
+     *
+     * @param auditID audit item id, e.g. AUDIT_ID_READ_SUCCESS or AUDIT_ID_SEND_SUCCESS
+     * @param event   event supplying group id, stream id, raw log time and body length
+     */
+    public static void add(int auditID, ProfileEvent event) {
+        if (IS_AUDIT && event != null) {
+            String inlongGroupId = event.getInlongGroupId();
+            String inlongStreamId = event.getInlongStreamId();
+            long logTime = event.getRawLogTime();
+            AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length);
+        }
+    }
+
+    /**
+     * Records one audit entry for a Flume Event, resolving ids and log time from its headers.
+     *
+     * @param auditID audit item id, e.g. AUDIT_ID_READ_SUCCESS or AUDIT_ID_SEND_SUCCESS
+     * @param event   event to audit; ignored when null or when audit is disabled
+     */
+    public static void add(int auditID, Event event) {
+        if (IS_AUDIT && event != null) {
+            Map<String, String> headers = event.getHeaders();
+            String inlongGroupId = SortMetricItem.getInlongGroupId(headers);
+            String inlongStreamId = SortMetricItem.getInlongStreamId(headers);
+            long logTime = SortMetricItem.getLogTime(headers);
+            AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length);
+        }
+    }
+
+    /**
+     * Delegates to AuditImp.sendReport(); this call is not guarded by IS_AUDIT.
+     */
+    public static void sendReport() {
+        AuditImp.getInstance().sendReport();
+    }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSinkContext.java
index 8bd706f..545de7a 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSinkContext.java
@@ -39,6 +39,7 @@ import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
 import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
+import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
 import org.apache.inlong.sort.standalone.sink.SinkContext;
 import org.apache.inlong.sort.standalone.utils.Constants;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
@@ -250,6 +251,9 @@ public class HiveSinkContext extends SinkContext {
         if (result) {
             metricItem.sendSuccessCount.addAndGet(count);
             metricItem.sendSuccessSize.addAndGet(size);
+            currentRecord.getEvents().forEach((event) -> {
+                AuditUtils.add(AuditUtils.AUDIT_ID_SEND_SUCCESS, event);
+            });
             if (sendTime > 0) {
                 long currentTime = System.currentTimeMillis();
                 long sinkDuration = currentTime - sendTime;
@@ -272,7 +276,7 @@ public class HiveSinkContext extends SinkContext {
     /**
      * getHiveConnection
      * 
-     * @return Connection
+     * @return                        Connection
      * @throws SQLException
      * @throws ClassNotFoundException
      */
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
index 993e97f..8e52d03 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
@@ -32,6 +32,7 @@ import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
+import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
 import org.apache.inlong.sort.standalone.utils.Constants;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 import org.apache.pulsar.client.api.AuthenticationFactory;
@@ -273,6 +274,7 @@ public class PulsarProducerCluster implements LifecycleAware {
         if (result) {
             metricItem.sendSuccessCount.incrementAndGet();
             metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length);
+            AuditUtils.add(AuditUtils.AUDIT_ID_SEND_SUCCESS, currentRecord);
             if (sendTime > 0) {
                 long currentTime = System.currentTimeMillis();
                 long sinkDuration = currentTime - sendTime;
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceContext.java
index 1ec37bd..c857f5f 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceContext.java
@@ -22,6 +22,7 @@ import org.apache.inlong.commons.config.metrics.MetricRegister;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet;
+import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
 import org.apache.inlong.sort.standalone.source.SourceContext;
 
 import javax.annotation.Nullable;
@@ -81,17 +82,19 @@ public final class SortSdkSourceContext extends SourceContext {
         final Map<String, String> dimensions = this.createSortSdkSourceDimensionMap(event, sortId, topic);
         final SortMetricItem metricItem = metricItemSet.findMetricItem(dimensions);
         final int msgSize = event != null ? event.getBody().length : -1;
-        this.reportToMetric(metricItem, fetchResult, msgSize);
+        this.reportToMetric(event, metricItem, fetchResult, msgSize);
     }
 
     /**
      * Selector of metric report flow.
      *
+     * @param event The fetched event. May be <b>null</b> when a fetch failure occurs.
      * @param item MetricItem that report to.
      * @param fetchResult Result of fetching, SUCCESS or FAILURE.
      * @param size The fetch length. -1 means fetch failure.
      */
     private void reportToMetric(
+            @Nullable final ProfileEvent event,
             @NotNull final SortMetricItem item,
             final FetchResult fetchResult,
             final int size) {
@@ -99,6 +102,7 @@ public final class SortSdkSourceContext extends SourceContext {
         switch (fetchResult) {
             case SUCCESS:
                 reportToMetric(item.readSuccessCount, item.readSuccessSize, size);
+                AuditUtils.add(AuditUtils.AUDIT_ID_READ_SUCCESS, event);
                 break;
             case FAILURE:
                 reportToMetric(item.readFailCount, item.readFailSize, size);