You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2024/03/13 08:53:31 UTC

(inlong) branch master updated: [INLONG-9809][Audit] SDK supports both singleton and non-singleton usage (#9812)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ea39a2765 [INLONG-9809][Audit] SDK supports both singleton and non-singleton usage (#9812)
2ea39a2765 is described below

commit 2ea39a276556eb4319c588ab02440b8c0de3609c
Author: doleyzi <43...@users.noreply.github.com>
AuthorDate: Wed Mar 13 16:53:26 2024 +0800

    [INLONG-9809][Audit] SDK supports both singleton and non-singleton usage (#9812)
---
 .../org/apache/inlong/audit/AuditOperator.java     | 319 +--------------------
 .../{AuditOperator.java => AuditReporterImpl.java} |  44 +--
 .../DefaultISocketAddressListLoader.java           |   2 +-
 .../{ => loader}/DnsSocketAddressListLoader.java   |   2 +-
 .../{ => loader}/SocketAddressListLoader.java      |   2 +-
 5 files changed, 23 insertions(+), 346 deletions(-)

diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
index 0c6f573f9b..85d6201a94 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
@@ -17,332 +17,27 @@
 
 package org.apache.inlong.audit;
 
-import org.apache.inlong.audit.protocol.AuditApi;
-import org.apache.inlong.audit.send.SenderManager;
-import org.apache.inlong.audit.util.AuditConfig;
-import org.apache.inlong.audit.util.Config;
-import org.apache.inlong.audit.util.StatInfo;
-
-import org.apache.commons.lang3.ClassUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.StringJoiner;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDIT_REQUEST;
-
 /**
  * Audit operator, which is singleton.
+ * Creating objects through deserialization is not supported.
  */
-public class AuditOperator implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = LoggerFactory.getLogger(AuditOperator.class);
-    private static final String FIELD_SEPARATORS = ":";
-    private static final String DEFAULT_AUDIT_TAG = "-1";
-    private static final long DEFAULT_AUDIT_VERSION = -1;
-    private static final int BATCH_NUM = 100;
-    private static final AuditOperator AUDIT_OPERATOR = new AuditOperator();
-    private static final ReentrantLock GLOBAL_LOCK = new ReentrantLock();
-    private static final int PERIOD = 1000 * 60;
-    private final ConcurrentHashMap<String, StatInfo> countMap = new ConcurrentHashMap<>();
-    private final HashMap<String, StatInfo> threadCountMap = new HashMap<>();
-    private final ConcurrentHashMap<String, StatInfo> deleteCountMap = new ConcurrentHashMap<>();
-    private final List<String> deleteKeyList = new ArrayList<>();
-    private final Config config = new Config();
-    private int packageId = 1;
-    private int dataId = 0;
-    private boolean initialized = false;
-    private SenderManager manager;
-
-    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
-    private AuditConfig auditConfig = null;
-    private SocketAddressListLoader loader = null;
+public final class AuditOperator extends AuditReporterImpl {
 
     /**
      * Not support create from outer.
      */
     private AuditOperator() {
-
-    }
-
-    /**
-     * Get AuditOperator instance.
-     */
-    public static AuditOperator getInstance() {
-        return AUDIT_OPERATOR;
     }
 
-    /**
-     * init
-     */
-    private void init() {
-        if (initialized) {
-            return;
-        }
-        config.init();
-        timeoutExecutor.scheduleWithFixedDelay(new Runnable() {
-
-            @Override
-            public void run() {
-                try {
-                    loadIpPortList();
-                    send();
-                } catch (Exception e) {
-                    LOGGER.error(e.getMessage());
-                }
-            }
-
-        }, PERIOD, PERIOD, TimeUnit.MILLISECONDS);
-        if (auditConfig == null) {
-            auditConfig = new AuditConfig();
-        }
-        this.manager = new SenderManager(auditConfig);
-    }
+    private static class AuditOperatorHandler {
 
-    private void loadIpPortList() {
-        if (loader == null) {
-            return;
-        }
-        try {
-            List<String> ipPortList = loader.loadSocketAddressList();
-            if (ipPortList != null && ipPortList.size() > 0) {
-                HashSet<String> ipPortSet = new HashSet<>();
-                ipPortSet.addAll(ipPortList);
-                this.setAuditProxy(ipPortSet);
-            }
-        } catch (Exception e) {
-            LOGGER.error(e.getMessage());
-        }
+        private static final AuditOperator INSTANCE = new AuditOperator();
     }
 
     /**
-     * set loader
-     *
-     * @param loader the loader to set
-     */
-    public void setLoader(SocketAddressListLoader loader) {
-        this.loader = loader;
-    }
-
-    /**
-     * setLoaderClass
-     *
-     * @param loaderClassName
-     */
-    public void setLoaderClass(String loaderClassName) {
-        if (StringUtils.isEmpty(loaderClassName)) {
-            return;
-        }
-        try {
-            Class<?> loaderClass = ClassUtils.getClass(loaderClassName);
-            Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
-            if (loaderObject instanceof SocketAddressListLoader) {
-                SocketAddressListLoader loader = (SocketAddressListLoader) loaderObject;
-                this.loader = loader;
-                LOGGER.info("audit IpPortListLoader loaderClass:{}", loaderClassName);
-            }
-        } catch (Throwable t) {
-            LOGGER.error("Fail to init IpPortListLoader,loaderClass:{},error:{}",
-                    loaderClassName, t.getMessage(), t);
-        }
-    }
-
-    /**
-     * Set AuditProxy from the ip
-     */
-    public void setAuditProxy(HashSet<String> ipPortList) {
-        try {
-            GLOBAL_LOCK.lockInterruptibly();
-            if (!initialized) {
-                init();
-                initialized = true;
-            }
-            this.manager.setAuditProxy(ipPortList);
-        } catch (InterruptedException e) {
-            LOGGER.error(e.getMessage());
-        } finally {
-            GLOBAL_LOCK.unlock();
-        }
-    }
-
-    /**
-     * set audit config
-     */
-    public void setAuditConfig(AuditConfig config) {
-        auditConfig = config;
-        manager.setAuditConfig(config);
-    }
-
-    /**
-     * Add audit data
-     */
-    public void add(int auditID, String inlongGroupID, String inlongStreamID, Long logTime, long count, long size) {
-        add(auditID, DEFAULT_AUDIT_TAG, inlongGroupID, inlongStreamID, logTime, count, size);
-    }
-
-    public void add(int auditID, String auditTag, String inlongGroupID, String inlongStreamID, Long logTime,
-            long count, long size) {
-        long delayTime = System.currentTimeMillis() - logTime;
-        add(auditID, auditTag, inlongGroupID, inlongStreamID, logTime, count, size,
-                delayTime * count, DEFAULT_AUDIT_VERSION);
-    }
-
-    public void add(int auditID, String inlongGroupID, String inlongStreamID, Long logTime, long count, long size,
-            long delayTime) {
-        add(auditID, DEFAULT_AUDIT_TAG, inlongGroupID, inlongStreamID, logTime, count, size,
-                delayTime, DEFAULT_AUDIT_VERSION);
-    }
-
-    public void add(int auditID, String auditTag, String inlongGroupID, String inlongStreamID, Long logTime,
-            long count, long size, long delayTime, long auditVersion) {
-        StringJoiner keyJoiner = new StringJoiner(FIELD_SEPARATORS);
-        keyJoiner.add(String.valueOf(logTime / PERIOD));
-        keyJoiner.add(inlongGroupID);
-        keyJoiner.add(inlongStreamID);
-        keyJoiner.add(String.valueOf(auditID));
-        keyJoiner.add(auditTag);
-        keyJoiner.add(String.valueOf(auditVersion));
-        addByKey(keyJoiner.toString(), count, size, delayTime);
-    }
-
-    /**
-     * Add audit info by key.
-     */
-    private void addByKey(String key, long count, long size, long delayTime) {
-        if (countMap.get(key) == null) {
-            countMap.put(key, new StatInfo(0L, 0L, 0L));
-        }
-        countMap.get(key).count.addAndGet(count);
-        countMap.get(key).size.addAndGet(size);
-        countMap.get(key).delay.addAndGet(delayTime);
-    }
-
-    /**
-     * Send audit data
-     */
-    public synchronized void send() {
-        manager.clearBuffer();
-        resetStat();
-        // Retrieve statistics from the list of objects without statistics to be eliminated
-        for (Map.Entry<String, StatInfo> entry : this.deleteCountMap.entrySet()) {
-            this.sumThreadGroup(entry.getKey(), entry.getValue());
-        }
-        this.deleteCountMap.clear();
-        for (Map.Entry<String, StatInfo> entry : countMap.entrySet()) {
-            String key = entry.getKey();
-            StatInfo value = entry.getValue();
-            // If there is no data, enter the list to be eliminated
-            if (value.count.get() == 0) {
-                this.deleteKeyList.add(key);
-                continue;
-            }
-            this.sumThreadGroup(key, value);
-        }
-
-        // Clean up obsolete statistical data objects
-        for (String key : this.deleteKeyList) {
-            StatInfo value = this.countMap.remove(key);
-            this.deleteCountMap.put(key, value);
-        }
-        this.deleteKeyList.clear();
-
-        long sdkTime = Calendar.getInstance().getTimeInMillis();
-        AuditApi.AuditMessageHeader msgHeader = AuditApi.AuditMessageHeader.newBuilder()
-                .setIp(config.getLocalIP()).setDockerId(config.getDockerId())
-                .setThreadId(String.valueOf(Thread.currentThread().getId()))
-                .setSdkTs(sdkTime).setPacketId(packageId)
-                .build();
-        AuditApi.AuditRequest.Builder requestBuild = AuditApi.AuditRequest.newBuilder();
-        requestBuild.setMsgHeader(msgHeader).setRequestId(manager.nextRequestId());
-
-        // process the stat info for all threads
-        for (Map.Entry<String, StatInfo> entry : threadCountMap.entrySet()) {
-            // Entry key order: logTime inlongGroupID inlongStreamID auditID auditTag auditVersion
-            String[] keyArray = entry.getKey().split(FIELD_SEPARATORS);
-            long logTime = Long.parseLong(keyArray[0]) * PERIOD;
-            String inlongGroupID = keyArray[1];
-            String inlongStreamID = keyArray[2];
-            String auditID = keyArray[3];
-            String auditTag = keyArray[4];
-            long auditVersion = Long.parseLong(keyArray[5]);
-            StatInfo value = entry.getValue();
-            AuditApi.AuditMessageBody msgBody = AuditApi.AuditMessageBody.newBuilder()
-                    .setLogTs(logTime)
-                    .setInlongGroupId(inlongGroupID)
-                    .setInlongStreamId(inlongStreamID)
-                    .setAuditId(auditID)
-                    .setAuditTag(auditTag)
-                    .setCount(value.count.get())
-                    .setSize(value.size.get())
-                    .setDelay(value.delay.get())
-                    .setAuditVersion(auditVersion)
-                    .build();
-            requestBuild.addMsgBody(msgBody);
-
-            if (dataId++ >= BATCH_NUM) {
-                dataId = 0;
-                packageId++;
-                sendByBaseCommand(requestBuild.build());
-                requestBuild.clearMsgBody();
-            }
-        }
-        if (requestBuild.getMsgBodyCount() > 0) {
-            sendByBaseCommand(requestBuild.build());
-            requestBuild.clearMsgBody();
-        }
-        threadCountMap.clear();
-
-        LOGGER.info("finish report audit data");
-    }
-
-    /**
-     * Send base command
-     */
-    private void sendByBaseCommand(AuditApi.AuditRequest auditRequest) {
-        AuditApi.BaseCommand.Builder baseCommand = AuditApi.BaseCommand.newBuilder();
-        baseCommand.setType(AUDIT_REQUEST).setAuditRequest(auditRequest).build();
-        manager.send(baseCommand.build(), auditRequest);
-    }
-
-    /**
-     * Summary
-     */
-    private void sumThreadGroup(String key, StatInfo statInfo) {
-        long count = statInfo.count.getAndSet(0);
-        if (0 == count) {
-            return;
-        }
-        if (threadCountMap.get(key) == null) {
-            threadCountMap.put(key, new StatInfo(0, 0, 0));
-        }
-
-        long size = statInfo.size.getAndSet(0);
-        long delay = statInfo.delay.getAndSet(0);
-
-        threadCountMap.get(key).count.addAndGet(count);
-        threadCountMap.get(key).size.addAndGet(size);
-        threadCountMap.get(key).delay.addAndGet(delay);
-    }
-
-    /**
-     * Reset statistics
+     * Get AuditOperator instance.
      */
-    private void resetStat() {
-        dataId = 0;
-        packageId = 1;
+    public static AuditOperator getInstance() {
+        return AuditOperatorHandler.INSTANCE;
     }
 }
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
similarity index 92%
copy from inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
copy to inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
index 0c6f573f9b..10c3913dd2 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.audit;
 
+import org.apache.inlong.audit.loader.SocketAddressListLoader;
 import org.apache.inlong.audit.protocol.AuditApi;
 import org.apache.inlong.audit.send.SenderManager;
 import org.apache.inlong.audit.util.AuditConfig;
@@ -31,7 +32,6 @@ import org.slf4j.LoggerFactory;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Calendar;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -44,22 +44,18 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDIT_REQUEST;
 
-/**
- * Audit operator, which is singleton.
- */
-public class AuditOperator implements Serializable {
+public class AuditReporterImpl implements Serializable {
 
     private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = LoggerFactory.getLogger(AuditOperator.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(AuditReporterImpl.class);
     private static final String FIELD_SEPARATORS = ":";
     private static final String DEFAULT_AUDIT_TAG = "-1";
     private static final long DEFAULT_AUDIT_VERSION = -1;
     private static final int BATCH_NUM = 100;
-    private static final AuditOperator AUDIT_OPERATOR = new AuditOperator();
-    private static final ReentrantLock GLOBAL_LOCK = new ReentrantLock();
+    private final ReentrantLock GLOBAL_LOCK = new ReentrantLock();
     private static final int PERIOD = 1000 * 60;
     private final ConcurrentHashMap<String, StatInfo> countMap = new ConcurrentHashMap<>();
-    private final HashMap<String, StatInfo> threadCountMap = new HashMap<>();
+    private final ConcurrentHashMap<String, StatInfo> threadCountMap = new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String, StatInfo> deleteCountMap = new ConcurrentHashMap<>();
     private final List<String> deleteKeyList = new ArrayList<>();
     private final Config config = new Config();
@@ -73,21 +69,7 @@ public class AuditOperator implements Serializable {
     private SocketAddressListLoader loader = null;
 
     /**
-     * Not support create from outer.
-     */
-    private AuditOperator() {
-
-    }
-
-    /**
-     * Get AuditOperator instance.
-     */
-    public static AuditOperator getInstance() {
-        return AUDIT_OPERATOR;
-    }
-
-    /**
-     * init
+     * Init
      */
     private void init() {
         if (initialized) {
@@ -130,7 +112,7 @@ public class AuditOperator implements Serializable {
     }
 
     /**
-     * set loader
+     * Set loader
      *
      * @param loader the loader to set
      */
@@ -139,7 +121,7 @@ public class AuditOperator implements Serializable {
     }
 
     /**
-     * setLoaderClass
+     * SetLoaderClass
      *
      * @param loaderClassName
      */
@@ -153,10 +135,10 @@ public class AuditOperator implements Serializable {
             if (loaderObject instanceof SocketAddressListLoader) {
                 SocketAddressListLoader loader = (SocketAddressListLoader) loaderObject;
                 this.loader = loader;
-                LOGGER.info("audit IpPortListLoader loaderClass:{}", loaderClassName);
+                LOGGER.info("Audit list loader class:{}", loaderClassName);
             }
         } catch (Throwable t) {
-            LOGGER.error("Fail to init IpPortListLoader,loaderClass:{},error:{}",
+            LOGGER.error("Fail to init list loader class:{},error:{}",
                     loaderClassName, t.getMessage(), t);
         }
     }
@@ -180,7 +162,7 @@ public class AuditOperator implements Serializable {
     }
 
     /**
-     * set audit config
+     * Set audit config
      */
     public void setAuditConfig(AuditConfig config) {
         auditConfig = config;
@@ -269,7 +251,7 @@ public class AuditOperator implements Serializable {
         AuditApi.AuditRequest.Builder requestBuild = AuditApi.AuditRequest.newBuilder();
         requestBuild.setMsgHeader(msgHeader).setRequestId(manager.nextRequestId());
 
-        // process the stat info for all threads
+        // Process the stat info for all threads
         for (Map.Entry<String, StatInfo> entry : threadCountMap.entrySet()) {
             // Entry key order: logTime inlongGroupID inlongStreamID auditID auditTag auditVersion
             String[] keyArray = entry.getKey().split(FIELD_SEPARATORS);
@@ -306,7 +288,7 @@ public class AuditOperator implements Serializable {
         }
         threadCountMap.clear();
 
-        LOGGER.info("finish report audit data");
+        LOGGER.info("Finish report audit data");
     }
 
     /**
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DefaultISocketAddressListLoader.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/DefaultISocketAddressListLoader.java
similarity index 98%
rename from inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DefaultISocketAddressListLoader.java
rename to inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/DefaultISocketAddressListLoader.java
index c6629e6de2..6a8b6a3b29 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DefaultISocketAddressListLoader.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/DefaultISocketAddressListLoader.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit;
+package org.apache.inlong.audit.loader;
 
 import org.apache.commons.lang3.StringUtils;
 
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DnsSocketAddressListLoader.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/DnsSocketAddressListLoader.java
similarity index 98%
rename from inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DnsSocketAddressListLoader.java
rename to inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/DnsSocketAddressListLoader.java
index 95c9d969d3..fa0f289c41 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DnsSocketAddressListLoader.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/DnsSocketAddressListLoader.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit;
+package org.apache.inlong.audit.loader;
 
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/SocketAddressListLoader.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/SocketAddressListLoader.java
similarity index 96%
rename from inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/SocketAddressListLoader.java
rename to inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/SocketAddressListLoader.java
index 0902964a7d..067edb8a05 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/SocketAddressListLoader.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/SocketAddressListLoader.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit;
+package org.apache.inlong.audit.loader;
 
 import java.util.List;
 import java.util.Map;