You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/12/14 02:17:59 UTC
[incubator-inlong] branch master updated: [INLONG-1734][feature][audit] audit-sdk module (#1968)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new dd796fc [INLONG-1734][feature][audit] audit-sdk module (#1968)
dd796fc is described below
commit dd796fca09738972d56f9ae41a7b6f9499037ec9
Author: doleyzi <43...@users.noreply.github.com>
AuthorDate: Tue Dec 14 10:17:50 2021 +0800
[INLONG-1734][feature][audit] audit-sdk module (#1968)
---
inlong-audit/audit-sdk/README.md | 31 +++
inlong-audit/audit-sdk/pom.xml | 73 ++++++
.../java/org/apache/inlong/audit/AuditImp.java | 250 ++++++++++++++++++
.../apache/inlong/audit/send/SenderChannel.java | 103 ++++++++
.../org/apache/inlong/audit/send/SenderGroup.java | 283 +++++++++++++++++++++
.../apache/inlong/audit/send/SenderHandler.java | 88 +++++++
.../apache/inlong/audit/send/SenderManager.java | 224 ++++++++++++++++
.../org/apache/inlong/audit/util/AuditData.java | 72 ++++++
.../java/org/apache/inlong/audit/util/Config.java | 109 ++++++++
.../java/org/apache/inlong/audit/util/Decoder.java | 56 ++++
.../java/org/apache/inlong/audit/util/Encoder.java | 39 +++
.../java/org/apache/inlong/audit/util/IpPort.java | 148 +++++++++++
.../org/apache/inlong/audit/util/SenderResult.java | 47 ++++
.../org/apache/inlong/audit/util/StatInfo.java | 32 +++
.../java/org/apache/inlong/audit/AuditImpTest.java | 43 ++++
.../inlong/audit/send/SenderChannelTest.java | 101 ++++++++
.../apache/inlong/audit/send/SenderGroupTest.java | 85 +++++++
.../inlong/audit/send/SenderManagerTest.java | 63 +++++
.../apache/inlong/audit/util/AuditDataTest.java | 53 ++++
.../org/apache/inlong/audit/util/ConfigTest.java | 41 +++
.../org/apache/inlong/audit/util/IpPortTest.java | 66 +++++
inlong-audit/pom.xml | 1 +
22 files changed, 2008 insertions(+)
diff --git a/inlong-audit/audit-sdk/README.md b/inlong-audit/audit-sdk/README.md
new file mode 100644
index 0000000..57ab063
--- /dev/null
+++ b/inlong-audit/audit-sdk/README.md
@@ -0,0 +1,31 @@
+ # Description
+## overview
+The audit sdk is used to count the receiving and sending volume of each module in real time according to the cycle,
+and the statistical results are sent to the audit access layer according to the cycle.
+
+## features
+### data uniqueness
+The audit sdk will add a unique mark to each audit audit, which can be used to remove duplicates.
+
+### unified audit standard
+The audit sdk uses log production time as the audit standard,
+which can ensure that each module is reconciled in accordance with the unified audit standard.
+
+## usage
+### setAuditProxy
+Set the audit access layer ip:port list. The audit sdk will summarize the results according to the cycle
+and send them to the ip:port list set by the interface.
+If the ip:port of the audit access layer is fixed, then this interface needs to be called once.
+If the audit access changes in real time, then the business program needs to call this interface periodically to update
+```java
+ HashSet<String> ipPortList=new HashSet<>();
+ ipPortList.add("0.0.0.0:54041");
+ AuditImp.getInstance().setAuditProxy(ipPortList);
+```
+
+### add
+Call the add method for statistics, where the auditID parameter uniquely identifies an audit object,
+inlongGroupID,inlongStreamID,logTime are audit dimensions, count is the number of items, size is the size, and logTime is milliseconds.
+```java
+ AuditImp.getInstance().add(1, "inlongGroupIDTest","inlongStreamIDTest", System.currentTimeMillis(), 1, 1);
+```
\ No newline at end of file
diff --git a/inlong-audit/audit-sdk/pom.xml b/inlong-audit/audit-sdk/pom.xml
new file mode 100644
index 0000000..2657bd6
--- /dev/null
+++ b/inlong-audit/audit-sdk/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>inlong-audit</artifactId>
+ <version>0.12.0-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>audit-sdk</artifactId>
+ <packaging>jar</packaging>
+ <name>Apache InLong - Audit Sdk</name>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <compiler.source>1.8</compiler.source>
+ <compiler.target>1.8</compiler.target>
+ <netty.version>3.8.0.Final</netty.version>
+ <junit.version>4.4</junit.version>
+ <protobuf.version>3.19.1</protobuf.version>
+ <commons.version>3.0</commons.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>${commons.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>audit-common</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditImp.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditImp.java
new file mode 100644
index 0000000..872cc78
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditImp.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit;
+
+import org.apache.inlong.audit.protocol.AuditApi;
+import org.apache.inlong.audit.send.SenderManager;
+import org.apache.inlong.audit.util.Config;
+import org.apache.inlong.audit.util.StatInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDITREQUEST;
+
+public class AuditImp {
+ private static final Logger logger = LoggerFactory.getLogger(AuditImp.class);
+ private static AuditImp auditImp = new AuditImp();
+ private static final String FIELD_SEPARATORS = ":";
+ private ConcurrentHashMap<String, StatInfo> countMap = new ConcurrentHashMap<String, StatInfo>();
+ private HashMap<String, StatInfo> threadSumMap = new HashMap<String, StatInfo>();
+ private ConcurrentHashMap<String, StatInfo> deleteCountMap = new ConcurrentHashMap<String, StatInfo>();
+ private List<String> deleteKeyList = new ArrayList<String>();
+ private Config config = new Config();
+ private Long sdkTime;
+ private int packageId = 1;
+ private int dataId = 0;
+ private static final int BATCH_NUM = 100;
+ boolean inited = false;
+ private SenderManager manager;
+ private static ReentrantLock globalLock = new ReentrantLock();
+ private static int PERIOD = 1000 * 60;
+ private Timer timer = new Timer();
+ private TimerTask timerTask = new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ sendReport();
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ }
+ }
+ };
+
+ public static AuditImp getInstance() {
+ return auditImp;
+ }
+
+ /**
+ * init
+ */
+ private void init() {
+ if (inited) {
+ return;
+ }
+ config.init();
+ timer.schedule(timerTask, PERIOD, PERIOD);
+ this.manager = new SenderManager(config);
+ }
+
+ /**
+ * setAuditProxy
+ *
+ * @param ipPortList
+ */
+ public void setAuditProxy(HashSet<String> ipPortList) {
+ try {
+ globalLock.lockInterruptibly();
+ if (!inited) {
+ init();
+ inited = true;
+ }
+ this.manager.setAuditProxy(ipPortList);
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage());
+ } finally {
+ globalLock.unlock();
+ }
+ }
+
+ /**
+ * api
+ *
+ * @param auditID
+ * @param inlongGroupID
+ * @param inlongStreamID
+ * @param logTime
+ * @param count
+ * @param size
+ */
+ public void add(int auditID, String inlongGroupID, String inlongStreamID, Long logTime, long count, long size) {
+ long delayTime = System.currentTimeMillis() - logTime;
+ String key = (logTime / PERIOD) + FIELD_SEPARATORS + inlongGroupID + FIELD_SEPARATORS
+ + inlongStreamID + FIELD_SEPARATORS + auditID;
+ addByKey(key, count, size, delayTime);
+ }
+
+ /**
+ * add by key
+ *
+ * @param key
+ * @param count
+ * @param size
+ * @param delayTime
+ */
+ private void addByKey(String key, long count, long size, long delayTime) {
+ try {
+ if (countMap.get(key) == null) {
+ countMap.put(key, new StatInfo(0L, 0L, 0L));
+ }
+ countMap.get(key).count.addAndGet(count);
+ countMap.get(key).size.addAndGet(size);
+ countMap.get(key).delay.addAndGet(delayTime * count);
+ } catch (Exception e) {
+ return;
+ }
+ }
+
+ /**
+ * Report audit data
+ */
+ private synchronized void sendReport() {
+ manager.clearBuffer();
+ resetStat();
+ // Retrieve statistics from the list of objects without statistics to be eliminated
+ for (Map.Entry<String, StatInfo> entry : this.deleteCountMap.entrySet()) {
+ this.sumThreadGroup(entry.getKey(), entry.getValue());
+ }
+ this.deleteCountMap.clear();
+ for (Map.Entry<String, StatInfo> entry : countMap.entrySet()) {
+ String key = entry.getKey();
+ StatInfo value = entry.getValue();
+ // If there is no data, enter the list to be eliminated
+ if (value.count.get() == 0) {
+ this.deleteKeyList.add(key);
+ continue;
+ }
+ this.sumThreadGroup(key, value);
+ }
+
+ // Clean up obsolete statistical data objects
+ for (String key : this.deleteKeyList) {
+ StatInfo value = this.countMap.remove(key);
+ this.deleteCountMap.put(key, value);
+ }
+ this.deleteKeyList.clear();
+ sdkTime = Calendar.getInstance().getTimeInMillis();
+ AuditApi.AuditMessageHeader mssageHeader = AuditApi.AuditMessageHeader.newBuilder()
+ .setIp(config.getLocalIP()).setDockerId(config.getDockerId())
+ .setThreadId(String.valueOf(Thread.currentThread().getId()))
+ .setSdkTs(sdkTime).setPacketId(packageId)
+ .build();
+ AuditApi.AuditRequest.Builder requestBulid = AuditApi.AuditRequest.newBuilder();
+ requestBulid.setMsgHeader(mssageHeader).setRequestId(manager.nextRequestId());
+ for (Map.Entry<String, StatInfo> entry : threadSumMap.entrySet()) {
+ String[] keyArray = entry.getKey().split(FIELD_SEPARATORS);
+ long logTime = Long.parseLong(keyArray[0]) * 60;
+ String inlongGroupID = keyArray[1];
+ String inlongStreamID = keyArray[2];
+ String auditID = keyArray[3];
+ StatInfo value = entry.getValue();
+ AuditApi.AuditMessageBody mssageBody = AuditApi.AuditMessageBody.newBuilder()
+ .setLogTs(logTime).setInlongGroupId(inlongGroupID)
+ .setInlongStreamId(inlongStreamID).setAuditId(auditID)
+ .setCount(value.count.get()).setSize(value.size.get())
+ .setDelay(value.delay.get())
+ .build();
+ requestBulid.addMsgBody(mssageBody);
+ if (dataId++ >= BATCH_NUM) {
+ dataId = 0;
+ packageId++;
+ sendByBaseCommand(sdkTime, requestBulid.build());
+ requestBulid.clearMsgBody();
+ }
+ }
+ if (requestBulid.getMsgBodyCount() > 0) {
+ sendByBaseCommand(sdkTime, requestBulid.build());
+ requestBulid.clearMsgBody();
+ }
+ threadSumMap.clear();
+ logger.info("finished send report.");
+ }
+
+ /**
+ * send base command
+ *
+ * @param sdkTime
+ * @param auditRequest
+ */
+ private void sendByBaseCommand(long sdkTime, AuditApi.AuditRequest auditRequest) {
+ AuditApi.BaseCommand.Builder baseCommand = AuditApi.BaseCommand.newBuilder();
+ baseCommand.setType(AUDITREQUEST).setAuditRequest(auditRequest).build();
+ manager.send(sdkTime, baseCommand.build());
+ }
+
+ /**
+ * Summary
+ *
+ * @param key
+ * @param statInfo
+ */
+ private void sumThreadGroup(String key, StatInfo statInfo) {
+ long count = statInfo.count.getAndSet(0);
+ if (0 == count) {
+ return;
+ }
+ if (threadSumMap.get(key) == null) {
+ threadSumMap.put(key, new StatInfo(0, 0, 0));
+ }
+
+ long size = statInfo.size.getAndSet(0);
+ long delay = statInfo.delay.getAndSet(0);
+
+ threadSumMap.get(key).count.addAndGet(count);
+ threadSumMap.get(key).size.addAndGet(size);
+ threadSumMap.get(key).delay.addAndGet(delay);
+ }
+
+ /**
+ * Reset statistics
+ */
+ private void resetStat() {
+ dataId = 0;
+ packageId = 1;
+ }
+}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
new file mode 100644
index 0000000..1ecbd4f
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.send;
+
+import org.apache.inlong.audit.util.IpPort;
+import org.jboss.netty.channel.Channel;
+
+import java.util.concurrent.Semaphore;
+
+public class SenderChannel {
+
+ private IpPort ipPort;
+ private Channel channel;
+ private Semaphore packToken;
+
+ /**
+ * Constructor
+ *
+ * @param channel
+ * @param ipPort
+ */
+ public SenderChannel(Channel channel, IpPort ipPort, int maxSynchRequest) {
+ this.channel = channel;
+ this.ipPort = ipPort;
+ this.packToken = new Semaphore(maxSynchRequest);
+ this.channel.getConfig().setConnectTimeoutMillis(5000);
+ }
+
+ /**
+ * Try acquire channel
+ *
+ * @return
+ */
+ public boolean tryAcquire() {
+ return packToken.tryAcquire();
+ }
+
+ /**
+ * release channel
+ */
+ public void release() {
+ packToken.release();
+ }
+
+ /**
+ * toString
+ */
+ @Override
+ public String toString() {
+ return ipPort.key;
+ }
+
+ /**
+ * get ipPort
+ *
+ * @return the ipPort
+ */
+ public IpPort getIpPort() {
+ return ipPort;
+ }
+
+ /**
+ * set ipPort
+ *
+ * @param ipPort the ipPort to set
+ */
+ public void setIpPort(IpPort ipPort) {
+ this.ipPort = ipPort;
+ }
+
+ /**
+ * get channel
+ *
+ * @return the channel
+ */
+ public Channel getChannel() {
+ return channel;
+ }
+
+ /**
+ * set channel
+ *
+ * @param channel the channel to set
+ */
+ public void setChannel(Channel channel) {
+ this.channel = channel;
+ }
+}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
new file mode 100644
index 0000000..fab58e8
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.send;
+
+import org.apache.inlong.audit.util.Encoder;
+import org.apache.inlong.audit.util.IpPort;
+import org.apache.inlong.audit.util.SenderResult;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelUpstreamHandler;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class SenderGroup {
+ private static final Logger logger = LoggerFactory.getLogger(SenderGroup.class);
+ // maximum number of sending
+ public static final int MAX_SEND_TIMES = 3;
+ public static final int DEFAULT_WAIT_TIMES = 10000;
+ public static final int WAIT_INTERVAL = 1;
+ public static final int DEFAULT_SYNCH_REQUESTS = 1;
+
+ private ClientBootstrap client = new ClientBootstrap();
+ private List<LinkedBlockingQueue<SenderChannel>> channelGroups = new ArrayList<>();
+ private int mIndex = 0;
+ private List<SenderChannel> deleteChannels = new ArrayList<>();
+ private ConcurrentHashMap<String, SenderChannel> totalChannels = new ConcurrentHashMap<>();
+
+ private int senderThreadNum;
+ private int waitChannelTimes = DEFAULT_WAIT_TIMES;
+ private int waitChannelIntervalMs = WAIT_INTERVAL;
+ private int maxSynchRequest = DEFAULT_SYNCH_REQUESTS;
+ private boolean hasSendError = false;
+
+ /**
+ * constructor
+ *
+ * @param senderThreadNum
+ * @param decoder
+ * @param clientHandler
+ */
+ public SenderGroup(int senderThreadNum, ChannelUpstreamHandler decoder,
+ SimpleChannelHandler clientHandler) {
+ this.senderThreadNum = senderThreadNum;
+
+ client.setFactory(new NioClientSocketChannelFactory(
+ Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool(),
+ this.senderThreadNum));
+
+ client.setPipelineFactory(() -> {
+ ChannelPipeline pipeline = Channels.pipeline();
+ pipeline.addLast("decoder", decoder);
+ pipeline.addLast("encoder", new Encoder());
+ pipeline.addLast("handler", clientHandler);
+ return pipeline;
+ });
+ client.setOption("tcpNoDelay", true);
+ client.setOption("child.tcpNoDelay", true);
+ client.setOption("keepAlive", true);
+ client.setOption("child.keepAlive", true);
+ client.setOption("reuseAddr", true);
+
+ channelGroups.add(new LinkedBlockingQueue<>());
+ channelGroups.add(new LinkedBlockingQueue<>());
+ }
+
+ /**
+ * send data
+ *
+ * @param dataBuf
+ * @return
+ */
+ public SenderResult send(ChannelBuffer dataBuf) {
+ LinkedBlockingQueue<SenderChannel> channels = channelGroups.get(mIndex);
+ SenderChannel channel = null;
+ try {
+ if (channels.size() <= 0) {
+ logger.error("channels is empty");
+ return new SenderResult("channels is empty", 0, false);
+ }
+ boolean isOk = false;
+ for (int tryIndex = 0; tryIndex < waitChannelTimes; tryIndex++) {
+ channels = channelGroups.get(mIndex);
+ for (int i = 0; i < channels.size(); i++) {
+ channel = channels.poll();
+ boolean ret = channel.tryAcquire();
+ if (channel.tryAcquire()) {
+ isOk = true;
+ break;
+ }
+
+ if (ret) {
+ isOk = true;
+ break;
+ }
+ channels.offer(channel);
+ channel = null;
+ }
+ if (isOk) {
+ break;
+ }
+ try {
+ Thread.sleep(waitChannelIntervalMs);
+ } catch (Throwable e) {
+ System.out.println(e.getMessage());
+ }
+ }
+ if (channel == null) {
+ logger.error("can not get a channel");
+ return new SenderResult("can not get a channel", 0, false);
+ }
+ ChannelFuture t = null;
+ if (channel.getChannel().isConnected()) {
+ t = channel.getChannel().write(dataBuf).sync().await();
+ if (!t.isSuccess()) {
+ if (!channel.getChannel().isConnected()) {
+ reconnect(channel);
+ }
+ t = channel.getChannel().write(dataBuf).sync().await();
+ }
+ } else {
+ reconnect(channel);
+ t = channel.getChannel().write(dataBuf).sync().await();
+ }
+ return new SenderResult(channel.getIpPort().ip, channel.getIpPort().port, t.isSuccess());
+ } catch (Throwable ex) {
+ logger.error(ex.getMessage());
+ this.setHasSendError(true);
+ return new SenderResult(ex.getMessage(), 0, false);
+ } finally {
+ if (channel != null) {
+ channel.release();
+ channels.offer(channel);
+ }
+ }
+ }
+
+ /**
+ * release channel
+ */
+ public void release(String ipPort) {
+ SenderChannel channel = this.totalChannels.get(ipPort);
+ if (channel != null) {
+ channel.release();
+ }
+ }
+
+ /**
+ * release channel
+ */
+ public void release(InetSocketAddress addr) {
+ String destIp = addr.getHostName();
+ int destPort = addr.getPort();
+ String ipPort = IpPort.getIpPortKey(destIp, destPort);
+ SenderChannel channel = this.totalChannels.get(ipPort);
+ if (channel != null) {
+ channel.release();
+ }
+ }
+
+ /**
+ * update config
+ *
+ * @param ipLists
+ */
+ public void updateConfig(Set<String> ipLists) {
+ try {
+ for (SenderChannel dc : deleteChannels) {
+ dc.getChannel().disconnect();
+ dc.getChannel().close();
+ }
+ deleteChannels.clear();
+ int newIndex = mIndex ^ 0x01;
+ LinkedBlockingQueue<SenderChannel> newChannels = this.channelGroups.get(newIndex);
+ newChannels.clear();
+ for (String ipPort : ipLists) {
+ SenderChannel channel = totalChannels.get(ipPort);
+ if (channel != null) {
+ newChannels.add(channel);
+ continue;
+ }
+ try {
+ IpPort ipPortObj = IpPort.parseIpPort(ipPort);
+ if (ipPortObj == null) {
+ continue;
+ }
+ ChannelFuture future = client.connect(ipPortObj.addr).await();
+ channel = new SenderChannel(future.getChannel(), ipPortObj, maxSynchRequest);
+ newChannels.add(channel);
+ totalChannels.put(ipPort, channel);
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ }
+ }
+
+ for (Entry<String, SenderChannel> entry : totalChannels.entrySet()) {
+ if (!ipLists.contains(entry.getKey())) {
+ deleteChannels.add(entry.getValue());
+ }
+ }
+ for (SenderChannel dc : deleteChannels) {
+ totalChannels.remove(dc.getIpPort().key);
+ }
+ this.mIndex = newIndex;
+ } catch (Throwable e) {
+ logger.error("Update Sender Ip Failed." + e.getMessage());
+ }
+ }
+
+ /**
+ * reconnect
+ *
+ * @param channel
+ */
+ private void reconnect(SenderChannel channel) {
+ try {
+ synchronized (channel) {
+ if (channel.getChannel().isOpen()) {
+ return;
+ }
+
+ Channel oldChannel = channel.getChannel();
+ ChannelFuture future = client.connect(channel.getIpPort().addr).await();
+ Channel newChannel = future.getChannel();
+ channel.setChannel(newChannel);
+ oldChannel.disconnect();
+ oldChannel.close();
+ }
+ } catch (Throwable e) {
+ logger.error("reconnect failed." + e.getMessage());
+ }
+ }
+
+ /**
+ * get hasSendError
+ *
+ * @return the hasSendError
+ */
+ public boolean isHasSendError() {
+ return hasSendError;
+ }
+
+ /**
+ * set hasSendError
+ *
+ * @param hasSendError the hasSendError to set
+ */
+ public void setHasSendError(boolean hasSendError) {
+ this.hasSendError = hasSendError;
+ }
+
+}
+
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
new file mode 100644
index 0000000..cf43c18
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.send;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SenderHandler extends SimpleChannelHandler {
+ private static final Logger logger = LoggerFactory.getLogger(SenderHandler.class);
+ private SenderManager manager;
+
+ /**
+ * Constructor
+ *
+ * @param manager
+ */
+ public SenderHandler(SenderManager manager) {
+ this.manager = manager;
+ }
+
+ /**
+ * Message Received
+ */
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ try {
+ manager.onMessageReceived(ctx, e);
+ } catch (Throwable ex) {
+ logger.error(ex.getMessage());
+ }
+ }
+
+ /**
+ * Caught exception
+ */
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ try {
+ manager.onExceptionCaught(ctx, e);
+ } catch (Throwable ex) {
+ logger.error(ex.getMessage());
+ }
+ }
+
+ /**
+ * Disconnected channel
+ */
+ @Override
+ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
+ try {
+ super.channelDisconnected(ctx, e);
+ } catch (Throwable ex) {
+ logger.error(ex.getMessage());
+ }
+ }
+
+ /**
+ * Closed channel
+ */
+ @Override
+ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) {
+ try {
+ super.channelClosed(ctx, e);
+ } catch (Throwable ex) {
+ logger.error(ex.getMessage());
+ }
+ }
+}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
new file mode 100644
index 0000000..6959341
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.send;
+
+import org.apache.inlong.audit.protocol.AuditApi;
+import org.apache.inlong.audit.util.AuditData;
+import org.apache.inlong.audit.util.Config;
+import org.apache.inlong.audit.util.Decoder;
+import org.apache.inlong.audit.util.SenderResult;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * sender manager
+ */
+public class SenderManager {
+ private static final Logger logger = LoggerFactory.getLogger(SenderManager.class);
+ public static final int DEFAULT_SEND_THREADNUM = 2;
+ public static final Long MAX_REQUEST_ID = 1000000000L;
+ public static final int ALL_CONNECT_CHANNEL = -1;
+ public static final int DEFAULT_CONNECT_CHANNEL = 2;
+
+ private SenderGroup sender;
+ private int maxConnectChannels = ALL_CONNECT_CHANNEL;
+ private SecureRandom sRandom = new SecureRandom(Long.toString(System.currentTimeMillis()).getBytes());
+ // IPList
+ private HashSet<String> currentIpPorts = new HashSet<String>();
+ private AtomicLong requestIdSeq = new AtomicLong(0L);
+ private ConcurrentHashMap<Long, AuditData> dataMap = new ConcurrentHashMap<>();
+ private Config config;
+
+ /**
+ * Constructor
+ *
+ * @param config
+ */
+ public SenderManager(Config config) {
+ this(config, DEFAULT_CONNECT_CHANNEL);
+ }
+
+ /**
+ * Constructor
+ *
+ * @param config
+ * @param maxConnectChannels
+ */
+ public SenderManager(Config config, int maxConnectChannels) {
+ try {
+ this.config = config;
+ this.maxConnectChannels = maxConnectChannels;
+ SenderHandler clientHandler = new SenderHandler(this);
+ this.sender = new SenderGroup(DEFAULT_SEND_THREADNUM, new Decoder(), clientHandler);
+ } catch (Exception ex) {
+ logger.error(ex.getMessage());
+ }
+ }
+
+ /**
+ * update config
+ */
+ public void setAuditProxy(HashSet<String> ipPortList) {
+ if (ipPortList.equals(currentIpPorts) && !this.sender.isHasSendError()) {
+ return;
+ }
+ this.sender.setHasSendError(false);
+ this.currentIpPorts = ipPortList;
+ int ipSize = ipPortList.size();
+ int needNewSize = 0;
+ if (this.maxConnectChannels == ALL_CONNECT_CHANNEL || this.maxConnectChannels >= ipSize) {
+ needNewSize = ipSize;
+ } else {
+ needNewSize = maxConnectChannels;
+ }
+ HashSet<String> updateConfigIpLists = new HashSet<>();
+ List<String> availableIpLists = new ArrayList<String>();
+ availableIpLists.addAll(ipPortList);
+ for (int i = 0; i < needNewSize; i++) {
+ int availableIpSize = availableIpLists.size();
+ int newIpPortIndex = this.sRandom.nextInt(availableIpSize);
+ String ipPort = availableIpLists.remove(newIpPortIndex);
+ updateConfigIpLists.add(ipPort);
+ }
+ if (updateConfigIpLists.size() > 0) {
+ this.sender.updateConfig(updateConfigIpLists);
+ }
+ }
+
+ /**
+ * next requestid
+ *
+ * @return
+ */
+ public Long nextRequestId() {
+ Long requestId = requestIdSeq.getAndIncrement();
+ if (requestId > MAX_REQUEST_ID) {
+ requestId = 0L;
+ requestIdSeq.set(requestId);
+ }
+ return requestId;
+ }
+
+ /**
+ * send data
+ *
+ * @param sdkTime
+ * @param baseCommand
+ */
+ public void send(long sdkTime, AuditApi.BaseCommand baseCommand) {
+ AuditData data = new AuditData(sdkTime, baseCommand);
+ // Cache first
+ this.dataMap.putIfAbsent(baseCommand.getAuditRequest().getRequestId(), data);
+ this.sendData(data);
+ }
+
+ /**
+ * send data
+ *
+ * @param data
+ */
+ private void sendData(AuditData data) {
+ ChannelBuffer dataBuf = ChannelBuffers.wrappedBuffer(data.getDataByte());
+ SenderResult result = this.sender.send(dataBuf);
+ if (!result.result) {
+ this.sender.setHasSendError(true);
+ }
+ }
+
+ /**
+ * Clean up the backlog of unsent message packets
+ */
+ public void clearBuffer() {
+ for (AuditData data : this.dataMap.values()) {
+ this.sendData(data);
+ }
+ }
+
+ /**
+ * get data map szie
+ */
+ public int getDataMapSize() {
+ return this.dataMap.size();
+ }
+
+ /**
+ * processing return package
+ *
+ * @param ctx
+ * @param e
+ */
+ public void onMessageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ try {
+ //Analyze abnormal events
+ if (!(e.getMessage() instanceof ChannelBuffer)) {
+ logger.error("onMessageReceived e.getMessage:" + e.getMessage());
+ return;
+ }
+ ChannelBuffer readBuffer = (ChannelBuffer) e.getMessage();
+ byte[] readBytes = readBuffer.toByteBuffer().array();
+ AuditApi.BaseCommand baseCommand = AuditApi.BaseCommand.parseFrom(readBytes);
+ // Parse request id
+ Long requestId = baseCommand.getAuditReply().getRequestId();
+ AuditData data = this.dataMap.get(requestId);
+ if (data == null) {
+ logger.error("can not find the requestid onMessageReceived:" + requestId);
+ return;
+ }
+ if (AuditApi.AuditReply.RSP_CODE.SUCCESS.equals(baseCommand.getAuditReply().getRspCode())) {
+ this.dataMap.remove(requestId);
+ this.sender.notifyAll();
+ return;
+ }
+ int resendTimes = data.increaseResendTimes();
+ if (resendTimes < org.apache.inlong.audit.send.SenderGroup.MAX_SEND_TIMES) {
+ this.sendData(data);
+ }
+ this.sender.notifyAll();
+ } catch (Throwable ex) {
+ logger.error(ex.getMessage());
+ this.sender.setHasSendError(true);
+ }
+ }
+
+ /**
+ * Handle the packet return exception
+ *
+ * @param ctx
+ * @param e
+ */
+ public void onExceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ logger.error(e.getCause().getMessage());
+ try {
+ this.sender.setHasSendError(true);
+ } catch (Throwable ex) {
+ logger.error(ex.getMessage());
+ }
+ }
+}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java
new file mode 100644
index 0000000..03c101e
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+import org.apache.inlong.audit.protocol.AuditApi;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class AuditData {
+ public static int HEAD_LENGTH = 4;
+ private final long sdkTime;
+ private final AuditApi.BaseCommand content;
+ private AtomicInteger resendTimes = new AtomicInteger(0);
+
+ /**
+ * Constructor
+ *
+ * @param sdkTime
+ * @param content
+ */
+ public AuditData(long sdkTime, AuditApi.BaseCommand content) {
+ this.sdkTime = sdkTime;
+ this.content = content;
+ }
+
+ /**
+ * set resendTimes
+ */
+ public int increaseResendTimes() {
+ return this.resendTimes.incrementAndGet();
+ }
+
+ /**
+ * getDataByte
+ *
+ * @return
+ */
+ public byte[] getDataByte() {
+ return addBytes(ByteBuffer.allocate(HEAD_LENGTH).putInt(content.toByteArray().length).array(),
+ content.toByteArray());
+ }
+
+ /**
+ * Concatenated byte array
+ *
+ * @param data1
+ * @param data2
+ * @return data1 and data2 combined package result
+ */
+ public byte[] addBytes(byte[] data1, byte[] data2) {
+ byte[] data3 = new byte[data1.length + data2.length];
+ System.arraycopy(data1, 0, data3, 0, data1.length);
+ System.arraycopy(data2, 0, data3, data1.length, data2.length);
+ return data3;
+ }
+}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java
new file mode 100644
index 0000000..f7ce314
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+
+public class Config {
+ private static final Logger logger = LoggerFactory.getLogger(Config.class);
+ private String localIP = "";
+ private String dockerId = "";
+
+ public void init() {
+ initIP();
+ initDockerId();
+ }
+
+ public String getLocalIP() {
+ return localIP;
+ }
+
+ public String getDockerId() {
+ return dockerId;
+ }
+
+ private void initIP() {
+ try {
+ for (Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces(); en.hasMoreElements(); ) {
+ NetworkInterface intf = en.nextElement();
+ String name = intf.getName();
+ if (!name.contains("docker") && !name.contains("lo")) {
+ for (Enumeration<InetAddress> enumIpAddr = intf.getInetAddresses();
+ enumIpAddr.hasMoreElements(); ) {
+ InetAddress inetAddress = enumIpAddr.nextElement();
+ if (!inetAddress.isLoopbackAddress()) {
+ String ipaddress = inetAddress.getHostAddress();
+ if (!ipaddress.contains("::") && !ipaddress.contains("0:0:")
+ && !ipaddress.contains("fe80")) {
+ localIP = ipaddress;
+ }
+ }
+ }
+ }
+ }
+ } catch (SocketException ex) {
+ localIP = "127.0.0.1";
+ return;
+ }
+ }
+
+ private void initDockerId() {
+ BufferedReader in = null;
+ try {
+ File file = new File("/proc/self/cgroup");
+ if (file.exists() == false) {
+ return;
+ }
+ in = new BufferedReader(new FileReader("/proc/self/cgroup"));
+ String dockerID = in.readLine();
+ if (dockerID.equals("") == false) {
+ int n = dockerID.indexOf("/");
+ String dockerID2 = dockerID.substring(n + 1, (dockerID.length() - n - 1));
+ n = dockerID2.indexOf("/");
+ dockerId = dockerID2.substring(n + 1, 12);
+ }
+ } catch (IOException e) {
+ logger.error(e.getMessage());
+ return;
+ } catch (NullPointerException e2) {
+ logger.error(e2.getMessage());
+ return;
+ } catch (Exception e3) {
+ logger.error(e3.getMessage());
+ return;
+ } finally {
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException e4) {
+ logger.error(e4.getMessage());
+ }
+ }
+ }
+ }
+}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Decoder.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Decoder.java
new file mode 100644
index 0000000..01ea67b
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Decoder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.buffer.DynamicChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+public class Decoder extends FrameDecoder {
+ // Maximum return packet size
+ private static final int MAX_RESPONSE_LENGTH = 8 * 1024 * 1024;
+
+ /**
+ * decoding
+ */
+ @Override
+ protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) {
+ // Every time you need to read the complete package (that is, read to the end of the package),
+ // otherwise only the first one will be parsed correctly,
+ // which will adversely affect the parsing of the subsequent package
+ buffer.markReaderIndex();
+ //Packet composition: 4 bytes length content + ProtocolBuffer content
+ int totalLen = buffer.readInt();
+ // Respond to abnormal channel, interrupt in time to avoid stuck
+ if (totalLen > MAX_RESPONSE_LENGTH) {
+ channel.close();
+ return null;
+ }
+ // If the package is not complete, continue to wait for the return package
+ if (buffer.readableBytes() < totalLen) {
+ buffer.resetReaderIndex();
+ return null;
+ }
+ ChannelBuffer returnBuffer = new DynamicChannelBuffer(ChannelBuffers.BIG_ENDIAN, totalLen);
+ buffer.readBytes(returnBuffer, 0, totalLen);
+ return returnBuffer;
+ }
+}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Encoder.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Encoder.java
new file mode 100644
index 0000000..abb3270
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Encoder.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+public class Encoder extends OneToOneEncoder {
+
+ /**
+ * Encoder
+ */
+ @Override
+ protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) {
+ if (!(msg instanceof ChannelBuffer)) {
+ return msg;
+ } else {
+ ChannelBuffer src = (ChannelBuffer) msg;
+ return src;
+ }
+ }
+}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/IpPort.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/IpPort.java
new file mode 100644
index 0000000..fa4ba5e
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/IpPort.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+import org.apache.commons.lang3.math.NumberUtils;
+import org.jboss.netty.channel.Channel;
+
+import java.net.InetSocketAddress;
+
+public class IpPort {
+ public static final String SEPARATOR = ":";
+ public final String ip;
+ public final int port;
+ public final String key;
+ public final InetSocketAddress addr;
+
+ /**
+ * Constructor
+ *
+ * @param ip
+ * @param port
+ */
+ public IpPort(String ip, int port) {
+ this.ip = ip;
+ this.port = port;
+ this.key = getIpPortKey(ip, port);
+ this.addr = new InetSocketAddress(ip, port);
+ }
+
+ /**
+ * Constructor
+ *
+ * @param addr
+ */
+ public IpPort(InetSocketAddress addr) {
+ this.ip = addr.getHostName();
+ this.port = addr.getPort();
+ this.key = getIpPortKey(ip, port);
+ this.addr = addr;
+ }
+
+ /**
+ * get IpPort by key
+ *
+ * @param ip
+ * @param port
+ * @return
+ */
+ public static String getIpPortKey(String ip, int port) {
+ return ip + ":" + port;
+ }
+
+ /**
+ * parse sIpPort
+ *
+ * @param ipPort
+ * @return
+ */
+ public static IpPort parseIpPort(String ipPort) {
+ String[] splits = ipPort.split(SEPARATOR);
+ if (splits.length == 2) {
+ String strIp = splits[0];
+ String strPort = splits[1];
+ int port = NumberUtils.toInt(strPort, 0);
+ if (port > 0) {
+ return new IpPort(strIp, port);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * parse InetSocketAddress
+ *
+ * @param channel
+ * @return
+ */
+ public static InetSocketAddress parseInetSocketAddress(Channel channel) {
+ InetSocketAddress destAddr = null;
+ if (channel.getRemoteAddress() instanceof InetSocketAddress) {
+ destAddr = (InetSocketAddress) channel.getRemoteAddress();
+ } else {
+ String sendIp = channel.getRemoteAddress().toString();
+ destAddr = new InetSocketAddress(sendIp, 0);
+ }
+ return destAddr;
+ }
+
+ /**
+ * hashCode
+ */
+ @Override
+ public int hashCode() {
+ int result = ip.hashCode();
+ result = 31 * result + port;
+ return result;
+ }
+
+ /**
+ * equals
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ if (!(o instanceof IpPort)) {
+ return false;
+ }
+
+ try {
+ IpPort ctp = (IpPort) o;
+ if (ip != null && ip.equals(ctp.port) && port == ctp.port) {
+ return true;
+ }
+ } catch (Exception e) {
+ return false;
+ }
+ return false;
+ }
+
+ /**
+ * toString
+ */
+ public String toString() {
+ return key;
+ }
+}
+
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
new file mode 100644
index 0000000..69dcaf8
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+public class SenderResult {
+
+ public final IpPort ipPort;
+ public boolean result;
+
+ /**
+ * Constructor
+ *
+ * @param ipPort
+ * @param result
+ */
+ public SenderResult(IpPort ipPort, boolean result) {
+ this.ipPort = ipPort;
+ this.result = result;
+ }
+
+ /**
+ * Constructor
+ *
+ * @param sendIp
+ * @param sendPort
+ * @param result
+ */
+ public SenderResult(String sendIp, int sendPort, boolean result) {
+ this.ipPort = new IpPort(sendIp, sendPort);
+ this.result = result;
+ }
+}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/StatInfo.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/StatInfo.java
new file mode 100644
index 0000000..aecc22c
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/StatInfo.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class StatInfo {
+ public AtomicLong count = new AtomicLong(0);
+ public AtomicLong size = new AtomicLong(0);
+ public AtomicLong delay = new AtomicLong(0);
+
+ public StatInfo(long cnt, long sz, long dy) {
+ count.set(cnt);
+ size.set(sz);
+ delay.set(dy);
+ }
+}
diff --git a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/AuditImpTest.java b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/AuditImpTest.java
new file mode 100644
index 0000000..9d07583
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/AuditImpTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit;
+
+import org.junit.Test;
+
+import java.util.HashSet;
+
+public class AuditImpTest {
+
+ @Test
+ public void setAuditProxy() {
+ HashSet<String> ipList = new HashSet<>();
+ ipList.add("0.0.0.0:11222");
+ AuditImp.getInstance().setAuditProxy(ipList);
+ }
+
+ @Test
+ public void add() {
+ HashSet<String> ipList = new HashSet<>();
+ ipList.add("0.0.0.0:11222");
+ AuditImp.getInstance().setAuditProxy(ipList);
+ AuditImp.getInstance().add(1, "inlongGroupIDTest",
+ "inlongStreamIDTest", System.currentTimeMillis(), 1, 1);
+ }
+
+}
+
diff --git a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderChannelTest.java b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderChannelTest.java
new file mode 100644
index 0000000..c883e6d
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderChannelTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.send;
+
+import org.apache.inlong.audit.util.Encoder;
+import org.apache.inlong.audit.util.IpPort;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SenderChannelTest {
+ private static final Logger logger = LoggerFactory.getLogger(SenderChannelTest.class);
+ private ClientBootstrap client = new ClientBootstrap();
+ private IpPort ipPortObj = new IpPort("0.0.0.0", 54041);
+ private ChannelFuture future;
+ SenderChannel senderChannel;
+
+ /**
+ * test SenderChannel
+ */
+ public SenderChannelTest() {
+ try {
+ client.setFactory(new NioClientSocketChannelFactory(
+ Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool(),
+ 10));
+
+ client.setPipelineFactory(() -> {
+ ChannelPipeline pipeline = Channels.pipeline();
+ pipeline.addLast("encoder", new Encoder());
+ return pipeline;
+ });
+ client.setOption("tcpNoDelay", true);
+ client.setOption("child.tcpNoDelay", true);
+ client.setOption("keepAlive", true);
+ client.setOption("child.keepAlive", true);
+ client.setOption("reuseAddr", true);
+
+ future = client.connect(ipPortObj.addr).await();
+ senderChannel = new SenderChannel(future.getChannel(), ipPortObj, 10);
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage());
+ }
+ }
+
+ @Test
+ public void tryAcquire() {
+ boolean ret = senderChannel.tryAcquire();
+ assertTrue(ret);
+ }
+
+ @Test
+ public void release() {
+ senderChannel.release();
+ }
+
+ @Test
+ public void testToString() {
+ IpPort ipPort = senderChannel.getIpPort();
+ assertEquals(ipPort, ipPortObj);
+ }
+
+ @Test
+ public void getIpPort() {
+ String toString = senderChannel.toString();
+ assertEquals(toString, "0.0.0.0:54041");
+ }
+
+ @Test
+ public void getChannel() {
+ Channel channel = senderChannel.getChannel();
+ assertFalse(channel.getRemoteAddress().toString().equals("0.0.0.0:54041"));
+ }
+}
\ No newline at end of file
diff --git a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderGroupTest.java b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderGroupTest.java
new file mode 100644
index 0000000..d69a26c
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderGroupTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.send;
+
+import org.apache.inlong.audit.protocol.AuditApi;
+import org.apache.inlong.audit.util.AuditData;
+import org.apache.inlong.audit.util.Config;
+import org.apache.inlong.audit.util.Decoder;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.junit.Test;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SenderGroupTest {
+ Config testConfig = new Config();
+ SenderManager testManager = new SenderManager(testConfig);
+ SenderHandler clientHandler = new org.apache.inlong.audit.send.SenderHandler(testManager);
+ SenderGroup sender = new org.apache.inlong.audit.send.SenderGroup(10, new Decoder(), clientHandler);
+
+ @Test
+ public void send() {
+ AuditApi.AuditMessageHeader header = AuditApi.AuditMessageHeader.newBuilder().setIp("127.0.0.1").build();
+ AuditApi.AuditMessageBody body = AuditApi.AuditMessageBody.newBuilder().setAuditId("1").build();
+ AuditApi.AuditRequest request = AuditApi.AuditRequest.newBuilder().setMsgHeader(header)
+ .addMsgBody(body).build();
+ AuditApi.BaseCommand baseCommand = AuditApi.BaseCommand.newBuilder().setAuditRequest(request).build();
+ AuditData testData = new AuditData(System.currentTimeMillis(), baseCommand);
+ ChannelBuffer dataBuf = ChannelBuffers.wrappedBuffer(testData.getDataByte());
+ sender.send(dataBuf);
+ }
+
+ @Test
+ public void release() {
+ sender.release("127.0.9.1:80");
+ }
+
+ @Test
+ public void updateConfig() {
+ Set<String> ipLists = new LinkedHashSet<>();
+ ipLists.add("127.0.9.1:80");
+ ipLists.add("127.0.9.1:81");
+ ipLists.add("127.0.9.1:82");
+ sender.updateConfig(ipLists);
+ }
+
+ @Test
+ public void isHasSendError() {
+ sender.setHasSendError(false);
+ boolean isError = sender.isHasSendError();
+ assertFalse(isError);
+ sender.setHasSendError(true);
+ isError = sender.isHasSendError();
+ assertTrue(isError);
+ }
+
+ @Test
+ public void setHasSendError() {
+ sender.setHasSendError(false);
+ boolean isError = sender.isHasSendError();
+ assertFalse(isError);
+ sender.setHasSendError(true);
+ isError = sender.isHasSendError();
+ assertTrue(isError);
+ }
+}
\ No newline at end of file
diff --git a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderManagerTest.java b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderManagerTest.java
new file mode 100644
index 0000000..696bcc5
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderManagerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.send;
+
+import org.apache.inlong.audit.protocol.AuditApi;
+import org.apache.inlong.audit.util.Config;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+;
+
+public class SenderManagerTest {
+ private Config testConfig = new Config();
+
+ @Test
+ public void nextRequestId() {
+ SenderManager testManager = new SenderManager(testConfig);
+ Long requestId = testManager.nextRequestId();
+ System.out.println(requestId);
+ assertTrue(requestId == 0);
+
+ requestId = testManager.nextRequestId();
+ assertTrue(requestId == 1);
+
+ requestId = testManager.nextRequestId();
+ assertTrue(requestId == 2);
+ }
+
+ @Test
+ public void send() {
+ AuditApi.AuditMessageHeader header = AuditApi.AuditMessageHeader.newBuilder().setIp("127.0.0.1").build();
+ AuditApi.AuditMessageBody body = AuditApi.AuditMessageBody.newBuilder().setAuditId("1").build();
+ AuditApi.AuditRequest request = AuditApi.AuditRequest.newBuilder().setMsgHeader(header)
+ .addMsgBody(body).build();
+ AuditApi.BaseCommand baseCommand = AuditApi.BaseCommand.newBuilder().setAuditRequest(request).build();
+ SenderManager testManager = new SenderManager(testConfig);
+ testManager.send(System.currentTimeMillis(), baseCommand);
+ }
+
+ @Test
+ public void clearBuffer() {
+ SenderManager testManager = new SenderManager(testConfig);
+ testManager.clearBuffer();
+ int dataMapSize = testManager.getDataMapSize();
+ assertTrue(dataMapSize == 0);
+ }
+}
\ No newline at end of file
diff --git a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java
new file mode 100644
index 0000000..1d4d688
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+import org.apache.inlong.audit.protocol.AuditApi;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class AuditDataTest {
+ @Test
+ public void increaseResendTimes() {
+ AuditApi.BaseCommand content = null;
+ AuditData test = new AuditData(System.currentTimeMillis(), content);
+ int resendTimes = test.increaseResendTimes();
+ assertTrue(resendTimes == 1);
+ resendTimes = test.increaseResendTimes();
+ assertTrue(resendTimes == 2);
+ resendTimes = test.increaseResendTimes();
+ assertTrue(resendTimes == 3);
+ }
+
+ @Test
+ public void getDataByte() {
+ AuditApi.AuditMessageHeader header = AuditApi.AuditMessageHeader.newBuilder().setIp("127.0.0.1").build();
+ AuditApi.AuditMessageBody body = AuditApi.AuditMessageBody.newBuilder().setAuditId("1").build();
+ AuditApi.AuditRequest request = AuditApi.AuditRequest.newBuilder().setMsgHeader(header)
+ .addMsgBody(body).build();
+ AuditApi.BaseCommand baseCommand = AuditApi.BaseCommand.newBuilder().setAuditRequest(request).build();
+ AuditData test = new AuditData(System.currentTimeMillis(), baseCommand);
+ byte[] data = test.getDataByte();
+ assertTrue(data.length > 0);
+ }
+
+ @Test
+ public void addBytes() {
+ }
+}
\ No newline at end of file
diff --git a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/ConfigTest.java b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/ConfigTest.java
new file mode 100644
index 0000000..f2827d3
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/ConfigTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class ConfigTest {
+
+ @Test
+ public void getLocalIP() {
+ Config test = new Config();
+ test.init();
+ String ip = test.getLocalIP();
+ assertTrue(ip.length() >= 0);
+ }
+
+ @Test
+ public void getDockerId() {
+ Config test = new Config();
+ test.init();
+ String dockerId = test.getDockerId();
+ assertTrue(dockerId.length() >= 0);
+ }
+}
\ No newline at end of file
diff --git a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/IpPortTest.java b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/IpPortTest.java
new file mode 100644
index 0000000..cdb8fd4
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/IpPortTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class IpPortTest {
+ private IpPort test = new IpPort("127.0.0.1", 80);
+
+ @Test
+ public void getIpPortKey() {
+ String ipPortKey = test.getIpPortKey("127.0.0.1", 80);
+ assertTrue(ipPortKey.equals("127.0.0.1:80"));
+ }
+
+ @Test
+ public void testHashCode() {
+ int hashCode = test.hashCode();
+ assertTrue(hashCode != 0);
+ }
+
+ @Test
+ public void testEquals() {
+ IpPort test1 = new IpPort("127.0.0.1", 81);
+ boolean ret = test.equals(test1);
+ assertFalse(ret);
+
+ IpPort test2 = new IpPort("127.0.0.1", 80);
+ ret = test.toString().equals(test2.toString());
+ assertTrue(ret);
+
+ IpPort test3 = test;
+ ret = test.equals(test3);
+ assertTrue(ret);
+ }
+
+ @Test
+ public void parseIpPort() {
+ IpPort testIpPort = test.parseIpPort("127.0.0.1:83");
+ System.out.println(testIpPort);
+ }
+
+ @Test
+ public void testToString() {
+ String toSteing = test.toString();
+ System.out.println(toSteing);
+ }
+}
\ No newline at end of file
diff --git a/inlong-audit/pom.xml b/inlong-audit/pom.xml
index a1904b5..b5b9e9b 100644
--- a/inlong-audit/pom.xml
+++ b/inlong-audit/pom.xml
@@ -38,6 +38,7 @@
<module>audit-docker</module>
<module>audit-store</module>
<module>audit-common</module>
+ <module>audit-sdk</module>
</modules>
<properties>