You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2024/03/13 08:53:31 UTC
(inlong) branch master updated: [INLONG-9809][Audit] SDK supports both singleton and non-singleton usage (#9812)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2ea39a2765 [INLONG-9809][Audit] SDK supports both singleton and non-singleton usage (#9812)
2ea39a2765 is described below
commit 2ea39a276556eb4319c588ab02440b8c0de3609c
Author: doleyzi <43...@users.noreply.github.com>
AuthorDate: Wed Mar 13 16:53:26 2024 +0800
[INLONG-9809][Audit] SDK supports both singleton and non-singleton usage (#9812)
---
.../org/apache/inlong/audit/AuditOperator.java | 319 +--------------------
.../{AuditOperator.java => AuditReporterImpl.java} | 44 +--
.../DefaultISocketAddressListLoader.java | 2 +-
.../{ => loader}/DnsSocketAddressListLoader.java | 2 +-
.../{ => loader}/SocketAddressListLoader.java | 2 +-
5 files changed, 23 insertions(+), 346 deletions(-)
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
index 0c6f573f9b..85d6201a94 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
@@ -17,332 +17,27 @@
package org.apache.inlong.audit;
-import org.apache.inlong.audit.protocol.AuditApi;
-import org.apache.inlong.audit.send.SenderManager;
-import org.apache.inlong.audit.util.AuditConfig;
-import org.apache.inlong.audit.util.Config;
-import org.apache.inlong.audit.util.StatInfo;
-
-import org.apache.commons.lang3.ClassUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.StringJoiner;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDIT_REQUEST;
-
/**
* Audit operator, which is singleton.
+ * Creating objects through deserialization is not supported.
*/
-public class AuditOperator implements Serializable {
-
- private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = LoggerFactory.getLogger(AuditOperator.class);
- private static final String FIELD_SEPARATORS = ":";
- private static final String DEFAULT_AUDIT_TAG = "-1";
- private static final long DEFAULT_AUDIT_VERSION = -1;
- private static final int BATCH_NUM = 100;
- private static final AuditOperator AUDIT_OPERATOR = new AuditOperator();
- private static final ReentrantLock GLOBAL_LOCK = new ReentrantLock();
- private static final int PERIOD = 1000 * 60;
- private final ConcurrentHashMap<String, StatInfo> countMap = new ConcurrentHashMap<>();
- private final HashMap<String, StatInfo> threadCountMap = new HashMap<>();
- private final ConcurrentHashMap<String, StatInfo> deleteCountMap = new ConcurrentHashMap<>();
- private final List<String> deleteKeyList = new ArrayList<>();
- private final Config config = new Config();
- private int packageId = 1;
- private int dataId = 0;
- private boolean initialized = false;
- private SenderManager manager;
-
- private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
- private AuditConfig auditConfig = null;
- private SocketAddressListLoader loader = null;
+public final class AuditOperator extends AuditReporterImpl {
/**
* Not support create from outer.
*/
private AuditOperator() {
-
- }
-
- /**
- * Get AuditOperator instance.
- */
- public static AuditOperator getInstance() {
- return AUDIT_OPERATOR;
}
- /**
- * init
- */
- private void init() {
- if (initialized) {
- return;
- }
- config.init();
- timeoutExecutor.scheduleWithFixedDelay(new Runnable() {
-
- @Override
- public void run() {
- try {
- loadIpPortList();
- send();
- } catch (Exception e) {
- LOGGER.error(e.getMessage());
- }
- }
-
- }, PERIOD, PERIOD, TimeUnit.MILLISECONDS);
- if (auditConfig == null) {
- auditConfig = new AuditConfig();
- }
- this.manager = new SenderManager(auditConfig);
- }
+ private static class AuditOperatorHandler {
- private void loadIpPortList() {
- if (loader == null) {
- return;
- }
- try {
- List<String> ipPortList = loader.loadSocketAddressList();
- if (ipPortList != null && ipPortList.size() > 0) {
- HashSet<String> ipPortSet = new HashSet<>();
- ipPortSet.addAll(ipPortList);
- this.setAuditProxy(ipPortSet);
- }
- } catch (Exception e) {
- LOGGER.error(e.getMessage());
- }
+ private static final AuditOperator INSTANCE = new AuditOperator();
}
/**
- * set loader
- *
- * @param loader the loader to set
- */
- public void setLoader(SocketAddressListLoader loader) {
- this.loader = loader;
- }
-
- /**
- * setLoaderClass
- *
- * @param loaderClassName
- */
- public void setLoaderClass(String loaderClassName) {
- if (StringUtils.isEmpty(loaderClassName)) {
- return;
- }
- try {
- Class<?> loaderClass = ClassUtils.getClass(loaderClassName);
- Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
- if (loaderObject instanceof SocketAddressListLoader) {
- SocketAddressListLoader loader = (SocketAddressListLoader) loaderObject;
- this.loader = loader;
- LOGGER.info("audit IpPortListLoader loaderClass:{}", loaderClassName);
- }
- } catch (Throwable t) {
- LOGGER.error("Fail to init IpPortListLoader,loaderClass:{},error:{}",
- loaderClassName, t.getMessage(), t);
- }
- }
-
- /**
- * Set AuditProxy from the ip
- */
- public void setAuditProxy(HashSet<String> ipPortList) {
- try {
- GLOBAL_LOCK.lockInterruptibly();
- if (!initialized) {
- init();
- initialized = true;
- }
- this.manager.setAuditProxy(ipPortList);
- } catch (InterruptedException e) {
- LOGGER.error(e.getMessage());
- } finally {
- GLOBAL_LOCK.unlock();
- }
- }
-
- /**
- * set audit config
- */
- public void setAuditConfig(AuditConfig config) {
- auditConfig = config;
- manager.setAuditConfig(config);
- }
-
- /**
- * Add audit data
- */
- public void add(int auditID, String inlongGroupID, String inlongStreamID, Long logTime, long count, long size) {
- add(auditID, DEFAULT_AUDIT_TAG, inlongGroupID, inlongStreamID, logTime, count, size);
- }
-
- public void add(int auditID, String auditTag, String inlongGroupID, String inlongStreamID, Long logTime,
- long count, long size) {
- long delayTime = System.currentTimeMillis() - logTime;
- add(auditID, auditTag, inlongGroupID, inlongStreamID, logTime, count, size,
- delayTime * count, DEFAULT_AUDIT_VERSION);
- }
-
- public void add(int auditID, String inlongGroupID, String inlongStreamID, Long logTime, long count, long size,
- long delayTime) {
- add(auditID, DEFAULT_AUDIT_TAG, inlongGroupID, inlongStreamID, logTime, count, size,
- delayTime, DEFAULT_AUDIT_VERSION);
- }
-
- public void add(int auditID, String auditTag, String inlongGroupID, String inlongStreamID, Long logTime,
- long count, long size, long delayTime, long auditVersion) {
- StringJoiner keyJoiner = new StringJoiner(FIELD_SEPARATORS);
- keyJoiner.add(String.valueOf(logTime / PERIOD));
- keyJoiner.add(inlongGroupID);
- keyJoiner.add(inlongStreamID);
- keyJoiner.add(String.valueOf(auditID));
- keyJoiner.add(auditTag);
- keyJoiner.add(String.valueOf(auditVersion));
- addByKey(keyJoiner.toString(), count, size, delayTime);
- }
-
- /**
- * Add audit info by key.
- */
- private void addByKey(String key, long count, long size, long delayTime) {
- if (countMap.get(key) == null) {
- countMap.put(key, new StatInfo(0L, 0L, 0L));
- }
- countMap.get(key).count.addAndGet(count);
- countMap.get(key).size.addAndGet(size);
- countMap.get(key).delay.addAndGet(delayTime);
- }
-
- /**
- * Send audit data
- */
- public synchronized void send() {
- manager.clearBuffer();
- resetStat();
- // Retrieve statistics from the list of objects without statistics to be eliminated
- for (Map.Entry<String, StatInfo> entry : this.deleteCountMap.entrySet()) {
- this.sumThreadGroup(entry.getKey(), entry.getValue());
- }
- this.deleteCountMap.clear();
- for (Map.Entry<String, StatInfo> entry : countMap.entrySet()) {
- String key = entry.getKey();
- StatInfo value = entry.getValue();
- // If there is no data, enter the list to be eliminated
- if (value.count.get() == 0) {
- this.deleteKeyList.add(key);
- continue;
- }
- this.sumThreadGroup(key, value);
- }
-
- // Clean up obsolete statistical data objects
- for (String key : this.deleteKeyList) {
- StatInfo value = this.countMap.remove(key);
- this.deleteCountMap.put(key, value);
- }
- this.deleteKeyList.clear();
-
- long sdkTime = Calendar.getInstance().getTimeInMillis();
- AuditApi.AuditMessageHeader msgHeader = AuditApi.AuditMessageHeader.newBuilder()
- .setIp(config.getLocalIP()).setDockerId(config.getDockerId())
- .setThreadId(String.valueOf(Thread.currentThread().getId()))
- .setSdkTs(sdkTime).setPacketId(packageId)
- .build();
- AuditApi.AuditRequest.Builder requestBuild = AuditApi.AuditRequest.newBuilder();
- requestBuild.setMsgHeader(msgHeader).setRequestId(manager.nextRequestId());
-
- // process the stat info for all threads
- for (Map.Entry<String, StatInfo> entry : threadCountMap.entrySet()) {
- // Entry key order: logTime inlongGroupID inlongStreamID auditID auditTag auditVersion
- String[] keyArray = entry.getKey().split(FIELD_SEPARATORS);
- long logTime = Long.parseLong(keyArray[0]) * PERIOD;
- String inlongGroupID = keyArray[1];
- String inlongStreamID = keyArray[2];
- String auditID = keyArray[3];
- String auditTag = keyArray[4];
- long auditVersion = Long.parseLong(keyArray[5]);
- StatInfo value = entry.getValue();
- AuditApi.AuditMessageBody msgBody = AuditApi.AuditMessageBody.newBuilder()
- .setLogTs(logTime)
- .setInlongGroupId(inlongGroupID)
- .setInlongStreamId(inlongStreamID)
- .setAuditId(auditID)
- .setAuditTag(auditTag)
- .setCount(value.count.get())
- .setSize(value.size.get())
- .setDelay(value.delay.get())
- .setAuditVersion(auditVersion)
- .build();
- requestBuild.addMsgBody(msgBody);
-
- if (dataId++ >= BATCH_NUM) {
- dataId = 0;
- packageId++;
- sendByBaseCommand(requestBuild.build());
- requestBuild.clearMsgBody();
- }
- }
- if (requestBuild.getMsgBodyCount() > 0) {
- sendByBaseCommand(requestBuild.build());
- requestBuild.clearMsgBody();
- }
- threadCountMap.clear();
-
- LOGGER.info("finish report audit data");
- }
-
- /**
- * Send base command
- */
- private void sendByBaseCommand(AuditApi.AuditRequest auditRequest) {
- AuditApi.BaseCommand.Builder baseCommand = AuditApi.BaseCommand.newBuilder();
- baseCommand.setType(AUDIT_REQUEST).setAuditRequest(auditRequest).build();
- manager.send(baseCommand.build(), auditRequest);
- }
-
- /**
- * Summary
- */
- private void sumThreadGroup(String key, StatInfo statInfo) {
- long count = statInfo.count.getAndSet(0);
- if (0 == count) {
- return;
- }
- if (threadCountMap.get(key) == null) {
- threadCountMap.put(key, new StatInfo(0, 0, 0));
- }
-
- long size = statInfo.size.getAndSet(0);
- long delay = statInfo.delay.getAndSet(0);
-
- threadCountMap.get(key).count.addAndGet(count);
- threadCountMap.get(key).size.addAndGet(size);
- threadCountMap.get(key).delay.addAndGet(delay);
- }
-
- /**
- * Reset statistics
+ * Get AuditOperator instance.
*/
- private void resetStat() {
- dataId = 0;
- packageId = 1;
+ public static AuditOperator getInstance() {
+ return AuditOperatorHandler.INSTANCE;
}
}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
similarity index 92%
copy from inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
copy to inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
index 0c6f573f9b..10c3913dd2 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
@@ -17,6 +17,7 @@
package org.apache.inlong.audit;
+import org.apache.inlong.audit.loader.SocketAddressListLoader;
import org.apache.inlong.audit.protocol.AuditApi;
import org.apache.inlong.audit.send.SenderManager;
import org.apache.inlong.audit.util.AuditConfig;
@@ -31,7 +32,6 @@ import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Calendar;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -44,22 +44,18 @@ import java.util.concurrent.locks.ReentrantLock;
import static org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDIT_REQUEST;
-/**
- * Audit operator, which is singleton.
- */
-public class AuditOperator implements Serializable {
+public class AuditReporterImpl implements Serializable {
private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = LoggerFactory.getLogger(AuditOperator.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(AuditReporterImpl.class);
private static final String FIELD_SEPARATORS = ":";
private static final String DEFAULT_AUDIT_TAG = "-1";
private static final long DEFAULT_AUDIT_VERSION = -1;
private static final int BATCH_NUM = 100;
- private static final AuditOperator AUDIT_OPERATOR = new AuditOperator();
- private static final ReentrantLock GLOBAL_LOCK = new ReentrantLock();
+ private final ReentrantLock GLOBAL_LOCK = new ReentrantLock();
private static final int PERIOD = 1000 * 60;
private final ConcurrentHashMap<String, StatInfo> countMap = new ConcurrentHashMap<>();
- private final HashMap<String, StatInfo> threadCountMap = new HashMap<>();
+ private final ConcurrentHashMap<String, StatInfo> threadCountMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, StatInfo> deleteCountMap = new ConcurrentHashMap<>();
private final List<String> deleteKeyList = new ArrayList<>();
private final Config config = new Config();
@@ -73,21 +69,7 @@ public class AuditOperator implements Serializable {
private SocketAddressListLoader loader = null;
/**
- * Not support create from outer.
- */
- private AuditOperator() {
-
- }
-
- /**
- * Get AuditOperator instance.
- */
- public static AuditOperator getInstance() {
- return AUDIT_OPERATOR;
- }
-
- /**
- * init
+ * Init
*/
private void init() {
if (initialized) {
@@ -130,7 +112,7 @@ public class AuditOperator implements Serializable {
}
/**
- * set loader
+ * Set loader
*
* @param loader the loader to set
*/
@@ -139,7 +121,7 @@ public class AuditOperator implements Serializable {
}
/**
- * setLoaderClass
+ * SetLoaderClass
*
* @param loaderClassName
*/
@@ -153,10 +135,10 @@ public class AuditOperator implements Serializable {
if (loaderObject instanceof SocketAddressListLoader) {
SocketAddressListLoader loader = (SocketAddressListLoader) loaderObject;
this.loader = loader;
- LOGGER.info("audit IpPortListLoader loaderClass:{}", loaderClassName);
+ LOGGER.info("Audit list loader class:{}", loaderClassName);
}
} catch (Throwable t) {
- LOGGER.error("Fail to init IpPortListLoader,loaderClass:{},error:{}",
+ LOGGER.error("Fail to init list loader class:{},error:{}",
loaderClassName, t.getMessage(), t);
}
}
@@ -180,7 +162,7 @@ public class AuditOperator implements Serializable {
}
/**
- * set audit config
+ * Set audit config
*/
public void setAuditConfig(AuditConfig config) {
auditConfig = config;
@@ -269,7 +251,7 @@ public class AuditOperator implements Serializable {
AuditApi.AuditRequest.Builder requestBuild = AuditApi.AuditRequest.newBuilder();
requestBuild.setMsgHeader(msgHeader).setRequestId(manager.nextRequestId());
- // process the stat info for all threads
+ // Process the stat info for all threads
for (Map.Entry<String, StatInfo> entry : threadCountMap.entrySet()) {
// Entry key order: logTime inlongGroupID inlongStreamID auditID auditTag auditVersion
String[] keyArray = entry.getKey().split(FIELD_SEPARATORS);
@@ -306,7 +288,7 @@ public class AuditOperator implements Serializable {
}
threadCountMap.clear();
- LOGGER.info("finish report audit data");
+ LOGGER.info("Finish report audit data");
}
/**
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DefaultISocketAddressListLoader.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/DefaultISocketAddressListLoader.java
similarity index 98%
rename from inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DefaultISocketAddressListLoader.java
rename to inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/DefaultISocketAddressListLoader.java
index c6629e6de2..6a8b6a3b29 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DefaultISocketAddressListLoader.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/DefaultISocketAddressListLoader.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.audit;
+package org.apache.inlong.audit.loader;
import org.apache.commons.lang3.StringUtils;
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DnsSocketAddressListLoader.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/DnsSocketAddressListLoader.java
similarity index 98%
rename from inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DnsSocketAddressListLoader.java
rename to inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/DnsSocketAddressListLoader.java
index 95c9d969d3..fa0f289c41 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/DnsSocketAddressListLoader.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/DnsSocketAddressListLoader.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.audit;
+package org.apache.inlong.audit.loader;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/SocketAddressListLoader.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/SocketAddressListLoader.java
similarity index 96%
rename from inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/SocketAddressListLoader.java
rename to inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/SocketAddressListLoader.java
index 0902964a7d..067edb8a05 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/SocketAddressListLoader.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/loader/SocketAddressListLoader.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.audit;
+package org.apache.inlong.audit.loader;
import java.util.List;
import java.util.Map;