You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/12/14 02:17:59 UTC

[incubator-inlong] branch master updated: [INLONG-1734][feature][audit] audit-sdk module (#1968)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new dd796fc  [INLONG-1734][feature][audit] audit-sdk module (#1968)
dd796fc is described below

commit dd796fca09738972d56f9ae41a7b6f9499037ec9
Author: doleyzi <43...@users.noreply.github.com>
AuthorDate: Tue Dec 14 10:17:50 2021 +0800

    [INLONG-1734][feature][audit] audit-sdk module (#1968)
---
 inlong-audit/audit-sdk/README.md                   |  31 +++
 inlong-audit/audit-sdk/pom.xml                     |  73 ++++++
 .../java/org/apache/inlong/audit/AuditImp.java     | 250 ++++++++++++++++++
 .../apache/inlong/audit/send/SenderChannel.java    | 103 ++++++++
 .../org/apache/inlong/audit/send/SenderGroup.java  | 283 +++++++++++++++++++++
 .../apache/inlong/audit/send/SenderHandler.java    |  88 +++++++
 .../apache/inlong/audit/send/SenderManager.java    | 224 ++++++++++++++++
 .../org/apache/inlong/audit/util/AuditData.java    |  72 ++++++
 .../java/org/apache/inlong/audit/util/Config.java  | 109 ++++++++
 .../java/org/apache/inlong/audit/util/Decoder.java |  56 ++++
 .../java/org/apache/inlong/audit/util/Encoder.java |  39 +++
 .../java/org/apache/inlong/audit/util/IpPort.java  | 148 +++++++++++
 .../org/apache/inlong/audit/util/SenderResult.java |  47 ++++
 .../org/apache/inlong/audit/util/StatInfo.java     |  32 +++
 .../java/org/apache/inlong/audit/AuditImpTest.java |  43 ++++
 .../inlong/audit/send/SenderChannelTest.java       | 101 ++++++++
 .../apache/inlong/audit/send/SenderGroupTest.java  |  85 +++++++
 .../inlong/audit/send/SenderManagerTest.java       |  63 +++++
 .../apache/inlong/audit/util/AuditDataTest.java    |  53 ++++
 .../org/apache/inlong/audit/util/ConfigTest.java   |  41 +++
 .../org/apache/inlong/audit/util/IpPortTest.java   |  66 +++++
 inlong-audit/pom.xml                               |   1 +
 22 files changed, 2008 insertions(+)

diff --git a/inlong-audit/audit-sdk/README.md b/inlong-audit/audit-sdk/README.md
new file mode 100644
index 0000000..57ab063
--- /dev/null
+++ b/inlong-audit/audit-sdk/README.md
@@ -0,0 +1,31 @@
+ # Description
+## overview
+The audit sdk is used to count the receiving and sending volume of each module in real time according to the cycle, 
+and the statistical results are sent to the audit access layer according to the cycle.
+
+## features
+### data uniqueness
+The audit sdk will add a unique mark to each audit audit, which can be used to remove duplicates.
+
+### unified audit standard
+The audit sdk uses log production time as the audit standard, 
+which can ensure that each module is reconciled in accordance with the unified audit standard.
+
+## usage
+### setAuditProxy
+Set the audit access layer ip:port list. The audit sdk will summarize the results according to the cycle 
+and send them to the ip:port list set by the interface.
+If the ip:port of the audit access layer is fixed, then this interface needs to be called once. 
+If the audit access changes in real time, then the business program needs to call this interface periodically to update
+```java
+    HashSet<String> ipPortList=new HashSet<>();
+    ipPortList.add("0.0.0.0:54041");
+    AuditImp.getInstance().setAuditProxy(ipPortList);
+```
+
+### add
+Call the add method for statistics, where the auditID parameter uniquely identifies an audit object,
+inlongGroupID,inlongStreamID,logTime are audit dimensions, count is the number of items, size is the size, and logTime is milliseconds.
+```java
+    AuditImp.getInstance().add(1, "inlongGroupIDTest","inlongStreamIDTest", System.currentTimeMillis(), 1, 1);
+```
\ No newline at end of file
diff --git a/inlong-audit/audit-sdk/pom.xml b/inlong-audit/audit-sdk/pom.xml
new file mode 100644
index 0000000..2657bd6
--- /dev/null
+++ b/inlong-audit/audit-sdk/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>inlong-audit</artifactId>
+        <version>0.12.0-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>audit-sdk</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache InLong - Audit Sdk</name>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <compiler.source>1.8</compiler.source>
+        <compiler.target>1.8</compiler.target>
+        <netty.version>3.8.0.Final</netty.version>
+        <junit.version>4.4</junit.version>
+        <protobuf.version>3.19.1</protobuf.version>
+        <commons.version>3.0</commons.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+            <version>${netty.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>audit-common</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditImp.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditImp.java
new file mode 100644
index 0000000..872cc78
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditImp.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit;
+
+import org.apache.inlong.audit.protocol.AuditApi;
+import org.apache.inlong.audit.send.SenderManager;
+import org.apache.inlong.audit.util.Config;
+import org.apache.inlong.audit.util.StatInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDITREQUEST;
+
+public class AuditImp {
+    private static final Logger logger = LoggerFactory.getLogger(AuditImp.class);
+    private static AuditImp auditImp = new AuditImp();
+    private static final String FIELD_SEPARATORS = ":";
+    private ConcurrentHashMap<String, StatInfo> countMap = new ConcurrentHashMap<String, StatInfo>();
+    private HashMap<String, StatInfo> threadSumMap = new HashMap<String, StatInfo>();
+    private ConcurrentHashMap<String, StatInfo> deleteCountMap = new ConcurrentHashMap<String, StatInfo>();
+    private List<String> deleteKeyList = new ArrayList<String>();
+    private Config config = new Config();
+    private Long sdkTime;
+    private int packageId = 1;
+    private int dataId = 0;
+    private static final int BATCH_NUM = 100;
+    boolean inited = false;
+    private SenderManager manager;
+    private static ReentrantLock globalLock = new ReentrantLock();
+    private static int PERIOD = 1000 * 60;
+    private Timer timer = new Timer();
+    private TimerTask timerTask = new TimerTask() {
+        @Override
+        public void run() {
+            try {
+                sendReport();
+            } catch (Exception e) {
+                logger.error(e.getMessage());
+            }
+        }
+    };
+
+    public static AuditImp getInstance() {
+        return auditImp;
+    }
+
+    /**
+     * init
+     */
+    private void init() {
+        if (inited) {
+            return;
+        }
+        config.init();
+        timer.schedule(timerTask, PERIOD, PERIOD);
+        this.manager = new SenderManager(config);
+    }
+
+    /**
+     * setAuditProxy
+     *
+     * @param ipPortList
+     */
+    public void setAuditProxy(HashSet<String> ipPortList) {
+        try {
+            globalLock.lockInterruptibly();
+            if (!inited) {
+                init();
+                inited = true;
+            }
+            this.manager.setAuditProxy(ipPortList);
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage());
+        } finally {
+            globalLock.unlock();
+        }
+    }
+
+    /**
+     * api
+     *
+     * @param auditID
+     * @param inlongGroupID
+     * @param inlongStreamID
+     * @param logTime
+     * @param count
+     * @param size
+     */
+    public void add(int auditID, String inlongGroupID, String inlongStreamID, Long logTime, long count, long size) {
+        long delayTime = System.currentTimeMillis() - logTime;
+        String key = (logTime / PERIOD) + FIELD_SEPARATORS + inlongGroupID + FIELD_SEPARATORS
+                + inlongStreamID + FIELD_SEPARATORS + auditID;
+        addByKey(key, count, size, delayTime);
+    }
+
+    /**
+     * add by key
+     *
+     * @param key
+     * @param count
+     * @param size
+     * @param delayTime
+     */
+    private void addByKey(String key, long count, long size, long delayTime) {
+        try {
+            if (countMap.get(key) == null) {
+                countMap.put(key, new StatInfo(0L, 0L, 0L));
+            }
+            countMap.get(key).count.addAndGet(count);
+            countMap.get(key).size.addAndGet(size);
+            countMap.get(key).delay.addAndGet(delayTime * count);
+        } catch (Exception e) {
+            return;
+        }
+    }
+
+    /**
+     * Report audit data
+     */
+    private synchronized void sendReport() {
+        manager.clearBuffer();
+        resetStat();
+        // Retrieve statistics from the list of objects without statistics to be eliminated
+        for (Map.Entry<String, StatInfo> entry : this.deleteCountMap.entrySet()) {
+            this.sumThreadGroup(entry.getKey(), entry.getValue());
+        }
+        this.deleteCountMap.clear();
+        for (Map.Entry<String, StatInfo> entry : countMap.entrySet()) {
+            String key = entry.getKey();
+            StatInfo value = entry.getValue();
+            // If there is no data, enter the list to be eliminated
+            if (value.count.get() == 0) {
+                this.deleteKeyList.add(key);
+                continue;
+            }
+            this.sumThreadGroup(key, value);
+        }
+
+        // Clean up obsolete statistical data objects
+        for (String key : this.deleteKeyList) {
+            StatInfo value = this.countMap.remove(key);
+            this.deleteCountMap.put(key, value);
+        }
+        this.deleteKeyList.clear();
+        sdkTime = Calendar.getInstance().getTimeInMillis();
+        AuditApi.AuditMessageHeader mssageHeader = AuditApi.AuditMessageHeader.newBuilder()
+                .setIp(config.getLocalIP()).setDockerId(config.getDockerId())
+                .setThreadId(String.valueOf(Thread.currentThread().getId()))
+                .setSdkTs(sdkTime).setPacketId(packageId)
+                .build();
+        AuditApi.AuditRequest.Builder requestBulid = AuditApi.AuditRequest.newBuilder();
+        requestBulid.setMsgHeader(mssageHeader).setRequestId(manager.nextRequestId());
+        for (Map.Entry<String, StatInfo> entry : threadSumMap.entrySet()) {
+            String[] keyArray = entry.getKey().split(FIELD_SEPARATORS);
+            long logTime = Long.parseLong(keyArray[0]) * 60;
+            String inlongGroupID = keyArray[1];
+            String inlongStreamID = keyArray[2];
+            String auditID = keyArray[3];
+            StatInfo value = entry.getValue();
+            AuditApi.AuditMessageBody mssageBody = AuditApi.AuditMessageBody.newBuilder()
+                    .setLogTs(logTime).setInlongGroupId(inlongGroupID)
+                    .setInlongStreamId(inlongStreamID).setAuditId(auditID)
+                    .setCount(value.count.get()).setSize(value.size.get())
+                    .setDelay(value.delay.get())
+                    .build();
+            requestBulid.addMsgBody(mssageBody);
+            if (dataId++ >= BATCH_NUM) {
+                dataId = 0;
+                packageId++;
+                sendByBaseCommand(sdkTime, requestBulid.build());
+                requestBulid.clearMsgBody();
+            }
+        }
+        if (requestBulid.getMsgBodyCount() > 0) {
+            sendByBaseCommand(sdkTime, requestBulid.build());
+            requestBulid.clearMsgBody();
+        }
+        threadSumMap.clear();
+        logger.info("finished send report.");
+    }
+
+    /**
+     * send base command
+     *
+     * @param sdkTime
+     * @param auditRequest
+     */
+    private void sendByBaseCommand(long sdkTime, AuditApi.AuditRequest auditRequest) {
+        AuditApi.BaseCommand.Builder baseCommand = AuditApi.BaseCommand.newBuilder();
+        baseCommand.setType(AUDITREQUEST).setAuditRequest(auditRequest).build();
+        manager.send(sdkTime, baseCommand.build());
+    }
+
+    /**
+     * Summary
+     *
+     * @param key
+     * @param statInfo
+     */
+    private void sumThreadGroup(String key, StatInfo statInfo) {
+        long count = statInfo.count.getAndSet(0);
+        if (0 == count) {
+            return;
+        }
+        if (threadSumMap.get(key) == null) {
+            threadSumMap.put(key, new StatInfo(0, 0, 0));
+        }
+
+        long size = statInfo.size.getAndSet(0);
+        long delay = statInfo.delay.getAndSet(0);
+
+        threadSumMap.get(key).count.addAndGet(count);
+        threadSumMap.get(key).size.addAndGet(size);
+        threadSumMap.get(key).delay.addAndGet(delay);
+    }
+
+    /**
+     * Reset statistics
+     */
+    private void resetStat() {
+        dataId = 0;
+        packageId = 1;
+    }
+}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
new file mode 100644
index 0000000..1ecbd4f
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.send;
+
+import org.apache.inlong.audit.util.IpPort;
+import org.jboss.netty.channel.Channel;
+
+import java.util.concurrent.Semaphore;
+
+public class SenderChannel {
+
+    private IpPort ipPort;
+    private Channel channel;
+    private Semaphore packToken;
+
+    /**
+     * Constructor
+     *
+     * @param channel
+     * @param ipPort
+     */
+    public SenderChannel(Channel channel, IpPort ipPort, int maxSynchRequest) {
+        this.channel = channel;
+        this.ipPort = ipPort;
+        this.packToken = new Semaphore(maxSynchRequest);
+        this.channel.getConfig().setConnectTimeoutMillis(5000);
+    }
+
+    /**
+     * Try acquire channel
+     *
+     * @return
+     */
+    public boolean tryAcquire() {
+        return packToken.tryAcquire();
+    }
+
+    /**
+     * release channel
+     */
+    public void release() {
+        packToken.release();
+    }
+
+    /**
+     * toString
+     */
+    @Override
+    public String toString() {
+        return ipPort.key;
+    }
+
+    /**
+     * get ipPort
+     *
+     * @return the ipPort
+     */
+    public IpPort getIpPort() {
+        return ipPort;
+    }
+
+    /**
+     * set ipPort
+     *
+     * @param ipPort the ipPort to set
+     */
+    public void setIpPort(IpPort ipPort) {
+        this.ipPort = ipPort;
+    }
+
+    /**
+     * get channel
+     *
+     * @return the channel
+     */
+    public Channel getChannel() {
+        return channel;
+    }
+
+    /**
+     * set channel
+     *
+     * @param channel the channel to set
+     */
+    public void setChannel(Channel channel) {
+        this.channel = channel;
+    }
+}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
new file mode 100644
index 0000000..fab58e8
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.send;
+
+import org.apache.inlong.audit.util.Encoder;
+import org.apache.inlong.audit.util.IpPort;
+import org.apache.inlong.audit.util.SenderResult;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelUpstreamHandler;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class SenderGroup {
+    private static final Logger logger = LoggerFactory.getLogger(SenderGroup.class);
+    // maximum number of sending
+    public static final int MAX_SEND_TIMES = 3;
+    public static final int DEFAULT_WAIT_TIMES = 10000;
+    public static final int WAIT_INTERVAL = 1;
+    public static final int DEFAULT_SYNCH_REQUESTS = 1;
+
+    private ClientBootstrap client = new ClientBootstrap();
+    private List<LinkedBlockingQueue<SenderChannel>> channelGroups = new ArrayList<>();
+    private int mIndex = 0;
+    private List<SenderChannel> deleteChannels = new ArrayList<>();
+    private ConcurrentHashMap<String, SenderChannel> totalChannels = new ConcurrentHashMap<>();
+
+    private int senderThreadNum;
+    private int waitChannelTimes = DEFAULT_WAIT_TIMES;
+    private int waitChannelIntervalMs = WAIT_INTERVAL;
+    private int maxSynchRequest = DEFAULT_SYNCH_REQUESTS;
+    private boolean hasSendError = false;
+
+    /**
+     * constructor
+     *
+     * @param senderThreadNum
+     * @param decoder
+     * @param clientHandler
+     */
+    public SenderGroup(int senderThreadNum, ChannelUpstreamHandler decoder,
+                       SimpleChannelHandler clientHandler) {
+        this.senderThreadNum = senderThreadNum;
+
+        client.setFactory(new NioClientSocketChannelFactory(
+                Executors.newCachedThreadPool(),
+                Executors.newCachedThreadPool(),
+                this.senderThreadNum));
+
+        client.setPipelineFactory(() -> {
+            ChannelPipeline pipeline = Channels.pipeline();
+            pipeline.addLast("decoder", decoder);
+            pipeline.addLast("encoder", new Encoder());
+            pipeline.addLast("handler", clientHandler);
+            return pipeline;
+        });
+        client.setOption("tcpNoDelay", true);
+        client.setOption("child.tcpNoDelay", true);
+        client.setOption("keepAlive", true);
+        client.setOption("child.keepAlive", true);
+        client.setOption("reuseAddr", true);
+
+        channelGroups.add(new LinkedBlockingQueue<>());
+        channelGroups.add(new LinkedBlockingQueue<>());
+    }
+
+    /**
+     * send data
+     *
+     * @param dataBuf
+     * @return
+     */
+    public SenderResult send(ChannelBuffer dataBuf) {
+        LinkedBlockingQueue<SenderChannel> channels = channelGroups.get(mIndex);
+        SenderChannel channel = null;
+        try {
+            if (channels.size() <= 0) {
+                logger.error("channels is empty");
+                return new SenderResult("channels is empty", 0, false);
+            }
+            boolean isOk = false;
+            for (int tryIndex = 0; tryIndex < waitChannelTimes; tryIndex++) {
+                channels = channelGroups.get(mIndex);
+                for (int i = 0; i < channels.size(); i++) {
+                    channel = channels.poll();
+                    boolean ret = channel.tryAcquire();
+                    if (channel.tryAcquire()) {
+                        isOk = true;
+                        break;
+                    }
+
+                    if (ret) {
+                        isOk = true;
+                        break;
+                    }
+                    channels.offer(channel);
+                    channel = null;
+                }
+                if (isOk) {
+                    break;
+                }
+                try {
+                    Thread.sleep(waitChannelIntervalMs);
+                } catch (Throwable e) {
+                    System.out.println(e.getMessage());
+                }
+            }
+            if (channel == null) {
+                logger.error("can not get a channel");
+                return new SenderResult("can not get a channel", 0, false);
+            }
+            ChannelFuture t = null;
+            if (channel.getChannel().isConnected()) {
+                t = channel.getChannel().write(dataBuf).sync().await();
+                if (!t.isSuccess()) {
+                    if (!channel.getChannel().isConnected()) {
+                        reconnect(channel);
+                    }
+                    t = channel.getChannel().write(dataBuf).sync().await();
+                }
+            } else {
+                reconnect(channel);
+                t = channel.getChannel().write(dataBuf).sync().await();
+            }
+            return new SenderResult(channel.getIpPort().ip, channel.getIpPort().port, t.isSuccess());
+        } catch (Throwable ex) {
+            logger.error(ex.getMessage());
+            this.setHasSendError(true);
+            return new SenderResult(ex.getMessage(), 0, false);
+        } finally {
+            if (channel != null) {
+                channel.release();
+                channels.offer(channel);
+            }
+        }
+    }
+
+    /**
+     * release channel
+     */
+    public void release(String ipPort) {
+        SenderChannel channel = this.totalChannels.get(ipPort);
+        if (channel != null) {
+            channel.release();
+        }
+    }
+
+    /**
+     * release channel
+     */
+    public void release(InetSocketAddress addr) {
+        String destIp = addr.getHostName();
+        int destPort = addr.getPort();
+        String ipPort = IpPort.getIpPortKey(destIp, destPort);
+        SenderChannel channel = this.totalChannels.get(ipPort);
+        if (channel != null) {
+            channel.release();
+        }
+    }
+
+    /**
+     * update config
+     *
+     * @param ipLists
+     */
+    public void updateConfig(Set<String> ipLists) {
+        try {
+            for (SenderChannel dc : deleteChannels) {
+                dc.getChannel().disconnect();
+                dc.getChannel().close();
+            }
+            deleteChannels.clear();
+            int newIndex = mIndex ^ 0x01;
+            LinkedBlockingQueue<SenderChannel> newChannels = this.channelGroups.get(newIndex);
+            newChannels.clear();
+            for (String ipPort : ipLists) {
+                SenderChannel channel = totalChannels.get(ipPort);
+                if (channel != null) {
+                    newChannels.add(channel);
+                    continue;
+                }
+                try {
+                    IpPort ipPortObj = IpPort.parseIpPort(ipPort);
+                    if (ipPortObj == null) {
+                        continue;
+                    }
+                    ChannelFuture future = client.connect(ipPortObj.addr).await();
+                    channel = new SenderChannel(future.getChannel(), ipPortObj, maxSynchRequest);
+                    newChannels.add(channel);
+                    totalChannels.put(ipPort, channel);
+                } catch (Exception e) {
+                    logger.error(e.getMessage());
+                }
+            }
+
+            for (Entry<String, SenderChannel> entry : totalChannels.entrySet()) {
+                if (!ipLists.contains(entry.getKey())) {
+                    deleteChannels.add(entry.getValue());
+                }
+            }
+            for (SenderChannel dc : deleteChannels) {
+                totalChannels.remove(dc.getIpPort().key);
+            }
+            this.mIndex = newIndex;
+        } catch (Throwable e) {
+            logger.error("Update Sender Ip Failed." + e.getMessage());
+        }
+    }
+
+    /**
+     * reconnect
+     *
+     * @param channel
+     */
+    private void reconnect(SenderChannel channel) {
+        try {
+            synchronized (channel) {
+                if (channel.getChannel().isOpen()) {
+                    return;
+                }
+
+                Channel oldChannel = channel.getChannel();
+                ChannelFuture future = client.connect(channel.getIpPort().addr).await();
+                Channel newChannel = future.getChannel();
+                channel.setChannel(newChannel);
+                oldChannel.disconnect();
+                oldChannel.close();
+            }
+        } catch (Throwable e) {
+            logger.error("reconnect failed." + e.getMessage());
+        }
+    }
+
+    /**
+     * get hasSendError
+     *
+     * @return the hasSendError
+     */
+    public boolean isHasSendError() {
+        return hasSendError;
+    }
+
+    /**
+     * set hasSendError
+     *
+     * @param hasSendError the hasSendError to set
+     */
+    public void setHasSendError(boolean hasSendError) {
+        this.hasSendError = hasSendError;
+    }
+
+}
+
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
new file mode 100644
index 0000000..cf43c18
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.send;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SenderHandler extends SimpleChannelHandler {
+    private static final Logger logger = LoggerFactory.getLogger(SenderHandler.class);
+    private SenderManager manager;
+
+    /**
+     * Constructor
+     *
+     * @param manager
+     */
+    public SenderHandler(SenderManager manager) {
+        this.manager = manager;
+    }
+
+    /**
+     * Message Received
+     */
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+        try {
+            manager.onMessageReceived(ctx, e);
+        } catch (Throwable ex) {
+            logger.error(ex.getMessage());
+        }
+    }
+
+    /**
+     * Caught exception
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+        try {
+            manager.onExceptionCaught(ctx, e);
+        } catch (Throwable ex) {
+            logger.error(ex.getMessage());
+        }
+    }
+
+    /**
+     * Disconnected channel
+     */
+    @Override
+    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
+        try {
+            super.channelDisconnected(ctx, e);
+        } catch (Throwable ex) {
+            logger.error(ex.getMessage());
+        }
+    }
+
+    /**
+     * Closed channel
+     */
+    @Override
+    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) {
+        try {
+            super.channelClosed(ctx, e);
+        } catch (Throwable ex) {
+            logger.error(ex.getMessage());
+        }
+    }
+}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
new file mode 100644
index 0000000..6959341
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.send;
+
+import org.apache.inlong.audit.protocol.AuditApi;
+import org.apache.inlong.audit.util.AuditData;
+import org.apache.inlong.audit.util.Config;
+import org.apache.inlong.audit.util.Decoder;
+import org.apache.inlong.audit.util.SenderResult;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * sender manager
+ */
+public class SenderManager {
+    private static final Logger logger = LoggerFactory.getLogger(SenderManager.class);
+    public static final int DEFAULT_SEND_THREADNUM = 2;
+    public static final Long MAX_REQUEST_ID = 1000000000L;
+    public static final int ALL_CONNECT_CHANNEL = -1;
+    public static final int DEFAULT_CONNECT_CHANNEL = 2;
+
+    private SenderGroup sender;
+    private int maxConnectChannels = ALL_CONNECT_CHANNEL;
+    private SecureRandom sRandom = new SecureRandom(Long.toString(System.currentTimeMillis()).getBytes());
+    // IPList
+    private HashSet<String> currentIpPorts = new HashSet<String>();
+    private AtomicLong requestIdSeq = new AtomicLong(0L);
+    private ConcurrentHashMap<Long, AuditData> dataMap = new ConcurrentHashMap<>();
+    private Config config;
+
+    /**
+     * Constructor
+     *
+     * @param config
+     */
+    public SenderManager(Config config) {
+        this(config, DEFAULT_CONNECT_CHANNEL);
+    }
+
+    /**
+     * Constructor
+     *
+     * @param config
+     * @param maxConnectChannels
+     */
+    public SenderManager(Config config, int maxConnectChannels) {
+        try {
+            this.config = config;
+            this.maxConnectChannels = maxConnectChannels;
+            SenderHandler clientHandler = new SenderHandler(this);
+            this.sender = new SenderGroup(DEFAULT_SEND_THREADNUM, new Decoder(), clientHandler);
+        } catch (Exception ex) {
+            logger.error(ex.getMessage());
+        }
+    }
+
+    /**
+     * update config
+     */
+    public void setAuditProxy(HashSet<String> ipPortList) {
+        if (ipPortList.equals(currentIpPorts) && !this.sender.isHasSendError()) {
+            return;
+        }
+        this.sender.setHasSendError(false);
+        this.currentIpPorts = ipPortList;
+        int ipSize = ipPortList.size();
+        int needNewSize = 0;
+        if (this.maxConnectChannels == ALL_CONNECT_CHANNEL || this.maxConnectChannels >= ipSize) {
+            needNewSize = ipSize;
+        } else {
+            needNewSize = maxConnectChannels;
+        }
+        HashSet<String> updateConfigIpLists = new HashSet<>();
+        List<String> availableIpLists = new ArrayList<String>();
+        availableIpLists.addAll(ipPortList);
+        for (int i = 0; i < needNewSize; i++) {
+            int availableIpSize = availableIpLists.size();
+            int newIpPortIndex = this.sRandom.nextInt(availableIpSize);
+            String ipPort = availableIpLists.remove(newIpPortIndex);
+            updateConfigIpLists.add(ipPort);
+        }
+        if (updateConfigIpLists.size() > 0) {
+            this.sender.updateConfig(updateConfigIpLists);
+        }
+    }
+
+    /**
+     * next requestid
+     *
+     * @return
+     */
+    public Long nextRequestId() {
+        Long requestId = requestIdSeq.getAndIncrement();
+        if (requestId > MAX_REQUEST_ID) {
+            requestId = 0L;
+            requestIdSeq.set(requestId);
+        }
+        return requestId;
+    }
+
+    /**
+     * send data
+     *
+     * @param sdkTime
+     * @param baseCommand
+     */
+    public void send(long sdkTime, AuditApi.BaseCommand baseCommand) {
+        AuditData data = new AuditData(sdkTime, baseCommand);
+        // Cache first
+        this.dataMap.putIfAbsent(baseCommand.getAuditRequest().getRequestId(), data);
+        this.sendData(data);
+    }
+
+    /**
+     * send data
+     *
+     * @param data
+     */
+    private void sendData(AuditData data) {
+        ChannelBuffer dataBuf = ChannelBuffers.wrappedBuffer(data.getDataByte());
+        SenderResult result = this.sender.send(dataBuf);
+        if (!result.result) {
+            this.sender.setHasSendError(true);
+        }
+    }
+
+    /**
+     * Clean up the backlog of unsent message packets
+     */
+    public void clearBuffer() {
+        for (AuditData data : this.dataMap.values()) {
+            this.sendData(data);
+        }
+    }
+
+    /**
+     * get data map szie
+     */
+    public int getDataMapSize() {
+        return this.dataMap.size();
+    }
+
+    /**
+     * processing return package
+     *
+     * @param ctx
+     * @param e
+     */
+    public void onMessageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+        try {
+            //Analyze abnormal events
+            if (!(e.getMessage() instanceof ChannelBuffer)) {
+                logger.error("onMessageReceived e.getMessage:" + e.getMessage());
+                return;
+            }
+            ChannelBuffer readBuffer = (ChannelBuffer) e.getMessage();
+            byte[] readBytes = readBuffer.toByteBuffer().array();
+            AuditApi.BaseCommand baseCommand = AuditApi.BaseCommand.parseFrom(readBytes);
+            // Parse request id
+            Long requestId = baseCommand.getAuditReply().getRequestId();
+            AuditData data = this.dataMap.get(requestId);
+            if (data == null) {
+                logger.error("can not find the requestid onMessageReceived:" + requestId);
+                return;
+            }
+            if (AuditApi.AuditReply.RSP_CODE.SUCCESS.equals(baseCommand.getAuditReply().getRspCode())) {
+                this.dataMap.remove(requestId);
+                this.sender.notifyAll();
+                return;
+            }
+            int resendTimes = data.increaseResendTimes();
+            if (resendTimes < org.apache.inlong.audit.send.SenderGroup.MAX_SEND_TIMES) {
+                this.sendData(data);
+            }
+            this.sender.notifyAll();
+        } catch (Throwable ex) {
+            logger.error(ex.getMessage());
+            this.sender.setHasSendError(true);
+        }
+    }
+
+    /**
+     * Handle the packet return exception
+     *
+     * @param ctx
+     * @param e
+     */
+    public void onExceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+        logger.error(e.getCause().getMessage());
+        try {
+            this.sender.setHasSendError(true);
+        } catch (Throwable ex) {
+            logger.error(ex.getMessage());
+        }
+    }
+}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java
new file mode 100644
index 0000000..03c101e
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+import org.apache.inlong.audit.protocol.AuditApi;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class AuditData {
+    public static int HEAD_LENGTH = 4;
+    private final long sdkTime;
+    private final AuditApi.BaseCommand content;
+    private AtomicInteger resendTimes = new AtomicInteger(0);
+
+    /**
+     * Constructor
+     *
+     * @param sdkTime
+     * @param content
+     */
+    public AuditData(long sdkTime, AuditApi.BaseCommand content) {
+        this.sdkTime = sdkTime;
+        this.content = content;
+    }
+
+    /**
+     * set resendTimes
+     */
+    public int increaseResendTimes() {
+        return this.resendTimes.incrementAndGet();
+    }
+
+    /**
+     * getDataByte
+     *
+     * @return
+     */
+    public byte[] getDataByte() {
+        return addBytes(ByteBuffer.allocate(HEAD_LENGTH).putInt(content.toByteArray().length).array(),
+                content.toByteArray());
+    }
+
+    /**
+     * Concatenated byte array
+     *
+     * @param data1
+     * @param data2
+     * @return data1 and  data2 combined package result
+     */
+    public byte[] addBytes(byte[] data1, byte[] data2) {
+        byte[] data3 = new byte[data1.length + data2.length];
+        System.arraycopy(data1, 0, data3, 0, data1.length);
+        System.arraycopy(data2, 0, data3, data1.length, data2.length);
+        return data3;
+    }
+}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java
new file mode 100644
index 0000000..f7ce314
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+
+public class Config {
+    private static final Logger logger = LoggerFactory.getLogger(Config.class);
+    private String localIP = "";
+    private String dockerId = "";
+
+    public void init() {
+        initIP();
+        initDockerId();
+    }
+
+    public String getLocalIP() {
+        return localIP;
+    }
+
+    public String getDockerId() {
+        return dockerId;
+    }
+
+    private void initIP() {
+        try {
+            for (Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces(); en.hasMoreElements(); ) {
+                NetworkInterface intf = en.nextElement();
+                String name = intf.getName();
+                if (!name.contains("docker") && !name.contains("lo")) {
+                    for (Enumeration<InetAddress> enumIpAddr = intf.getInetAddresses();
+                         enumIpAddr.hasMoreElements(); ) {
+                        InetAddress inetAddress = enumIpAddr.nextElement();
+                        if (!inetAddress.isLoopbackAddress()) {
+                            String ipaddress = inetAddress.getHostAddress();
+                            if (!ipaddress.contains("::") && !ipaddress.contains("0:0:")
+                                    && !ipaddress.contains("fe80")) {
+                                localIP = ipaddress;
+                            }
+                        }
+                    }
+                }
+            }
+        } catch (SocketException ex) {
+            localIP = "127.0.0.1";
+            return;
+        }
+    }
+
+    private void initDockerId() {
+        BufferedReader in = null;
+        try {
+            File file = new File("/proc/self/cgroup");
+            if (file.exists() == false) {
+                return;
+            }
+            in = new BufferedReader(new FileReader("/proc/self/cgroup"));
+            String dockerID = in.readLine();
+            if (dockerID.equals("") == false) {
+                int n = dockerID.indexOf("/");
+                String dockerID2 = dockerID.substring(n + 1, (dockerID.length() - n - 1));
+                n = dockerID2.indexOf("/");
+                dockerId = dockerID2.substring(n + 1, 12);
+            }
+        } catch (IOException e) {
+            logger.error(e.getMessage());
+            return;
+        } catch (NullPointerException e2) {
+            logger.error(e2.getMessage());
+            return;
+        } catch (Exception e3) {
+            logger.error(e3.getMessage());
+            return;
+        } finally {
+            if (in != null) {
+                try {
+                    in.close();
+                } catch (IOException e4) {
+                    logger.error(e4.getMessage());
+                }
+            }
+        }
+    }
+}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Decoder.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Decoder.java
new file mode 100644
index 0000000..01ea67b
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Decoder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.buffer.DynamicChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+public class Decoder extends FrameDecoder {
+    // Maximum return packet size
+    private static final int MAX_RESPONSE_LENGTH = 8 * 1024 * 1024;
+
+    /**
+     * decoding
+     */
+    @Override
+    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) {
+        // Every time you need to read the complete package (that is, read to the end of the package),
+        // otherwise only the first one will be parsed correctly,
+        // which will adversely affect the parsing of the subsequent package
+        buffer.markReaderIndex();
+        //Packet composition: 4 bytes length content + ProtocolBuffer content
+        int totalLen = buffer.readInt();
+        // Respond to abnormal channel, interrupt in time to avoid stuck
+        if (totalLen > MAX_RESPONSE_LENGTH) {
+            channel.close();
+            return null;
+        }
+        // If the package is not complete, continue to wait for the return package
+        if (buffer.readableBytes() < totalLen) {
+            buffer.resetReaderIndex();
+            return null;
+        }
+        ChannelBuffer returnBuffer = new DynamicChannelBuffer(ChannelBuffers.BIG_ENDIAN, totalLen);
+        buffer.readBytes(returnBuffer, 0, totalLen);
+        return returnBuffer;
+    }
+}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Encoder.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Encoder.java
new file mode 100644
index 0000000..abb3270
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Encoder.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+public class Encoder extends OneToOneEncoder {
+
+    /**
+     * Encoder
+     */
+    @Override
+    protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) {
+        if (!(msg instanceof ChannelBuffer)) {
+            return msg;
+        } else {
+            ChannelBuffer src = (ChannelBuffer) msg;
+            return src;
+        }
+    }
+}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/IpPort.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/IpPort.java
new file mode 100644
index 0000000..fa4ba5e
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/IpPort.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+import org.apache.commons.lang3.math.NumberUtils;
+import org.jboss.netty.channel.Channel;
+
+import java.net.InetSocketAddress;
+
+public class IpPort {
+    public static final String SEPARATOR = ":";
+    public final String ip;
+    public final int port;
+    public final String key;
+    public final InetSocketAddress addr;
+
+    /**
+     * Constructor
+     *
+     * @param ip
+     * @param port
+     */
+    public IpPort(String ip, int port) {
+        this.ip = ip;
+        this.port = port;
+        this.key = getIpPortKey(ip, port);
+        this.addr = new InetSocketAddress(ip, port);
+    }
+
+    /**
+     * Constructor
+     *
+     * @param addr
+     */
+    public IpPort(InetSocketAddress addr) {
+        this.ip = addr.getHostName();
+        this.port = addr.getPort();
+        this.key = getIpPortKey(ip, port);
+        this.addr = addr;
+    }
+
+    /**
+     * get IpPort by key
+     *
+     * @param ip
+     * @param port
+     * @return
+     */
+    public static String getIpPortKey(String ip, int port) {
+        return ip + ":" + port;
+    }
+
+    /**
+     * parse sIpPort
+     *
+     * @param ipPort
+     * @return
+     */
+    public static IpPort parseIpPort(String ipPort) {
+        String[] splits = ipPort.split(SEPARATOR);
+        if (splits.length == 2) {
+            String strIp = splits[0];
+            String strPort = splits[1];
+            int port = NumberUtils.toInt(strPort, 0);
+            if (port > 0) {
+                return new IpPort(strIp, port);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * parse InetSocketAddress
+     *
+     * @param channel
+     * @return
+     */
+    public static InetSocketAddress parseInetSocketAddress(Channel channel) {
+        InetSocketAddress destAddr = null;
+        if (channel.getRemoteAddress() instanceof InetSocketAddress) {
+            destAddr = (InetSocketAddress) channel.getRemoteAddress();
+        } else {
+            String sendIp = channel.getRemoteAddress().toString();
+            destAddr = new InetSocketAddress(sendIp, 0);
+        }
+        return destAddr;
+    }
+
+    /**
+     * hashCode
+     */
+    @Override
+    public int hashCode() {
+        int result = ip.hashCode();
+        result = 31 * result + port;
+        return result;
+    }
+
+    /**
+     * equals
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        if (!(o instanceof IpPort)) {
+            return false;
+        }
+
+        try {
+            IpPort ctp = (IpPort) o;
+            if (ip != null && ip.equals(ctp.port) && port == ctp.port) {
+                return true;
+            }
+        } catch (Exception e) {
+            return false;
+        }
+        return false;
+    }
+
+    /**
+     * toString
+     */
+    public String toString() {
+        return key;
+    }
+}
+
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
new file mode 100644
index 0000000..69dcaf8
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+public class SenderResult {
+
+    public final IpPort ipPort;
+    public boolean result;
+
+    /**
+     * Constructor
+     *
+     * @param ipPort
+     * @param result
+     */
+    public SenderResult(IpPort ipPort, boolean result) {
+        this.ipPort = ipPort;
+        this.result = result;
+    }
+
+    /**
+     * Constructor
+     *
+     * @param sendIp
+     * @param sendPort
+     * @param result
+     */
+    public SenderResult(String sendIp, int sendPort, boolean result) {
+        this.ipPort = new IpPort(sendIp, sendPort);
+        this.result = result;
+    }
+}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/StatInfo.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/StatInfo.java
new file mode 100644
index 0000000..aecc22c
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/StatInfo.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class StatInfo {
+    public AtomicLong count = new AtomicLong(0);
+    public AtomicLong size = new AtomicLong(0);
+    public AtomicLong delay = new AtomicLong(0);
+
+    public StatInfo(long cnt, long sz, long dy) {
+        count.set(cnt);
+        size.set(sz);
+        delay.set(dy);
+    }
+}
diff --git a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/AuditImpTest.java b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/AuditImpTest.java
new file mode 100644
index 0000000..9d07583
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/AuditImpTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit;
+
+import org.junit.Test;
+
+import java.util.HashSet;
+
+public class AuditImpTest {
+
+    @Test
+    public void setAuditProxy() {
+        HashSet<String> ipList = new HashSet<>();
+        ipList.add("0.0.0.0:11222");
+        AuditImp.getInstance().setAuditProxy(ipList);
+    }
+
+    @Test
+    public void add() {
+        HashSet<String> ipList = new HashSet<>();
+        ipList.add("0.0.0.0:11222");
+        AuditImp.getInstance().setAuditProxy(ipList);
+        AuditImp.getInstance().add(1, "inlongGroupIDTest",
+                "inlongStreamIDTest", System.currentTimeMillis(), 1, 1);
+    }
+
+}
+
diff --git a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderChannelTest.java b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderChannelTest.java
new file mode 100644
index 0000000..c883e6d
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderChannelTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.send;
+
+import org.apache.inlong.audit.util.Encoder;
+import org.apache.inlong.audit.util.IpPort;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SenderChannelTest {
+    private static final Logger logger = LoggerFactory.getLogger(SenderChannelTest.class);
+    private ClientBootstrap client = new ClientBootstrap();
+    private IpPort ipPortObj = new IpPort("0.0.0.0", 54041);
+    private ChannelFuture future;
+    SenderChannel senderChannel;
+
+    /**
+     * test  SenderChannel
+     */
+    public SenderChannelTest() {
+        try {
+            client.setFactory(new NioClientSocketChannelFactory(
+                    Executors.newCachedThreadPool(),
+                    Executors.newCachedThreadPool(),
+                    10));
+
+            client.setPipelineFactory(() -> {
+                ChannelPipeline pipeline = Channels.pipeline();
+                pipeline.addLast("encoder", new Encoder());
+                return pipeline;
+            });
+            client.setOption("tcpNoDelay", true);
+            client.setOption("child.tcpNoDelay", true);
+            client.setOption("keepAlive", true);
+            client.setOption("child.keepAlive", true);
+            client.setOption("reuseAddr", true);
+
+            future = client.connect(ipPortObj.addr).await();
+            senderChannel = new SenderChannel(future.getChannel(), ipPortObj, 10);
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage());
+        }
+    }
+
+    @Test
+    public void tryAcquire() {
+        boolean ret = senderChannel.tryAcquire();
+        assertTrue(ret);
+    }
+
+    @Test
+    public void release() {
+        senderChannel.release();
+    }
+
+    @Test
+    public void testToString() {
+        IpPort ipPort = senderChannel.getIpPort();
+        assertEquals(ipPort, ipPortObj);
+    }
+
+    @Test
+    public void getIpPort() {
+        String toString = senderChannel.toString();
+        assertEquals(toString, "0.0.0.0:54041");
+    }
+
+    @Test
+    public void getChannel() {
+        Channel channel = senderChannel.getChannel();
+        assertFalse(channel.getRemoteAddress().toString().equals("0.0.0.0:54041"));
+    }
+}
\ No newline at end of file
diff --git a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderGroupTest.java b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderGroupTest.java
new file mode 100644
index 0000000..d69a26c
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderGroupTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.send;
+
+import org.apache.inlong.audit.protocol.AuditApi;
+import org.apache.inlong.audit.util.AuditData;
+import org.apache.inlong.audit.util.Config;
+import org.apache.inlong.audit.util.Decoder;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.junit.Test;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SenderGroupTest {
+    Config testConfig = new Config();
+    SenderManager testManager = new SenderManager(testConfig);
+    SenderHandler clientHandler = new org.apache.inlong.audit.send.SenderHandler(testManager);
+    SenderGroup sender = new org.apache.inlong.audit.send.SenderGroup(10, new Decoder(), clientHandler);
+
+    @Test
+    public void send() {
+        AuditApi.AuditMessageHeader header = AuditApi.AuditMessageHeader.newBuilder().setIp("127.0.0.1").build();
+        AuditApi.AuditMessageBody body = AuditApi.AuditMessageBody.newBuilder().setAuditId("1").build();
+        AuditApi.AuditRequest request = AuditApi.AuditRequest.newBuilder().setMsgHeader(header)
+                .addMsgBody(body).build();
+        AuditApi.BaseCommand baseCommand = AuditApi.BaseCommand.newBuilder().setAuditRequest(request).build();
+        AuditData testData = new AuditData(System.currentTimeMillis(), baseCommand);
+        ChannelBuffer dataBuf = ChannelBuffers.wrappedBuffer(testData.getDataByte());
+        sender.send(dataBuf);
+    }
+
+    @Test
+    public void release() {
+        sender.release("127.0.9.1:80");
+    }
+
+    @Test
+    public void updateConfig() {
+        Set<String> ipLists = new LinkedHashSet<>();
+        ipLists.add("127.0.9.1:80");
+        ipLists.add("127.0.9.1:81");
+        ipLists.add("127.0.9.1:82");
+        sender.updateConfig(ipLists);
+    }
+
+    @Test
+    public void isHasSendError() {
+        sender.setHasSendError(false);
+        boolean isError = sender.isHasSendError();
+        assertFalse(isError);
+        sender.setHasSendError(true);
+        isError = sender.isHasSendError();
+        assertTrue(isError);
+    }
+
+    @Test
+    public void setHasSendError() {
+        sender.setHasSendError(false);
+        boolean isError = sender.isHasSendError();
+        assertFalse(isError);
+        sender.setHasSendError(true);
+        isError = sender.isHasSendError();
+        assertTrue(isError);
+    }
+}
\ No newline at end of file
diff --git a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderManagerTest.java b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderManagerTest.java
new file mode 100644
index 0000000..696bcc5
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderManagerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.send;
+
+import org.apache.inlong.audit.protocol.AuditApi;
+import org.apache.inlong.audit.util.Config;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+;
+
+public class SenderManagerTest {
+    private Config testConfig = new Config();
+
+    @Test
+    public void nextRequestId() {
+        SenderManager testManager = new SenderManager(testConfig);
+        Long requestId = testManager.nextRequestId();
+        System.out.println(requestId);
+        assertTrue(requestId == 0);
+
+        requestId = testManager.nextRequestId();
+        assertTrue(requestId == 1);
+
+        requestId = testManager.nextRequestId();
+        assertTrue(requestId == 2);
+    }
+
+    @Test
+    public void send() {
+        AuditApi.AuditMessageHeader header = AuditApi.AuditMessageHeader.newBuilder().setIp("127.0.0.1").build();
+        AuditApi.AuditMessageBody body = AuditApi.AuditMessageBody.newBuilder().setAuditId("1").build();
+        AuditApi.AuditRequest request = AuditApi.AuditRequest.newBuilder().setMsgHeader(header)
+                .addMsgBody(body).build();
+        AuditApi.BaseCommand baseCommand = AuditApi.BaseCommand.newBuilder().setAuditRequest(request).build();
+        SenderManager testManager = new SenderManager(testConfig);
+        testManager.send(System.currentTimeMillis(), baseCommand);
+    }
+
+    @Test
+    public void clearBuffer() {
+        SenderManager testManager = new SenderManager(testConfig);
+        testManager.clearBuffer();
+        int dataMapSize = testManager.getDataMapSize();
+        assertTrue(dataMapSize == 0);
+    }
+}
\ No newline at end of file
diff --git a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java
new file mode 100644
index 0000000..1d4d688
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+import org.apache.inlong.audit.protocol.AuditApi;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class AuditDataTest {
+    @Test
+    public void increaseResendTimes() {
+        AuditApi.BaseCommand content = null;
+        AuditData test = new AuditData(System.currentTimeMillis(), content);
+        int resendTimes = test.increaseResendTimes();
+        assertTrue(resendTimes == 1);
+        resendTimes = test.increaseResendTimes();
+        assertTrue(resendTimes == 2);
+        resendTimes = test.increaseResendTimes();
+        assertTrue(resendTimes == 3);
+    }
+
+    @Test
+    public void getDataByte() {
+        AuditApi.AuditMessageHeader header = AuditApi.AuditMessageHeader.newBuilder().setIp("127.0.0.1").build();
+        AuditApi.AuditMessageBody body = AuditApi.AuditMessageBody.newBuilder().setAuditId("1").build();
+        AuditApi.AuditRequest request = AuditApi.AuditRequest.newBuilder().setMsgHeader(header)
+                .addMsgBody(body).build();
+        AuditApi.BaseCommand baseCommand = AuditApi.BaseCommand.newBuilder().setAuditRequest(request).build();
+        AuditData test = new AuditData(System.currentTimeMillis(), baseCommand);
+        byte[] data = test.getDataByte();
+        assertTrue(data.length > 0);
+    }
+
+    @Test
+    public void addBytes() {
+    }
+}
\ No newline at end of file
diff --git a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/ConfigTest.java b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/ConfigTest.java
new file mode 100644
index 0000000..f2827d3
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/ConfigTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class ConfigTest {
+
+    @Test
+    public void getLocalIP() {
+        Config test = new Config();
+        test.init();
+        String ip = test.getLocalIP();
+        assertTrue(ip.length() >= 0);
+    }
+
+    @Test
+    public void getDockerId() {
+        Config test = new Config();
+        test.init();
+        String dockerId = test.getDockerId();
+        assertTrue(dockerId.length() >= 0);
+    }
+}
\ No newline at end of file
diff --git a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/IpPortTest.java b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/IpPortTest.java
new file mode 100644
index 0000000..cdb8fd4
--- /dev/null
+++ b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/IpPortTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class IpPortTest {
+    private IpPort test = new IpPort("127.0.0.1", 80);
+
+    @Test
+    public void getIpPortKey() {
+        String ipPortKey = test.getIpPortKey("127.0.0.1", 80);
+        assertTrue(ipPortKey.equals("127.0.0.1:80"));
+    }
+
+    @Test
+    public void testHashCode() {
+        int hashCode = test.hashCode();
+        assertTrue(hashCode != 0);
+    }
+
+    @Test
+    public void testEquals() {
+        IpPort test1 = new IpPort("127.0.0.1", 81);
+        boolean ret = test.equals(test1);
+        assertFalse(ret);
+
+        IpPort test2 = new IpPort("127.0.0.1", 80);
+        ret = test.toString().equals(test2.toString());
+        assertTrue(ret);
+
+        IpPort test3 = test;
+        ret = test.equals(test3);
+        assertTrue(ret);
+    }
+
+    @Test
+    public void parseIpPort() {
+        IpPort testIpPort = test.parseIpPort("127.0.0.1:83");
+        System.out.println(testIpPort);
+    }
+
+    @Test
+    public void testToString() {
+        String toSteing = test.toString();
+        System.out.println(toSteing);
+    }
+}
\ No newline at end of file
diff --git a/inlong-audit/pom.xml b/inlong-audit/pom.xml
index a1904b5..b5b9e9b 100644
--- a/inlong-audit/pom.xml
+++ b/inlong-audit/pom.xml
@@ -38,6 +38,7 @@
         <module>audit-docker</module>
         <module>audit-store</module>
         <module>audit-common</module>
+		<module>audit-sdk</module>
     </modules>
 
     <properties>