You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/27 12:19:12 UTC

[inlong] branch master updated: [INLONG-6031][Audit] Clean code for InLong Audit (#6032)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f310a1e5 [INLONG-6031][Audit] Clean code for InLong Audit (#6032)
1f310a1e5 is described below

commit 1f310a1e587b0e69d44019f9f66eef42f14b7af3
Author: ciscozhou <45...@users.noreply.github.com>
AuthorDate: Tue Sep 27 20:19:07 2022 +0800

    [INLONG-6031][Audit] Clean code for InLong Audit (#6032)
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../inlong/agent/metrics/audit/AuditUtils.java     |  31 +-
 .../org/apache/inlong/agent/core/AgentMain.java    |  22 +-
 .../org/apache/inlong/audit/protocol/Commands.java |  19 +-
 .../audit-common/src/main/proto/AuditApi.proto     |  30 +-
 .../inlong/audit/source/ServerMessageHandler.java  | 180 ++++----
 .../audit/{AuditImp.java => AuditOperator.java}    | 174 ++++----
 .../apache/inlong/audit/send/SenderHandler.java    |  19 +-
 .../apache/inlong/audit/send/SenderManager.java    | 115 +++---
 .../org/apache/inlong/audit/util/AuditData.java    |  20 +-
 .../apache/inlong/audit/util/AuditDataTest.java    |  16 +-
 .../inlong/dataproxy/metrics/audit/AuditUtils.java |  62 ++-
 .../apache/inlong/dataproxy/node/Application.java  | 458 ++++++++++-----------
 ...ovider.java => ManagerPropsConfigProvider.java} |  38 +-
 .../sort/standalone/SortStandaloneApplication.java |  19 +-
 .../sort/standalone/metrics/audit/AuditUtils.java  |  50 +--
 .../inlong/sort/base/metric/SinkMetricData.java    |  26 +-
 .../inlong/sort/base/metric/SourceMetricData.java  |  18 +-
 .../server/broker/stats/audit/AuditUtils.java      |  42 +-
 18 files changed, 605 insertions(+), 734 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
index d3ad42538..ff6518ae5 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -19,9 +19,10 @@ package org.apache.inlong.agent.metrics.audit;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.audit.AuditOperator;
 import org.apache.inlong.audit.util.AuditConfig;
 
+import java.util.Collections;
 import java.util.HashSet;
 
 import static org.apache.inlong.agent.constant.AgentConstants.AUDIT_ENABLE;
@@ -44,49 +45,47 @@ public class AuditUtils {
     private static boolean IS_AUDIT = true;
 
     /**
-     * initAudit
+     * Init audit config
      */
     public static void initAudit() {
         AgentConfiguration conf = AgentConfiguration.getAgentConf();
-        // IS_AUDIT
         IS_AUDIT = conf.getBoolean(AUDIT_ENABLE, DEFAULT_AUDIT_ENABLE);
         if (IS_AUDIT) {
             // AuditProxy
             String strIpPorts = conf.get(AUDIT_KEY_PROXYS, DEFAULT_AUDIT_PROXYS);
-            HashSet<String> proxys = new HashSet<>();
+            HashSet<String> proxySet = new HashSet<>();
             if (!StringUtils.isBlank(strIpPorts)) {
                 String[] ipPorts = strIpPorts.split("\\s+");
-                for (String ipPort : ipPorts) {
-                    proxys.add(ipPort);
-                }
+                Collections.addAll(proxySet, ipPorts);
             }
-            AuditImp.getInstance().setAuditProxy(proxys);
+            AuditOperator.getInstance().setAuditProxy(proxySet);
+
             // AuditConfig
             String filePath = conf.get(AUDIT_KEY_FILE_PATH, AUDIT_DEFAULT_FILE_PATH);
             int maxCacheRow = conf.getInt(AUDIT_KEY_MAX_CACHE_ROWS, AUDIT_DEFAULT_MAX_CACHE_ROWS);
             AuditConfig auditConfig = new AuditConfig(filePath, maxCacheRow);
-            AuditImp.getInstance().setAuditConfig(auditConfig);
+            AuditOperator.getInstance().setAuditConfig(auditConfig);
         }
     }
 
     /**
-     * add audit metric
+     * Add audit metric
      */
-    public static void add(int auditID, String inlongGroupId, String inlongStreamId, long logTime, int count,
-            long size) {
+    public static void add(int auditID, String inlongGroupId, String inlongStreamId,
+            long logTime, int count, long size) {
         if (!IS_AUDIT) {
             return;
         }
-        AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, count, size);
+        AuditOperator.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, count, size);
     }
 
     /**
-     * sendReport
+     * Send audit data
      */
-    public static void sendReport() {
+    public static void send() {
         if (!IS_AUDIT) {
             return;
         }
-        AuditImp.getInstance().sendReport();
+        AuditOperator.getInstance().send();
     }
 }
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java
index ff48cf47f..6816f57f3 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java
@@ -85,44 +85,42 @@ public class AgentMain {
     }
 
     /**
-     * Stopping agent gracefully if get killed.
+     * Stopping agent manager gracefully if it was killed.
      *
-     * @param manager agent manager
+     * @param agentManager agent manager
      */
-    private static void stopManagerIfKilled(AgentManager manager) {
+    private static void stopAgentIfKilled(AgentManager agentManager) {
         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
             try {
                 LOGGER.info("stopping agent gracefully");
-                manager.stop();
+                agentManager.stop();
             } catch (Exception ex) {
-                LOGGER.error("exception while stopping threads", ex);
+                LOGGER.error("stop agent manager error: ", ex);
             }
         }));
     }
 
     /**
      * Main entrance.
-     *
-     * @param args arguments
-     * @throws Exception exceptions
      */
     public static void main(String[] args) throws Exception {
         CommandLine cl = initOptions(args);
         assert cl != null;
         initAgentConf(cl);
         AuditUtils.initAudit();
+
         AgentManager manager = new AgentManager();
         try {
             manager.start();
-            stopManagerIfKilled(manager);
-            //metrics
+            stopAgentIfKilled(manager);
+            // metrics
             MetricObserver.init(AgentConfiguration.getAgentConf().getConfigProperties());
             manager.join();
         } catch (Exception ex) {
-            LOGGER.error("exception caught", ex);
+            LOGGER.error("agent running exception: ", ex);
         } finally {
             manager.stop();
-            AuditUtils.sendReport();
+            AuditUtils.send();
         }
     }
 }
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/Commands.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/Commands.java
index fc833bf73..7ae674b77 100644
--- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/Commands.java
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/Commands.java
@@ -25,6 +25,9 @@ import org.apache.inlong.audit.protocol.AuditApi.BaseCommand;
 import org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type;
 import org.apache.inlong.audit.protocol.AuditApi.Pong;
 
+/**
+ * Audit commands, used to get various of ByteBuf.
+ */
 public class Commands {
 
     public static int HEAD_LENGTH = 4;
@@ -45,24 +48,24 @@ public class Commands {
 
     public static ByteBuf getAuditRequestBuffer(AuditRequest auditRequest) {
         BaseCommand cmdAuditRequest = BaseCommand.newBuilder()
-                .setType(Type.AUDITREQUEST)
+                .setType(Type.AUDIT_REQUEST)
                 .setAuditRequest(auditRequest).build();
         return getChannelBuffer(cmdAuditRequest.toByteArray());
     }
 
-    public static ByteBuf getAuditReplylBuffer(AuditReply auditReply) {
+    public static ByteBuf getAuditReplyBuffer(AuditReply auditReply) {
         BaseCommand cmdAuditReply = BaseCommand.newBuilder()
-                .setType(Type.AUDITREPLY)
+                .setType(Type.AUDIT_REPLY)
                 .setAuditReply(auditReply).build();
         return getChannelBuffer(cmdAuditReply.toByteArray());
     }
 
     private static ByteBuf getChannelBuffer(byte[] body) {
-        /* [totalSize] | [body]*/
+        // [totalSize] | [body]
         int totalLength = body.length;
-        ByteBuf cmdPingBuffer = ByteBufAllocator.DEFAULT.buffer(HEAD_LENGTH + totalLength);
-        cmdPingBuffer.writeInt(totalLength);
-        cmdPingBuffer.writeBytes(body);
-        return cmdPingBuffer;
+        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(HEAD_LENGTH + totalLength);
+        buffer.writeInt(totalLength);
+        buffer.writeBytes(body);
+        return buffer;
     }
 }
diff --git a/inlong-audit/audit-common/src/main/proto/AuditApi.proto b/inlong-audit/audit-common/src/main/proto/AuditApi.proto
index ce98fcdcf..ce89b112b 100644
--- a/inlong-audit/audit-common/src/main/proto/AuditApi.proto
+++ b/inlong-audit/audit-common/src/main/proto/AuditApi.proto
@@ -20,17 +20,17 @@ syntax = "proto3";
 package org.apache.inlong.audit.protocol;
 
 message BaseCommand {
-    enum Type {
-        PING          = 0;
-        PONG          = 1;
-        AUDITREQUEST  = 2;
-        AUDITREPLY    = 3;
-    }
-    Type type                            = 1;
-    AuditRequest audit_request  = 2;
-    AuditReply audit_reply      = 3;
-    Ping ping                   = 4;
-    Pong pong                   = 5;
+  enum Type {
+    PING = 0;
+    PONG = 1;
+    AUDIT_REQUEST = 2;
+    AUDIT_REPLY = 3;
+  }
+  Type type = 1;
+  AuditRequest audit_request = 2;
+  AuditReply audit_reply = 3;
+  Ping ping = 4;
+  Pong pong = 5;
 }
 
 message Ping {
@@ -55,8 +55,8 @@ message AuditMessageHeader {
 
 message AuditMessageBody {
   uint64 log_ts = 1;
-  string inlong_group_id= 2;
-  string inlong_stream_id= 3;
+  string inlong_group_id = 2;
+  string inlong_stream_id = 3;
   string audit_id = 4;
   uint64 count = 5;
   uint64 size = 6;
@@ -65,8 +65,8 @@ message AuditMessageBody {
 
 message AuditReply {
   enum RSP_CODE {
-    SUCCESS  = 0;
-    FAILED   = 1;
+    SUCCESS = 0;
+    FAILED = 1;
     DISASTER = 2;
   }
   uint64 request_id = 1;
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
index 483698e79..4f4c86345 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
@@ -17,22 +17,16 @@
 
 package org.apache.inlong.audit.source;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 import com.google.gson.Gson;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.group.ChannelGroup;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.List;
 import org.apache.flume.Event;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.source.AbstractSource;
-
 import org.apache.inlong.audit.protocol.AuditApi.AuditMessageBody;
 import org.apache.inlong.audit.protocol.AuditApi.AuditReply;
 import org.apache.inlong.audit.protocol.AuditApi.AuditReply.RSP_CODE;
@@ -43,43 +37,40 @@ import org.apache.inlong.audit.protocol.Commands;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
 /**
  * Server message handler
- *
  */
 public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
 
-    private static final Logger logger = LoggerFactory.getLogger(ServerMessageHandler.class);
-
-    private static final String DEFAULT_REMOTE_IP_VALUE = "0.0.0.0";
+    private static final Logger LOGGER = LoggerFactory.getLogger(ServerMessageHandler.class);
+    private static final Gson GSON = new Gson();
 
-    private AbstractSource source;
     private final ChannelGroup allChannels;
-    private int maxConnections = Integer.MAX_VALUE;
-
     private final ChannelProcessor processor;
     private final ServiceDecoder serviceDecoder;
-
-    private final Gson gson = new Gson();
+    private final int maxConnections;
 
     public ServerMessageHandler(AbstractSource source, ServiceDecoder serviceDecoder,
-                                ChannelGroup allChannels, Integer maxCons) {
-        this.source = source;
+            ChannelGroup allChannels, Integer maxCons) {
         this.processor = source.getChannelProcessor();
         this.serviceDecoder = serviceDecoder;
         this.allChannels = allChannels;
         this.maxConnections = maxCons;
-
     }
 
     @Override
-    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+    public void channelActive(ChannelHandlerContext ctx) {
         if (allChannels.size() - 1 >= maxConnections) {
-            logger.warn("refuse to connect , and connections=" + (allChannels.size() - 1)
-                    + ", maxConnections="
-                    + maxConnections + ",channel is " + ctx.channel());
             ctx.channel().disconnect();
             ctx.channel().close();
+            LOGGER.warn("refuse to connect to channel: {}, connections={}, maxConnections={}",
+                    ctx.channel(), allChannels.size() - 1, maxConnections);
         }
         allChannels.add(ctx.channel());
         ctx.fireChannelActive();
@@ -93,126 +84,115 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
 
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        logger.debug("message received");
         if (msg == null) {
-            logger.warn("get null message event, just skip");
+            LOGGER.warn("get null message event, just skip");
             return;
         }
-        ByteBuf cb = (ByteBuf) msg;
-        int len = cb.readableBytes();
+        ByteBuf buf = (ByteBuf) msg;
+        int len = buf.readableBytes();
         if (len == 0) {
-            logger.warn("receive message skip empty msg.");
-            cb.clear();
+            LOGGER.warn("receive message skip empty msg");
+            buf.clear();
             return;
         }
         Channel remoteChannel = ctx.channel();
-        BaseCommand cmd = null;
+        BaseCommand cmd;
         try {
-            cmd = serviceDecoder.extractData(cb, remoteChannel);
+            cmd = serviceDecoder.extractData(buf, remoteChannel);
         } catch (Exception ex) {
-            logger.error("extractData has error e {}", ex);
-            throw new IOException(ex.getCause());
+            LOGGER.error("extract data error: ", ex);
+            throw new IOException(ex);
         }
-
         if (cmd == null) {
-            logger.warn("receive message extractData is null");
+            LOGGER.warn("extract data from received msg is null");
             return;
         }
+
         ByteBuf channelBuffer = null;
         switch (cmd.getType()) {
             case PING:
                 checkArgument(cmd.hasPing());
-                channelBuffer  = Commands.getPongChannelBuffer();
+                channelBuffer = Commands.getPongChannelBuffer();
                 break;
             case PONG:
                 checkArgument(cmd.hasPong());
-                channelBuffer  = Commands.getPingChannelBuffer();
+                channelBuffer = Commands.getPingChannelBuffer();
                 break;
-            case AUDITREQUEST:
+            case AUDIT_REQUEST:
                 checkArgument(cmd.hasAuditRequest());
                 AuditReply auditReply = handleRequest(cmd.getAuditRequest());
-                channelBuffer  = Commands.getAuditReplylBuffer(auditReply);
+                channelBuffer = Commands.getAuditReplyBuffer(auditReply);
                 break;
-            case AUDITREPLY:
+            case AUDIT_REPLY:
                 checkArgument(cmd.hasAuditReply());
                 break;
             default:
-                channelBuffer = null;
         }
         if (channelBuffer != null) {
             writeResponse(remoteChannel, channelBuffer);
         }
     }
 
-    private AuditReply handleRequest(AuditRequest auditRequest) {
-        AuditReply reply = null;
-        if (auditRequest != null) {
-            List<AuditMessageBody> bodyList = auditRequest.getMsgBodyList();
-            if (bodyList != null) {
-                int errorMsgBody = 0;
-                for (AuditMessageBody auditMessageBody : bodyList) {
-                    AuditData auditData = new AuditData();
-                    auditData.setIp(auditRequest.getMsgHeader().getIp());
-                    auditData.setThreadId(auditRequest.getMsgHeader().getThreadId());
-                    auditData.setDockerId(auditRequest.getMsgHeader().getDockerId());
-                    auditData.setPacketId(auditRequest.getMsgHeader().getPacketId());
-                    auditData.setSdkTs(auditRequest.getMsgHeader().getSdkTs());
-
-                    auditData.setLogTs(auditMessageBody.getLogTs());
-                    auditData.setAuditId(auditMessageBody.getAuditId());
-                    auditData.setCount(auditMessageBody.getCount());
-                    auditData.setDelay(auditMessageBody.getDelay());
-                    auditData.setInlongGroupId(auditMessageBody.getInlongGroupId());
-                    auditData.setInlongStreamId(auditMessageBody.getInlongStreamId());
-                    auditData.setSize(auditMessageBody.getSize());
-
-                    byte[] body = null;
-                    try {
-                        body = gson.toJson(auditData).getBytes("UTF-8");
-                    } catch (UnsupportedEncodingException e) {
-                        logger.error("UnsupportedEncodingException = {}", e);
-                    }
-                    if (body != null) {
-                        Event event = null;
-                        try {
-                            event = EventBuilder.withBody(body, null);
-                            processor.processEvent(event);
-                        } catch (Throwable ex) {
-                            logger.error("Error writing to controller,data will discard.", ex);
-                            errorMsgBody++;
-                        }
-                    }
-                }
-                if (errorMsgBody != 0) {
-                    reply = AuditReply.newBuilder().setRequestId(auditRequest.getRequestId())
-                            .setMessage("Error writing to controller,data "
-                            + "will discard. error body num = "
-                            + errorMsgBody).setRspCode(RSP_CODE.FAILED).build();
-                }
+    private AuditReply handleRequest(AuditRequest auditRequest) throws Exception {
+        if (auditRequest == null) {
+            throw new Exception("audit request cannot be null");
+        }
+        AuditReply reply = AuditReply.newBuilder()
+                .setRequestId(auditRequest.getRequestId())
+                .setRspCode(RSP_CODE.SUCCESS)
+                .build();
+        List<AuditMessageBody> bodyList = auditRequest.getMsgBodyList();
+        int errorMsgBody = 0;
+        for (AuditMessageBody auditMessageBody : bodyList) {
+            AuditData auditData = new AuditData();
+            auditData.setIp(auditRequest.getMsgHeader().getIp());
+            auditData.setThreadId(auditRequest.getMsgHeader().getThreadId());
+            auditData.setDockerId(auditRequest.getMsgHeader().getDockerId());
+            auditData.setPacketId(auditRequest.getMsgHeader().getPacketId());
+            auditData.setSdkTs(auditRequest.getMsgHeader().getSdkTs());
+
+            auditData.setLogTs(auditMessageBody.getLogTs());
+            auditData.setAuditId(auditMessageBody.getAuditId());
+            auditData.setCount(auditMessageBody.getCount());
+            auditData.setDelay(auditMessageBody.getDelay());
+            auditData.setInlongGroupId(auditMessageBody.getInlongGroupId());
+            auditData.setInlongStreamId(auditMessageBody.getInlongStreamId());
+            auditData.setSize(auditMessageBody.getSize());
+
+            try {
+                byte[] body = GSON.toJson(auditData).getBytes(StandardCharsets.UTF_8);
+                Event event = EventBuilder.withBody(body, null);
+                processor.processEvent(event);
+            } catch (Throwable ex) {
+                LOGGER.error("writing data error, discard it: ", ex);
+                errorMsgBody++;
             }
         }
-        if (reply == null) {
-            reply = AuditReply.newBuilder().setRequestId(auditRequest.getRequestId())
-                    .setRspCode(RSP_CODE.SUCCESS).build();
+
+        if (errorMsgBody != 0) {
+            reply = reply.toBuilder()
+                    .setMessage("writing data error, discard it, error body count=" + errorMsgBody)
+                    .setRspCode(RSP_CODE.FAILED)
+                    .build();
         }
+
         return reply;
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        logger.error("exception caught", cause);
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        LOGGER.error("exception caught", cause);
     }
 
-    private void writeResponse(Channel remoteChannel, ByteBuf buffer) throws Exception {
-        if (remoteChannel.isWritable()) {
-            remoteChannel.writeAndFlush(buffer);
-        } else {
-            logger.warn(
-                    "the send buffer2 is full, so disconnect it!please check remote client"
-                            + "; Connection info:" + remoteChannel);
-            throw new Exception(new Throwable(
-                    "the send buffer2 is full,so disconnect it!please check remote client, Connection info:"
-                            + remoteChannel));
+    private void writeResponse(Channel channel, ByteBuf buffer) throws Exception {
+        if (channel.isWritable()) {
+            channel.writeAndFlush(buffer);
+            return;
         }
+
+        String msg = String.format("remote channel=%s is not writable, please check remote client!", channel);
+        LOGGER.warn(msg);
+        throw new Exception(msg);
     }
+
 }
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditImp.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
similarity index 60%
rename from inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditImp.java
rename to inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
index 2ef56cac9..ddbafd916 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditImp.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
@@ -36,47 +36,61 @@ import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDITREQUEST;
+import static org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDIT_REQUEST;
 
-public class AuditImp {
-    private static final Logger logger = LoggerFactory.getLogger(AuditImp.class);
-    private static AuditImp auditImp = new AuditImp();
+/**
+ * Audit operator, which is singleton.
+ */
+public class AuditOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AuditOperator.class);
     private static final String FIELD_SEPARATORS = ":";
-    private ConcurrentHashMap<String, StatInfo> countMap = new ConcurrentHashMap<String, StatInfo>();
-    private HashMap<String, StatInfo> threadSumMap = new HashMap<String, StatInfo>();
-    private ConcurrentHashMap<String, StatInfo> deleteCountMap = new ConcurrentHashMap<String, StatInfo>();
-    private List<String> deleteKeyList = new ArrayList<String>();
-    private AuditConfig auditConfig = null;
-    private Config config = new Config();
-    private Long sdkTime;
+    private static final int BATCH_NUM = 100;
+    private static final AuditOperator AUDIT_OPERATOR = new AuditOperator();
+    private static final ReentrantLock GLOBAL_LOCK = new ReentrantLock();
+    private static final int PERIOD = 1000 * 60;
+    private final ConcurrentHashMap<String, StatInfo> countMap = new ConcurrentHashMap<>();
+    private final HashMap<String, StatInfo> threadCountMap = new HashMap<>();
+    private final ConcurrentHashMap<String, StatInfo> deleteCountMap = new ConcurrentHashMap<>();
+    private final List<String> deleteKeyList = new ArrayList<>();
+    private final Config config = new Config();
+    private final Timer timer = new Timer();
     private int packageId = 1;
     private int dataId = 0;
-    private static final int BATCH_NUM = 100;
-    boolean inited = false;
+    private boolean initialized = false;
     private SenderManager manager;
-    private static ReentrantLock globalLock = new ReentrantLock();
-    private static int PERIOD = 1000 * 60;
-    private Timer timer = new Timer();
-    private TimerTask timerTask = new TimerTask() {
+
+    private final TimerTask timerTask = new TimerTask() {
         @Override
         public void run() {
             try {
-                sendReport();
+                send();
             } catch (Exception e) {
-                logger.error(e.getMessage());
+                LOGGER.error(e.getMessage());
             }
         }
     };
+    private AuditConfig auditConfig = null;
+
+    /**
+     * Not support create from outer.
+     */
+    private AuditOperator() {
 
-    public static AuditImp getInstance() {
-        return auditImp;
+    }
+
+    /**
+     * Get AuditOperator instance.
+     */
+    public static AuditOperator getInstance() {
+        return AUDIT_OPERATOR;
     }
 
     /**
      * init
      */
     private void init() {
-        if (inited) {
+        if (initialized) {
             return;
         }
         config.init();
@@ -88,29 +102,25 @@ public class AuditImp {
     }
 
     /**
-     * setAuditProxy
-     *
-     * @param ipPortList
+     * Set AuditProxy from the ip
      */
     public void setAuditProxy(HashSet<String> ipPortList) {
         try {
-            globalLock.lockInterruptibly();
-            if (!inited) {
+            GLOBAL_LOCK.lockInterruptibly();
+            if (!initialized) {
                 init();
-                inited = true;
+                initialized = true;
             }
             this.manager.setAuditProxy(ipPortList);
         } catch (InterruptedException e) {
-            logger.error(e.getMessage());
+            LOGGER.error(e.getMessage());
         } finally {
-            globalLock.unlock();
+            GLOBAL_LOCK.unlock();
         }
     }
 
     /**
      * set audit config
-     *
-     * @param config
      */
     public void setAuditConfig(AuditConfig config) {
         auditConfig = config;
@@ -118,14 +128,7 @@ public class AuditImp {
     }
 
     /**
-     * api
-     *
-     * @param auditID
-     * @param inlongGroupID
-     * @param inlongStreamID
-     * @param logTime
-     * @param count
-     * @param size
+     * Add audit data
      */
     public void add(int auditID, String inlongGroupID, String inlongStreamID, Long logTime, long count, long size) {
         long delayTime = System.currentTimeMillis() - logTime;
@@ -135,30 +138,21 @@ public class AuditImp {
     }
 
     /**
-     * add by key
-     *
-     * @param key
-     * @param count
-     * @param size
-     * @param delayTime
+     * Add audit info by key.
      */
     private void addByKey(String key, long count, long size, long delayTime) {
-        try {
-            if (countMap.get(key) == null) {
-                countMap.put(key, new StatInfo(0L, 0L, 0L));
-            }
-            countMap.get(key).count.addAndGet(count);
-            countMap.get(key).size.addAndGet(size);
-            countMap.get(key).delay.addAndGet(delayTime * count);
-        } catch (Exception e) {
-            return;
+        if (countMap.get(key) == null) {
+            countMap.put(key, new StatInfo(0L, 0L, 0L));
         }
+        countMap.get(key).count.addAndGet(count);
+        countMap.get(key).size.addAndGet(size);
+        countMap.get(key).delay.addAndGet(delayTime * count);
     }
 
     /**
-     * Report audit data
+     * Send audit data
      */
-    public synchronized void sendReport() {
+    public synchronized void send() {
         manager.clearBuffer();
         resetStat();
         // Retrieve statistics from the list of objects without statistics to be eliminated
@@ -183,76 +177,78 @@ public class AuditImp {
             this.deleteCountMap.put(key, value);
         }
         this.deleteKeyList.clear();
-        sdkTime = Calendar.getInstance().getTimeInMillis();
-        AuditApi.AuditMessageHeader mssageHeader = AuditApi.AuditMessageHeader.newBuilder()
+
+        long sdkTime = Calendar.getInstance().getTimeInMillis();
+        AuditApi.AuditMessageHeader msgHeader = AuditApi.AuditMessageHeader.newBuilder()
                 .setIp(config.getLocalIP()).setDockerId(config.getDockerId())
                 .setThreadId(String.valueOf(Thread.currentThread().getId()))
                 .setSdkTs(sdkTime).setPacketId(packageId)
                 .build();
-        AuditApi.AuditRequest.Builder requestBulid = AuditApi.AuditRequest.newBuilder();
-        requestBulid.setMsgHeader(mssageHeader).setRequestId(manager.nextRequestId());
-        for (Map.Entry<String, StatInfo> entry : threadSumMap.entrySet()) {
+        AuditApi.AuditRequest.Builder requestBuild = AuditApi.AuditRequest.newBuilder();
+        requestBuild.setMsgHeader(msgHeader).setRequestId(manager.nextRequestId());
+
+        // process the stat info for all threads
+        for (Map.Entry<String, StatInfo> entry : threadCountMap.entrySet()) {
             String[] keyArray = entry.getKey().split(FIELD_SEPARATORS);
             long logTime = Long.parseLong(keyArray[0]) * PERIOD;
             String inlongGroupID = keyArray[1];
             String inlongStreamID = keyArray[2];
             String auditID = keyArray[3];
             StatInfo value = entry.getValue();
-            AuditApi.AuditMessageBody mssageBody = AuditApi.AuditMessageBody.newBuilder()
-                    .setLogTs(logTime).setInlongGroupId(inlongGroupID)
-                    .setInlongStreamId(inlongStreamID).setAuditId(auditID)
-                    .setCount(value.count.get()).setSize(value.size.get())
+            AuditApi.AuditMessageBody msgBody = AuditApi.AuditMessageBody.newBuilder()
+                    .setLogTs(logTime)
+                    .setInlongGroupId(inlongGroupID)
+                    .setInlongStreamId(inlongStreamID)
+                    .setAuditId(auditID)
+                    .setCount(value.count.get())
+                    .setSize(value.size.get())
                     .setDelay(value.delay.get())
                     .build();
-            requestBulid.addMsgBody(mssageBody);
+            requestBuild.addMsgBody(msgBody);
+
             if (dataId++ >= BATCH_NUM) {
                 dataId = 0;
                 packageId++;
-                sendByBaseCommand(sdkTime, requestBulid.build());
-                requestBulid.clearMsgBody();
+                sendByBaseCommand(requestBuild.build());
+                requestBuild.clearMsgBody();
             }
         }
-        if (requestBulid.getMsgBodyCount() > 0) {
-            sendByBaseCommand(sdkTime, requestBulid.build());
-            requestBulid.clearMsgBody();
+        if (requestBuild.getMsgBodyCount() > 0) {
+            sendByBaseCommand(requestBuild.build());
+            requestBuild.clearMsgBody();
         }
-        threadSumMap.clear();
-        logger.info("finish send report.");
+        threadCountMap.clear();
+
+        LOGGER.info("finish report audit data");
     }
 
     /**
-     * send base command
-     *
-     * @param sdkTime
-     * @param auditRequest
+     * Send base command
      */
-    private void sendByBaseCommand(long sdkTime, AuditApi.AuditRequest auditRequest) {
+    private void sendByBaseCommand(AuditApi.AuditRequest auditRequest) {
         AuditApi.BaseCommand.Builder baseCommand = AuditApi.BaseCommand.newBuilder();
-        baseCommand.setType(AUDITREQUEST).setAuditRequest(auditRequest).build();
-        manager.send(sdkTime, baseCommand.build());
+        baseCommand.setType(AUDIT_REQUEST).setAuditRequest(auditRequest).build();
+        manager.send(baseCommand.build());
     }
 
     /**
      * Summary
-     *
-     * @param key
-     * @param statInfo
      */
     private void sumThreadGroup(String key, StatInfo statInfo) {
         long count = statInfo.count.getAndSet(0);
         if (0 == count) {
             return;
         }
-        if (threadSumMap.get(key) == null) {
-            threadSumMap.put(key, new StatInfo(0, 0, 0));
+        if (threadCountMap.get(key) == null) {
+            threadCountMap.put(key, new StatInfo(0, 0, 0));
         }
 
         long size = statInfo.size.getAndSet(0);
         long delay = statInfo.delay.getAndSet(0);
 
-        threadSumMap.get(key).count.addAndGet(count);
-        threadSumMap.get(key).size.addAndGet(size);
-        threadSumMap.get(key).delay.addAndGet(delay);
+        threadCountMap.get(key).count.addAndGet(count);
+        threadCountMap.get(key).size.addAndGet(size);
+        threadCountMap.get(key).delay.addAndGet(delay);
     }
 
     /**
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
index dd8ff1a3a..e2f8fc5b9 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
@@ -23,13 +23,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SenderHandler extends SimpleChannelInboundHandler<byte[]> {
-    private static final Logger logger = LoggerFactory.getLogger(SenderHandler.class);
-    private SenderManager manager;
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SenderHandler.class);
+    private final SenderManager manager;
 
     /**
      * Constructor
-     *
-     * @param manager
      */
     public SenderHandler(SenderManager manager) {
         this.manager = manager;
@@ -39,11 +38,11 @@ public class SenderHandler extends SimpleChannelInboundHandler<byte[]> {
      * Message Received
      */
     @Override
-    public void channelRead0(io.netty.channel.ChannelHandlerContext ctx, byte[] e)  {
+    public void channelRead0(io.netty.channel.ChannelHandlerContext ctx, byte[] e) {
         try {
             manager.onMessageReceived(ctx, e);
         } catch (Throwable ex) {
-            logger.error(ex.getMessage());
+            LOGGER.error("channelRead0 error: ", ex);
         }
     }
 
@@ -55,7 +54,7 @@ public class SenderHandler extends SimpleChannelInboundHandler<byte[]> {
         try {
             manager.onExceptionCaught(ctx, e);
         } catch (Throwable ex) {
-            logger.error(ex.getMessage());
+            LOGGER.error("caught exception: ", ex);
         }
     }
 
@@ -63,11 +62,11 @@ public class SenderHandler extends SimpleChannelInboundHandler<byte[]> {
      * Disconnected channel
      */
     @Override
-    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    public void channelInactive(ChannelHandlerContext ctx) {
         try {
             super.channelInactive(ctx);
         } catch (Throwable ex) {
-            logger.error(ex.getMessage());
+            LOGGER.error("channelInactive error: ", ex);
         }
     }
 
@@ -79,7 +78,7 @@ public class SenderHandler extends SimpleChannelInboundHandler<byte[]> {
         try {
             super.channelUnregistered(ctx);
         } catch (Throwable ex) {
-            logger.error(ex.getMessage());
+            LOGGER.error("channelUnregistered error: ", ex);
         }
     }
 }
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
index b841014d5..9d317a238 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
@@ -42,28 +42,27 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * sender manager
+ * Audit sender manager
  */
 public class SenderManager {
-    private static final Logger logger = LoggerFactory.getLogger(SenderManager.class);
+
     public static final Long MAX_REQUEST_ID = 1000000000L;
-    private static final int SEND_INTERVAL_MS = 20;
     public static final int ALL_CONNECT_CHANNEL = -1;
     public static final int DEFAULT_CONNECT_CHANNEL = 2;
-    public static final String LF = "\n";
+    private static final Logger logger = LoggerFactory.getLogger(SenderManager.class);
+    private static final int SEND_INTERVAL_MS = 20;
+    private final SecureRandom sRandom = new SecureRandom(Long.toString(System.currentTimeMillis()).getBytes());
+    private final AtomicLong requestIdSeq = new AtomicLong(0L);
+    private final ConcurrentHashMap<Long, AuditData> dataMap = new ConcurrentHashMap<>();
+
     private SenderGroup sender;
     private int maxConnectChannels = ALL_CONNECT_CHANNEL;
-    private SecureRandom sRandom = new SecureRandom(Long.toString(System.currentTimeMillis()).getBytes());
     // IPList
-    private HashSet<String> currentIpPorts = new HashSet<String>();
-    private AtomicLong requestIdSeq = new AtomicLong(0L);
-    private ConcurrentHashMap<Long, AuditData> dataMap = new ConcurrentHashMap<>();
+    private HashSet<String> currentIpPorts = new HashSet<>();
     private AuditConfig auditConfig;
 
     /**
      * Constructor
-     *
-     * @param config
      */
     public SenderManager(AuditConfig config) {
         this(config, DEFAULT_CONNECT_CHANNEL);
@@ -71,9 +70,6 @@ public class SenderManager {
 
     /**
      * Constructor
-     *
-     * @param config
-     * @param maxConnectChannels
      */
     public SenderManager(AuditConfig config, int maxConnectChannels) {
         try {
@@ -95,15 +91,15 @@ public class SenderManager {
         this.sender.setHasSendError(false);
         this.currentIpPorts = ipPortList;
         int ipSize = ipPortList.size();
-        int needNewSize = 0;
+        int needNewSize;
         if (this.maxConnectChannels == ALL_CONNECT_CHANNEL || this.maxConnectChannels >= ipSize) {
             needNewSize = ipSize;
         } else {
             needNewSize = maxConnectChannels;
         }
+
         HashSet<String> updateConfigIpLists = new HashSet<>();
-        List<String> availableIpLists = new ArrayList<String>();
-        availableIpLists.addAll(ipPortList);
+        List<String> availableIpLists = new ArrayList<>(ipPortList);
         for (int i = 0; i < needNewSize; i++) {
             int availableIpSize = availableIpLists.size();
             int newIpPortIndex = this.sRandom.nextInt(availableIpSize);
@@ -116,12 +112,10 @@ public class SenderManager {
     }
 
     /**
-     * next requestid
-     *
-     * @return
+     * next request id
      */
     public Long nextRequestId() {
-        Long requestId = requestIdSeq.getAndIncrement();
+        long requestId = requestIdSeq.getAndIncrement();
         if (requestId > MAX_REQUEST_ID) {
             requestId = 0L;
             requestIdSeq.set(requestId);
@@ -130,22 +124,17 @@ public class SenderManager {
     }
 
     /**
-     * send data
-     *
-     * @param sdkTime
-     * @param baseCommand
+     * Send data with command
      */
-    public void send(long sdkTime, AuditApi.BaseCommand baseCommand) {
-        AuditData data = new AuditData(sdkTime, baseCommand);
-        // Cache first
+    public void send(AuditApi.BaseCommand baseCommand) {
+        AuditData data = new AuditData(baseCommand);
+        // cache first
         this.dataMap.putIfAbsent(baseCommand.getAuditRequest().getRequestId(), data);
         this.sendData(data.getDataByte());
     }
 
     /**
-     * send data
-     *
-     * @param data
+     * Send data byte array
      */
     private void sendData(byte[] data) {
         if (data == null || data.length <= 0) {
@@ -167,7 +156,7 @@ public class SenderManager {
         logger.info("audit failed cache size: {}", this.dataMap.size());
         for (AuditData data : this.dataMap.values()) {
             this.sendData(data.getDataByte());
-            sleep(SEND_INTERVAL_MS);
+            this.sleep();
         }
         if (this.dataMap.size() == 0) {
             checkAuditFile();
@@ -190,10 +179,10 @@ public class SenderManager {
             File file = new File(auditConfig.getDisasterFile());
             if (!file.exists()) {
                 if (!file.createNewFile()) {
-                    logger.error("create {} {}", auditConfig.getDisasterFile(), " failed");
+                    logger.error("create file {} failed", auditConfig.getDisasterFile());
                     return;
                 }
-                logger.info("create {}", auditConfig.getDisasterFile());
+                logger.info("create file {} success", auditConfig.getDisasterFile());
             }
             if (file.length() > auditConfig.getMaxFileSize()) {
                 file.delete();
@@ -204,15 +193,13 @@ public class SenderManager {
             objectOutputStream.writeObject(dataMap);
             objectOutputStream.close();
             fos.close();
-        } catch (IOException ioException) {
-            logger.error(ioException.getMessage());
+        } catch (IOException e) {
+            logger.error("write local file error: ", e);
         }
     }
 
     /**
      * check file path
-     *
-     * @return
      */
     private boolean checkFilePath() {
         File file = new File(auditConfig.getFilePath());
@@ -220,7 +207,7 @@ public class SenderManager {
             if (!file.mkdirs()) {
                 return false;
             }
-            logger.info("create {}", auditConfig.getFilePath());
+            logger.info("create file {} success", auditConfig.getFilePath());
         }
         return true;
     }
@@ -235,26 +222,26 @@ public class SenderManager {
                 return;
             }
             FileInputStream inputStream = new FileInputStream(auditConfig.getDisasterFile());
-            ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
+            ObjectInputStream objectStream = new ObjectInputStream(inputStream);
             ConcurrentHashMap<Long, AuditData> fileData =
-                    (ConcurrentHashMap<Long, AuditData>) objectInputStream.readObject();
+                    (ConcurrentHashMap<Long, AuditData>) objectStream.readObject();
             for (Map.Entry<Long, AuditData> entry : fileData.entrySet()) {
                 if (this.dataMap.size() < (auditConfig.getMaxCacheRow() / 2)) {
                     this.dataMap.putIfAbsent(entry.getKey(), entry.getValue());
                 }
                 this.sendData(entry.getValue().getDataByte());
-                sleep(SEND_INTERVAL_MS);
+                this.sleep();
             }
-            objectInputStream.close();
+            objectStream.close();
             inputStream.close();
             file.delete();
-        } catch (IOException | ClassNotFoundException ioException) {
-            logger.error(ioException.getMessage());
+        } catch (IOException | ClassNotFoundException e) {
+            logger.error("check audit file error: ", e);
         }
     }
 
     /**
-     * get data map szie
+     * get data map size
      */
     public int getDataMapSize() {
         return this.dataMap.size();
@@ -262,68 +249,60 @@ public class SenderManager {
 
     /**
      * processing return package
-     *
-     * @param ctx ctx
-     * @param msg msg
      */
     public void onMessageReceived(ChannelHandlerContext ctx, byte[] msg) {
         try {
-            //Analyze abnormal events
-            byte[] readBytes = msg;
-            AuditApi.BaseCommand baseCommand = AuditApi.BaseCommand.parseFrom(readBytes);
+            // Analyze abnormal events
+            AuditApi.BaseCommand baseCommand = AuditApi.BaseCommand.parseFrom(msg);
             // Parse request id
             Long requestId = baseCommand.getAuditReply().getRequestId();
             AuditData data = this.dataMap.get(requestId);
             if (data == null) {
-                logger.error("can not find the requestid onMessageReceived:" + requestId);
+                logger.error("can not find the request id onMessageReceived: " + requestId);
                 return;
             }
-            logger.info("audit-proxy response code: {}", baseCommand.getAuditReply().getRspCode().toString());
+
+            logger.info("audit-proxy response code: {}", baseCommand.getAuditReply().getRspCode());
             if (AuditApi.AuditReply.RSP_CODE.SUCCESS.equals(baseCommand.getAuditReply().getRspCode())) {
                 this.dataMap.remove(requestId);
                 return;
             }
+
             int resendTimes = data.increaseResendTimes();
-            if (resendTimes < org.apache.inlong.audit.send.SenderGroup.MAX_SEND_TIMES) {
+            if (resendTimes < SenderGroup.MAX_SEND_TIMES) {
                 this.sendData(data.getDataByte());
             }
         } catch (Throwable ex) {
-            logger.error(ex.getMessage());
+            logger.error("onMessageReceived exception: ", ex);
             this.sender.setHasSendError(true);
         }
     }
 
     /**
      * Handle the packet return exception
-     *
-     * @param ctx
-     * @param e
      */
     public void onExceptionCaught(ChannelHandlerContext ctx, Throwable e) {
-        logger.error(e.getCause().getMessage());
+        logger.error("channel context " + ctx + " occurred exception: ", e);
         try {
             this.sender.setHasSendError(true);
         } catch (Throwable ex) {
-            logger.error(ex.getMessage());
+            logger.error("setHasSendError error: ", ex);
         }
     }
 
     /**
-     * sleep
-     *
-     * @param millisecond
+     * sleep SEND_INTERVAL_MS
      */
-    private void sleep(int millisecond) {
+    private void sleep() {
         try {
-            Thread.sleep(millisecond);
-        } catch (Throwable e) {
-            logger.error(e.getMessage());
+            Thread.sleep(SEND_INTERVAL_MS);
+        } catch (Throwable ex) {
+            logger.error("sleep error: ", ex);
         }
     }
 
     /***
      * set audit config
-     * @param config
      */
     public void setAuditConfig(AuditConfig config) {
         auditConfig = config;
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java
index 4dde7e118..a7064b996 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java
@@ -24,33 +24,27 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class AuditData implements Serializable {
+
     public static int HEAD_LENGTH = 4;
-    private final long sdkTime;
     private final AuditApi.BaseCommand content;
-    private AtomicInteger resendTimes = new AtomicInteger(0);
+    private final AtomicInteger resendTimes = new AtomicInteger(0);
 
     /**
      * Constructor
-     *
-     * @param sdkTime
-     * @param content
      */
-    public AuditData(long sdkTime, AuditApi.BaseCommand content) {
-        this.sdkTime = sdkTime;
+    public AuditData(AuditApi.BaseCommand content) {
         this.content = content;
     }
 
     /**
-     * set resendTimes
+     * Increase and get resend times
      */
     public int increaseResendTimes() {
         return this.resendTimes.incrementAndGet();
     }
 
     /**
-     * getDataByte
-     *
-     * @return
+     * Get data byte array
      */
     public byte[] getDataByte() {
         return addBytes(ByteBuffer.allocate(HEAD_LENGTH).putInt(content.toByteArray().length).array(),
@@ -60,9 +54,7 @@ public class AuditData implements Serializable {
     /**
      * Concatenated byte array
      *
-     * @param data1
-     * @param data2
-     * @return data1 and  data2 combined package result
+     * @return data1 and data2 combined package result
      */
     public byte[] addBytes(byte[] data1, byte[] data2) {
         byte[] data3 = new byte[data1.length + data2.length];
diff --git a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java
index 82babece5..4de0413ff 100644
--- a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java
+++ b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java
@@ -20,19 +20,20 @@ package org.apache.inlong.audit.util;
 import org.apache.inlong.audit.protocol.AuditApi;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class AuditDataTest {
+
     @Test
     public void increaseResendTimes() {
-        AuditApi.BaseCommand content = null;
-        AuditData test = new AuditData(System.currentTimeMillis(), content);
+        AuditData test = new AuditData(null);
         int resendTimes = test.increaseResendTimes();
-        assertTrue(resendTimes == 1);
+        assertEquals(1, resendTimes);
         resendTimes = test.increaseResendTimes();
-        assertTrue(resendTimes == 2);
+        assertEquals(2, resendTimes);
         resendTimes = test.increaseResendTimes();
-        assertTrue(resendTimes == 3);
+        assertEquals(3, resendTimes);
     }
 
     @Test
@@ -54,12 +55,9 @@ public class AuditDataTest {
         AuditApi.AuditRequest request = AuditApi.AuditRequest.newBuilder().setMsgHeader(headerBuilder.build())
                 .addMsgBody(bodyBuilder.build()).build();
         AuditApi.BaseCommand baseCommand = AuditApi.BaseCommand.newBuilder().setAuditRequest(request).build();
-        AuditData test = new AuditData(System.currentTimeMillis(), baseCommand);
+        AuditData test = new AuditData(baseCommand);
         byte[] data = test.getDataByte();
         assertTrue(data.length > 0);
     }
 
-    @Test
-    public void addBytes() {
-    }
 }
\ No newline at end of file
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
index 24c251626..331620af9 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -17,13 +17,11 @@
 
 package org.apache.inlong.dataproxy.metrics.audit;
 
-import java.util.HashSet;
-import java.util.Map;
 import org.apache.commons.lang3.BooleanUtils;
-import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flume.Event;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.audit.AuditOperator;
 import org.apache.inlong.audit.util.AuditConfig;
 import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
@@ -32,9 +30,12 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.utils.Constants;
 import org.apache.inlong.dataproxy.utils.InLongMsgVer;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+
 /**
- * 
- * AuditUtils
+ * Audit utils
  */
 public class AuditUtils {
 
@@ -51,7 +52,7 @@ public class AuditUtils {
     private static boolean IS_AUDIT = true;
 
     /**
-     * initAudit
+     * Init audit
      */
     public static void initAudit() {
         // IS_AUDIT
@@ -62,26 +63,21 @@ public class AuditUtils {
             HashSet<String> proxys = new HashSet<>();
             if (!StringUtils.isBlank(strIpPorts)) {
                 String[] ipPorts = strIpPorts.split("\\s+");
-                for (String ipPort : ipPorts) {
-                    proxys.add(ipPort);
-                }
+                Collections.addAll(proxys, ipPorts);
             }
-            AuditImp.getInstance().setAuditProxy(proxys);
+            AuditOperator.getInstance().setAuditProxy(proxys);
             // AuditConfig
             String filePath = CommonPropertiesHolder.getString(AUDIT_KEY_FILE_PATH, AUDIT_DEFAULT_FILE_PATH);
             int maxCacheRow = NumberUtils.toInt(
                     CommonPropertiesHolder.getString(AUDIT_KEY_MAX_CACHE_ROWS),
                     AUDIT_DEFAULT_MAX_CACHE_ROWS);
             AuditConfig auditConfig = new AuditConfig(filePath, maxCacheRow);
-            AuditImp.getInstance().setAuditConfig(auditConfig);
+            AuditOperator.getInstance().setAuditConfig(auditConfig);
         }
     }
 
     /**
-     * add
-     *
-     * @param auditID
-     * @param event
+     * Add audit data
      */
     public static void add(int auditID, Event event) {
         if (!IS_AUDIT || event == null) {
@@ -97,23 +93,20 @@ public class AuditUtils {
             if (event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) {
                 msgCount = Long.parseLong(event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
             }
-            AuditImp.getInstance().add(auditID, inlongGroupId,
+            AuditOperator.getInstance().add(auditID, inlongGroupId,
                     inlongStreamId, logTime, msgCount, event.getBody().length);
         } else {
             String groupId = headers.get(AttributeConstants.GROUP_ID);
             String streamId = headers.get(AttributeConstants.STREAM_ID);
             long dataTime = NumberUtils.toLong(headers.get(AttributeConstants.DATA_TIME));
             long msgCount = NumberUtils.toLong(headers.get(ConfigConstants.MSG_COUNTER_KEY));
-            AuditImp.getInstance().add(auditID, groupId,
+            AuditOperator.getInstance().add(auditID, groupId,
                     streamId, dataTime, msgCount, event.getBody().length);
         }
     }
 
     /**
-     * getLogTime
-     * 
-     * @param  headers
-     * @return
+     * Get LogTime from headers
      */
     public static long getLogTime(Map<String, String> headers) {
         String strLogTime = headers.get(Constants.HEADER_KEY_MSG_TIME);
@@ -131,10 +124,7 @@ public class AuditUtils {
     }
 
     /**
-     * getLogTime
-     * 
-     * @param  event
-     * @return
+     * Get LogTime from event
      */
     public static long getLogTime(Event event) {
         if (event != null) {
@@ -145,20 +135,16 @@ public class AuditUtils {
     }
 
     /**
-     * getAuditFormatTime
-     * 
-     * @param  msgTime
-     * @return
+     * Get AuditFormatTime
      */
     public static long getAuditFormatTime(long msgTime) {
-        long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
-        return auditFormatTime;
+        return msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
     }
 
     /**
-     * sendReport
+     * Send audit data
      */
-    public static void sendReport() {
-        AuditImp.getInstance().sendReport();
+    public static void send() {
+        AuditOperator.getInstance().send();
     }
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
index 836ba0ce2..881e2b0b2 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -66,35 +66,30 @@ import java.util.Set;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * Application
+ * DataProxy application
  */
 public class Application {
 
-    private static final Logger logger = LoggerFactory
-            .getLogger(Application.class);
-
-    public static final String CONF_MONITOR_CLASS = "flume.monitoring.type";
-    public static final String CONF_MONITOR_PREFIX = "flume.monitoring.";
+    private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);
 
+    private static final String CONF_MONITOR_CLASS = "flume.monitoring.type";
+    private static final String CONF_MONITOR_PREFIX = "flume.monitoring.";
     private final List<LifecycleAware> components;
     private final LifecycleSupervisor supervisor;
+    private final ReentrantLock lifecycleLock = new ReentrantLock();
     private MaterializedConfiguration materializedConfiguration;
     private MonitorService monitorServer;
-    private final ReentrantLock lifecycleLock = new ReentrantLock();
     private AdminTask adminTask;
-    private HeartbeatManager heartbeatManager;
 
     /**
      * Constructor
      */
     public Application() {
-        this(new ArrayList<LifecycleAware>(0));
+        this(new ArrayList<>(0));
     }
 
     /**
      * Constructor
-     *
-     * @param components
      */
     public Application(List<LifecycleAware> components) {
         this.components = components;
@@ -102,7 +97,161 @@ public class Application {
     }
 
     /**
-     * start
+     * Main entrance
+     */
+    public static void main(String[] args) {
+        try {
+            SSLUtil.initGlobalSSLParameters();
+            Options options = new Options();
+
+            Option option = new Option("n", "name", true, "the name of this agent");
+            option.setRequired(true);
+            options.addOption(option);
+
+            option = new Option("f", "conf-file", true,
+                    "specify a config file (required if -z missing)");
+            option.setRequired(false);
+            options.addOption(option);
+
+            option = new Option(null, "no-reload-conf", false,
+                    "do not reload config file if changed");
+            options.addOption(option);
+
+            // Options for Zookeeper
+            option = new Option("z", "zkConnString", true,
+                    "specify the ZooKeeper connection to use (required if -f missing)");
+            option.setRequired(false);
+            options.addOption(option);
+
+            option = new Option("p", "zkBasePath", true,
+                    "specify the base path in ZooKeeper for agent configs");
+            option.setRequired(false);
+            options.addOption(option);
+
+            option = new Option("h", "help", false, "display help text");
+            options.addOption(option);
+
+            // load configuration data from manager
+            option = new Option(null, "load-conf-from-manager", false,
+                    "load configuration data from manager");
+            option.setRequired(false);
+            options.addOption(option);
+
+            CommandLineParser parser = new GnuParser();
+            CommandLine commandLine = parser.parse(options, args);
+
+            if (commandLine.hasOption('h')) {
+                new HelpFormatter().printHelp("flume-ng agent", options, true);
+                return;
+            }
+
+            // start by manager configuration
+            if (commandLine.hasOption("load-conf-from-manager")) {
+                startByManagerConf(commandLine);
+                return;
+            }
+
+            String agentName = commandLine.getOptionValue('n');
+            boolean reload = !commandLine.hasOption("no-reload-conf");
+
+            boolean isZkConfigured = commandLine.hasOption('z') || commandLine.hasOption("zkConnString");
+
+            Application application;
+            if (isZkConfigured) {
+                // get options
+                String zkConnectionStr = commandLine.getOptionValue('z');
+                String baseZkPath = commandLine.getOptionValue('p');
+
+                if (reload) {
+                    EventBus eventBus = new EventBus(agentName + "-event-bus");
+                    List<LifecycleAware> components = Lists.newArrayList();
+                    PollingZooKeeperConfigurationProvider zProvider = new PollingZooKeeperConfigurationProvider(
+                            agentName, zkConnectionStr, baseZkPath, eventBus);
+                    components.add(zProvider);
+                    application = new Application(components);
+                    eventBus.register(application);
+                } else {
+                    StaticZooKeeperConfigurationProvider zProvider = new StaticZooKeeperConfigurationProvider(
+                            agentName, zkConnectionStr, baseZkPath);
+                    application = new Application();
+                    application.handleConfigurationEvent(zProvider.getConfiguration());
+                }
+            } else {
+                File configurationFile = new File(commandLine.getOptionValue('f'));
+                // The following is to ensure that by default the agent will fail on startup
+                // if the file does not exist.
+                if (!configurationFile.exists()) {
+                    // If command line invocation, then need to fail fast
+                    if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) == null) {
+                        String path = configurationFile.getPath();
+                        try {
+                            path = configurationFile.getCanonicalPath();
+                        } catch (IOException ex) {
+                            LOGGER.error("failed to read canonical path for file: " + path, ex);
+                        }
+                        throw new ParseException("configuration file does not exist: " + path);
+                    }
+                }
+
+                List<LifecycleAware> components = Lists.newArrayList();
+                if (reload) {
+                    EventBus eventBus = new EventBus(agentName + "-event-bus");
+                    PollingPropertiesFileConfigurationProvider configurationProvider;
+                    configurationProvider = new PollingPropertiesFileConfigurationProvider(
+                            agentName, configurationFile, eventBus, 30);
+                    components.add(configurationProvider);
+                    application = new Application(components);
+                    eventBus.register(application);
+                } else {
+                    PropertiesFileConfigurationProvider configurationProvider;
+                    configurationProvider = new PropertiesFileConfigurationProvider(agentName, configurationFile);
+                    application = new Application();
+                    application.handleConfigurationEvent(configurationProvider.getConfiguration());
+                }
+            }
+            // metrics
+            MetricObserver.init(CommonPropertiesHolder.get());
+            // audit
+            AuditUtils.initAudit();
+
+            final Application appReference = application;
+            Runtime.getRuntime().addShutdownHook(new Thread("data-proxy-shutdown-hook") {
+                @Override
+                public void run() {
+                    AuditUtils.send();
+                    appReference.stop();
+                }
+            });
+
+            // start application
+            application.start();
+            Thread.sleep(5000);
+        } catch (Exception e) {
+            LOGGER.error("fatal error occurred while running data-proxy: ", e);
+        }
+    }
+
+    /**
+     * Start by Manager config
+     */
+    private static void startByManagerConf(CommandLine commandLine) {
+        String proxyName = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
+        ManagerPropsConfigProvider configurationProvider = new ManagerPropsConfigProvider(proxyName);
+        Application application = new Application();
+        application.handleConfigurationEvent(configurationProvider.getConfiguration());
+        application.start();
+
+        final Application appReference = application;
+        Runtime.getRuntime().addShutdownHook(new Thread("data-proxy-shutdown-hook") {
+            @Override
+            public void run() {
+                appReference.stop();
+            }
+        });
+    }
+
+    /**
+     * Start all components
      */
     public void start() {
         lifecycleLock.lock();
@@ -110,27 +259,23 @@ public class Application {
             for (LifecycleAware component : components) {
                 // update dataproxy config
                 if (component instanceof IDataProxyConfigHolder) {
-                    ((IDataProxyConfigHolder) component)
-                            .setDataProxyConfig(
-                                    RemoteConfigManager.getInstance().getCurrentClusterConfigRef());
+                    ((IDataProxyConfigHolder) component).setDataProxyConfig(
+                            RemoteConfigManager.getInstance().getCurrentClusterConfigRef());
                 }
-                supervisor.supervise(component,
-                        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+                supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
             }
             // start admin task
             this.adminTask = new AdminTask(new Context(CommonPropertiesHolder.get()));
             this.adminTask.start();
-            this.heartbeatManager = new HeartbeatManager();
-            this.heartbeatManager.start();
+            HeartbeatManager heartbeatManager = new HeartbeatManager();
+            heartbeatManager.start();
         } finally {
             lifecycleLock.unlock();
         }
     }
 
     /**
-     * handleConfigurationEvent
-     *
-     * @param conf
+     * Handle the configuration event
      */
     @Subscribe
     public void handleConfigurationEvent(MaterializedConfiguration conf) {
@@ -139,8 +284,7 @@ public class Application {
             stopAllComponents();
             startAllComponents(conf);
         } catch (InterruptedException e) {
-            logger.info("Interrupted while trying to handle configuration event");
-            return;
+            LOGGER.info("interrupted while handle the configuration event");
         } finally {
             // If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock
             if (lifecycleLock.isHeldByCurrentThread()) {
@@ -150,7 +294,7 @@ public class Application {
     }
 
     /**
-     * stop
+     * Stop the application
      */
     public void stop() {
         lifecycleLock.lock();
@@ -170,102 +314,93 @@ public class Application {
     }
 
     /**
-     * stopAllComponents
+     * Stop all components
      */
     private void stopAllComponents() {
-        if (this.materializedConfiguration != null) {
-            logger.info("Shutting down configuration: {}", this.materializedConfiguration);
-            for (Entry<String, SourceRunner> entry : this.materializedConfiguration
-                    .getSourceRunners().entrySet()) {
+        LOGGER.info("shutting down configuration: {}", materializedConfiguration);
+        if (materializedConfiguration != null) {
+            for (Entry<String, SourceRunner> entry : materializedConfiguration.getSourceRunners().entrySet()) {
                 try {
-                    logger.info("Stopping Source " + entry.getKey());
+                    LOGGER.info("stopping source " + entry.getKey());
                     supervisor.unsupervise(entry.getValue());
                 } catch (Exception e) {
-                    logger.error("Error while stopping {}", entry.getValue(), e);
+                    LOGGER.error("error while stopping source " + entry.getValue(), e);
                 }
             }
 
-            for (Entry<String, SinkRunner> entry : this.materializedConfiguration.getSinkRunners()
-                    .entrySet()) {
+            for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) {
                 try {
-                    logger.info("Stopping Sink " + entry.getKey());
+                    LOGGER.info("stopping sink " + entry.getKey());
                     supervisor.unsupervise(entry.getValue());
                 } catch (Exception e) {
-                    logger.error("Error while stopping {}", entry.getValue(), e);
+                    LOGGER.error("error while stopping sink " + entry.getValue(), e);
                 }
             }
 
-            for (Entry<String, Channel> entry : this.materializedConfiguration.getChannels()
-                    .entrySet()) {
+            for (Entry<String, Channel> entry : materializedConfiguration.getChannels().entrySet()) {
                 try {
-                    logger.info("Stopping Channel " + entry.getKey());
+                    LOGGER.info("stopping channel " + entry.getKey());
                     supervisor.unsupervise(entry.getValue());
                 } catch (Exception e) {
-                    logger.error("Error while stopping {}", entry.getValue(), e);
+                    LOGGER.error("error while stopping channel " + entry.getValue(), e);
                 }
             }
         }
+
+        LOGGER.info("shutting down monitor server: {}", monitorServer);
         if (monitorServer != null) {
             monitorServer.stop();
         }
     }
 
     /**
-     * startAllComponents
-     *
-     * @param materializedConfiguration
+     * Start all components
      */
     private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
-        logger.info("Starting new configuration:{}", materializedConfiguration);
+        LOGGER.info("Starting new configuration:{}", materializedConfiguration);
 
         this.materializedConfiguration = materializedConfiguration;
-
         for (Entry<String, Channel> entry : materializedConfiguration.getChannels().entrySet()) {
             try {
-                logger.info("Starting Channel " + entry.getKey());
-                supervisor.supervise(entry.getValue(),
-                        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+                LOGGER.info("starting channel " + entry.getKey());
+                supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(),
+                        LifecycleState.START);
             } catch (Exception e) {
-                logger.error("Error while starting {}", entry.getValue(), e);
+                LOGGER.error("error while starting channel " + entry.getValue(), e);
             }
         }
 
-        /*
-         * Wait for all channels to start.
-         */
+        // Wait for all channels to start.
         for (Channel ch : materializedConfiguration.getChannels().values()) {
             while (ch.getLifecycleState() != LifecycleState.START
                     && !supervisor.isComponentInErrorState(ch)) {
                 try {
-                    logger.info("Waiting for channel: " + ch.getName()
-                            + " to start. Sleeping for 500 ms");
+                    LOGGER.info("sleeping for 500 ms to wait for channel: {} to start", ch.getName());
                     Thread.sleep(500);
                 } catch (InterruptedException e) {
-                    logger.error("Interrupted while waiting for channel to start.", e);
+                    LOGGER.error("interrupted while waiting for channel to start: ", e);
                     Throwables.propagate(e);
                 }
             }
         }
 
-        for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners()
-                .entrySet()) {
+        for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) {
             try {
-                logger.info("Starting Sink " + entry.getKey());
-                supervisor.supervise(entry.getValue(),
-                        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+                LOGGER.info("starting sink " + entry.getKey());
+                supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(),
+                        LifecycleState.START);
             } catch (Exception e) {
-                logger.error("Error while starting {}", entry.getValue(), e);
+                LOGGER.error("error while starting sink: " + entry.getValue(), e);
             }
         }
 
-        for (Entry<String, SourceRunner> entry : materializedConfiguration.getSourceRunners()
-                .entrySet()) {
+        for (Entry<String, SourceRunner> entry : materializedConfiguration.getSourceRunners().entrySet()) {
             try {
-                logger.info("Starting Source " + entry.getKey());
-                supervisor.supervise(entry.getValue(),
-                        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+                LOGGER.info("starting source " + entry.getKey());
+                supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(),
+                        LifecycleState.START);
             } catch (Exception e) {
-                logger.error("Error while starting {}", entry.getValue(), e);
+                LOGGER.error("error while starting source: " + entry.getValue(), e);
             }
         }
 
@@ -273,7 +408,7 @@ public class Application {
     }
 
     /**
-     * loadMonitoring
+     * Load monitoring
      */
     @SuppressWarnings("unchecked")
     private void loadMonitoring() {
@@ -285,8 +420,7 @@ public class Application {
                 Class<? extends MonitorService> klass;
                 try {
                     // Is it a known type?
-                    klass = MonitoringType.valueOf(
-                            monitorType.toUpperCase(Locale.ENGLISH)).getMonitorClass();
+                    klass = MonitoringType.valueOf(monitorType.toUpperCase(Locale.ENGLISH)).getMonitorClass();
                 } catch (Exception e) {
                     // Not a known type, use FQCN
                     klass = (Class<? extends MonitorService>) Class.forName(monitorType);
@@ -295,187 +429,15 @@ public class Application {
                 Context context = new Context();
                 for (String key : keys) {
                     if (key.startsWith(CONF_MONITOR_PREFIX)) {
-                        context.put(key.substring(CONF_MONITOR_PREFIX.length()),
-                                systemProps.getProperty(key));
+                        context.put(key.substring(CONF_MONITOR_PREFIX.length()), systemProps.getProperty(key));
                     }
                 }
                 monitorServer.configure(context);
                 monitorServer.start();
             }
         } catch (Exception e) {
-            logger.warn("Error starting monitoring. "
-                    + "Monitoring might not be available.", e);
-        }
-
-    }
-
-    /**
-     * main
-     *
-     * @param args
-     */
-    public static void main(String[] args) {
-
-        try {
-            SSLUtil.initGlobalSSLParameters();
-
-            Options options = new Options();
-
-            Option option = new Option("n", "name", true, "the name of this agent");
-            option.setRequired(true);
-            options.addOption(option);
-
-            option = new Option("f", "conf-file", true,
-                    "specify a config file (required if -z missing)");
-            option.setRequired(false);
-            options.addOption(option);
-
-            option = new Option(null, "no-reload-conf", false,
-                    "do not reload config file if changed");
-            options.addOption(option);
-
-            // Options for Zookeeper
-            option = new Option("z", "zkConnString", true,
-                    "specify the ZooKeeper connection to use (required if -f missing)");
-            option.setRequired(false);
-            options.addOption(option);
-
-            option = new Option("p", "zkBasePath", true,
-                    "specify the base path in ZooKeeper for agent configs");
-            option.setRequired(false);
-            options.addOption(option);
-
-            option = new Option("h", "help", false, "display help text");
-            options.addOption(option);
-
-            // load configuration data from manager
-            option = new Option(null, "load-conf-from-manager", false,
-                    "load configuration data from manager");
-            option.setRequired(false);
-            options.addOption(option);
-
-            CommandLineParser parser = new GnuParser();
-            CommandLine commandLine = parser.parse(options, args);
-
-            if (commandLine.hasOption('h')) {
-                new HelpFormatter().printHelp("flume-ng agent", options, true);
-                return;
-            }
-
-            // start by manager configuation
-            if (commandLine.hasOption("load-conf-from-manager")) {
-                startByManagerConf(commandLine);
-                return;
-            }
-
-            String agentName = commandLine.getOptionValue('n');
-            boolean reload = !commandLine.hasOption("no-reload-conf");
-
-            boolean isZkConfigured = false;
-            if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
-                isZkConfigured = true;
-            }
-
-            Application application;
-            if (isZkConfigured) {
-                // get options
-                String zkConnectionStr = commandLine.getOptionValue('z');
-                String baseZkPath = commandLine.getOptionValue('p');
-
-                if (reload) {
-                    EventBus eventBus = new EventBus(agentName + "-event-bus");
-                    List<LifecycleAware> components = Lists.newArrayList();
-                    PollingZooKeeperConfigurationProvider zProvider = new PollingZooKeeperConfigurationProvider(
-                            agentName, zkConnectionStr, baseZkPath, eventBus);
-                    components.add(zProvider);
-                    application = new Application(components);
-                    eventBus.register(application);
-                } else {
-                    StaticZooKeeperConfigurationProvider zProvider = new StaticZooKeeperConfigurationProvider(
-                            agentName, zkConnectionStr, baseZkPath);
-                    application = new Application();
-                    application.handleConfigurationEvent(zProvider.getConfiguration());
-                }
-            } else {
-                File configurationFile = new File(commandLine.getOptionValue('f'));
-
-                // The following is to ensure that by default the agent will fail on startup
-                // if the file does not exist.
-                if (!configurationFile.exists()) {
-                    // If command line invocation, then need to fail fast
-                    if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) == null) {
-                        String path = configurationFile.getPath();
-                        try {
-                            path = configurationFile.getCanonicalPath();
-                        } catch (IOException ex) {
-                            logger.error("Failed to read canonical path for file: " + path,
-                                    ex);
-                        }
-                        throw new ParseException(
-                                "The specified configuration file does not exist: " + path);
-                    }
-                }
-                List<LifecycleAware> components = Lists.newArrayList();
-
-                if (reload) {
-                    EventBus eventBus = new EventBus(agentName + "-event-bus");
-                    PollingPropertiesFileConfigurationProvider configurationProvider;
-                    configurationProvider = new PollingPropertiesFileConfigurationProvider(
-                            agentName, configurationFile, eventBus, 30);
-                    components.add(configurationProvider);
-                    application = new Application(components);
-                    eventBus.register(application);
-                } else {
-                    PropertiesFileConfigurationProvider configurationProvider;
-                    configurationProvider = new PropertiesFileConfigurationProvider(
-                            agentName, configurationFile);
-                    application = new Application();
-                    application.handleConfigurationEvent(configurationProvider.getConfiguration());
-                }
-            }
-            // metrics
-            MetricObserver.init(CommonPropertiesHolder.get());
-            // audit
-            AuditUtils.initAudit();
-
-            final Application appReference = application;
-            Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
-
-                @Override
-                public void run() {
-                    AuditUtils.sendReport();
-                    appReference.stop();
-                }
-            });
-
-            // start application
-            application.start();
-            Thread.sleep(5000);
-        } catch (Exception e) {
-            logger.error("A fatal error occurred while running. Exception follows.", e);
+            LOGGER.warn("starting monitoring error, the monitoring might not be available: ", e);
         }
     }
 
-    /**
-     * startByManagerConf
-     *
-     * @param commandLine
-     */
-    private static void startByManagerConf(CommandLine commandLine) {
-        String proxyName = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
-        ManagerPropertiesConfigurationProvider configurationProvider = new ManagerPropertiesConfigurationProvider(
-                proxyName);
-        Application application = new Application();
-        application.handleConfigurationEvent(configurationProvider.getConfiguration());
-        application.start();
-
-        final Application appReference = application;
-        Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
-
-            @Override
-            public void run() {
-                appReference.stop();
-            }
-        });
-    }
-}
\ No newline at end of file
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/ManagerPropertiesConfigurationProvider.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/ManagerPropsConfigProvider.java
similarity index 62%
rename from inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/ManagerPropertiesConfigurationProvider.java
rename to inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/ManagerPropsConfigProvider.java
index d1451ef28..04adf218b 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/ManagerPropertiesConfigurationProvider.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/ManagerPropsConfigProvider.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -17,48 +17,38 @@
 
 package org.apache.inlong.dataproxy.node;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.flume.conf.FlumeConfiguration;
 import org.apache.flume.node.AbstractConfigurationProvider;
 import org.apache.inlong.dataproxy.config.RemoteConfigManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
- * 
- * ManagerPropertiesConfigurationProvider
+ * Manager properties configuration provider
  */
-public class ManagerPropertiesConfigurationProvider extends
-        AbstractConfigurationProvider {
+public class ManagerPropsConfigProvider extends AbstractConfigurationProvider {
 
-    private static final Logger LOGGER = LoggerFactory
-            .getLogger(ManagerPropertiesConfigurationProvider.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(ManagerPropsConfigProvider.class);
 
-    /**
-     * ManagerPropertiJesConfigurationProvider
-     * 
-     * @param agentName
-     */
-    public ManagerPropertiesConfigurationProvider(String agentName) {
+    public ManagerPropsConfigProvider(String agentName) {
         super(agentName);
     }
 
     /**
-     * getFlumeConfiguration
-     * 
-     * @return
+     * Get Flume configuration
      */
     @Override
     public FlumeConfiguration getFlumeConfiguration() {
         try {
             Map<String, String> flumeProperties = RemoteConfigManager.getInstance().getFlumeProperties();
-            LOGGER.info("flumeProperties:{}", flumeProperties);
+            LOGGER.info("all flume props: {}", flumeProperties);
             return new FlumeConfiguration(flumeProperties);
         } catch (Exception e) {
-            LOGGER.error("exception catch:" + e.getMessage(), e);
+            LOGGER.error("get flume props error: ", e);
         }
-        return new FlumeConfiguration(new HashMap<String, String>());
+        return new FlumeConfiguration(new HashMap<>());
     }
 }
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java
index 50d6f69ce..e0094a3ce 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java
@@ -25,38 +25,35 @@ import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 import org.slf4j.Logger;
 
 /**
- * 
- * SortStandaloneApplication
+ * Sort Standalone Application
  */
 public class SortStandaloneApplication {
 
-    public static final Logger LOG = InlongLoggerFactory.getLogger(Application.class);
+    public static final Logger LOGGER = InlongLoggerFactory.getLogger(Application.class);
 
     /**
-     * main
-     * 
-     * @param args
+     * Main entrance
      */
     public static void main(String[] args) {
-        LOG.info("start to sort-standalone");
+        LOGGER.info("start to sort-standalone");
         try {
             SortCluster cluster = new SortCluster();
-            Runtime.getRuntime().addShutdownHook(new Thread("sortstandalone-shutdown-hook") {
+            Runtime.getRuntime().addShutdownHook(new Thread("sort-standalone-shutdown-hook") {
 
                 @Override
                 public void run() {
-                    AuditUtils.sendReport();
+                    AuditUtils.send();
                     cluster.close();
                 }
             });
-            //
+            // start the cluster
             cluster.start();
             // metrics
             MetricObserver.init(CommonPropertiesHolder.get());
             AuditUtils.initAudit();
             Thread.sleep(5000);
         } catch (Exception e) {
-            LOG.error("A fatal error occurred while running. Exception follows.", e);
+            LOGGER.error("fatal error occurred while running sort-standalone: ", e);
         }
     }
 }
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java
index bc0d219b8..6a7a8900c 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -17,22 +17,22 @@
 
 package org.apache.inlong.sort.standalone.metrics.audit;
 
-import java.util.HashSet;
-import java.util.Map;
-
 import org.apache.commons.lang3.BooleanUtils;
-import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flume.Event;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.audit.AuditOperator;
 import org.apache.inlong.audit.util.AuditConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+
 /**
- * 
- * AuditUtils
+ * Audit utils
  */
 public class AuditUtils {
 
@@ -49,7 +49,7 @@ public class AuditUtils {
     private static boolean IS_AUDIT = true;
 
     /**
-     * initAudit
+     * Init audit
      */
     public static void initAudit() {
         // IS_AUDIT
@@ -60,11 +60,9 @@ public class AuditUtils {
             HashSet<String> proxys = new HashSet<>();
             if (!StringUtils.isBlank(strIpPorts)) {
                 String[] ipPorts = strIpPorts.split("\\s+");
-                for (String ipPort : ipPorts) {
-                    proxys.add(ipPort);
-                }
+                Collections.addAll(proxys, ipPorts);
             }
-            AuditImp.getInstance().setAuditProxy(proxys);
+            AuditOperator.getInstance().setAuditProxy(proxys);
             // AuditConfig
             String filePath = CommonPropertiesHolder.getString(AUDIT_KEY_FILE_PATH,
                     AUDIT_DEFAULT_FILE_PATH);
@@ -72,30 +70,24 @@ public class AuditUtils {
                     CommonPropertiesHolder.getString(AUDIT_KEY_MAX_CACHE_ROWS),
                     AUDIT_DEFAULT_MAX_CACHE_ROWS);
             AuditConfig auditConfig = new AuditConfig(filePath, maxCacheRow);
-            AuditImp.getInstance().setAuditConfig(auditConfig);
+            AuditOperator.getInstance().setAuditConfig(auditConfig);
         }
     }
 
     /**
-     * add
-     * 
-     * @param auditID
-     * @param event
+     * Add audit data
      */
     public static void add(int auditID, ProfileEvent event) {
         if (IS_AUDIT && event != null) {
             String inlongGroupId = event.getInlongGroupId();
             String inlongStreamId = event.getInlongStreamId();
             long logTime = event.getRawLogTime();
-            AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length);
+            AuditOperator.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length);
         }
     }
 
     /**
-     * add
-     * 
-     * @param auditID
-     * @param event
+     * Add audit data
      */
     public static void add(int auditID, Event event) {
         if (IS_AUDIT && event != null) {
@@ -103,14 +95,14 @@ public class AuditUtils {
             String inlongGroupId = SortMetricItem.getInlongGroupId(headers);
             String inlongStreamId = SortMetricItem.getInlongStreamId(headers);
             long logTime = SortMetricItem.getLogTime(headers);
-            AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length);
+            AuditOperator.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length);
         }
     }
 
     /**
-     * sendReport
+     * Send audit data
      */
-    public static void sendReport() {
-        AuditImp.getInstance().sendReport();
+    public static void send() {
+        AuditOperator.getInstance().send();
     }
 }
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 45e33c3c2..a117a3f27 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -22,7 +22,7 @@ import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.audit.AuditOperator;
 import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 
@@ -43,9 +43,10 @@ import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
  */
 public class SinkMetricData implements MetricData {
 
-    private MetricGroup metricGroup;
-    private Map<String, String> labels;
-    private AuditImp auditImp;
+    private final MetricGroup metricGroup;
+    private final Map<String, String> labels;
+    private final RegisteredMetric registeredMetric;
+    private AuditOperator auditOperator;
     private Counter numRecordsOut;
     private Counter numBytesOut;
     private Counter numRecordsOutForMeter;
@@ -54,7 +55,6 @@ public class SinkMetricData implements MetricData {
     private Counter dirtyBytes;
     private Meter numRecordsOutPerSecond;
     private Meter numBytesOutPerSecond;
-    private RegisteredMetric registeredMetric;
 
     public SinkMetricData(MetricOption option, MetricGroup metricGroup) {
         this.metricGroup = metricGroup;
@@ -94,8 +94,8 @@ public class SinkMetricData implements MetricData {
         }
 
         if (option.getIpPorts().isPresent()) {
-            AuditImp.getInstance().setAuditProxy(option.getIpPortList());
-            this.auditImp = AuditImp.getInstance();
+            AuditOperator.getInstance().setAuditProxy(option.getIpPortList());
+            this.auditOperator = AuditOperator.getInstance();
         }
     }
 
@@ -271,8 +271,8 @@ public class SinkMetricData implements MetricData {
             numBytesOutForMeter.inc(rowSize);
         }
 
-        if (auditImp != null) {
-            auditImp.add(
+        if (auditOperator != null) {
+            auditOperator.add(
                     Constants.AUDIT_SORT_OUTPUT,
                     getGroupId(),
                     getStreamId(),
@@ -296,10 +296,10 @@ public class SinkMetricData implements MetricData {
     public String toString() {
         switch (registeredMetric) {
             case DIRTY:
-                 return "SinkMetricData{"
+                return "SinkMetricData{"
                         + "metricGroup=" + metricGroup
                         + ", labels=" + labels
-                        + ", auditImp=" + auditImp
+                        + ", auditOperator=" + auditOperator
                         + ", dirtyRecords=" + dirtyRecords.getCount()
                         + ", dirtyBytes=" + dirtyBytes.getCount()
                         + '}';
@@ -307,7 +307,7 @@ public class SinkMetricData implements MetricData {
                 return "SinkMetricData{"
                         + "metricGroup=" + metricGroup
                         + ", labels=" + labels
-                        + ", auditImp=" + auditImp
+                        + ", auditOperator=" + auditOperator
                         + ", numRecordsOut=" + numRecordsOut.getCount()
                         + ", numBytesOut=" + numBytesOut.getCount()
                         + ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount()
@@ -319,7 +319,7 @@ public class SinkMetricData implements MetricData {
                 return "SinkMetricData{"
                         + "metricGroup=" + metricGroup
                         + ", labels=" + labels
-                        + ", auditImp=" + auditImp
+                        + ", auditOperator=" + auditOperator
                         + ", numRecordsOut=" + numRecordsOut.getCount()
                         + ", numBytesOut=" + numBytesOut.getCount()
                         + ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount()
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index 3ac6a96f8..b93c5bb53 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -22,7 +22,7 @@ import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.audit.AuditOperator;
 import org.apache.inlong.sort.base.Constants;
 
 import java.nio.charset.StandardCharsets;
@@ -40,15 +40,15 @@ import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
  */
 public class SourceMetricData implements MetricData {
 
-    private MetricGroup metricGroup;
-    private Map<String, String> labels;
+    private final MetricGroup metricGroup;
+    private final Map<String, String> labels;
     private Counter numRecordsIn;
     private Counter numBytesIn;
     private Counter numRecordsInForMeter;
     private Counter numBytesInForMeter;
     private Meter numRecordsInPerSecond;
     private Meter numBytesInPerSecond;
-    private AuditImp auditImp;
+    private AuditOperator auditOperator;
 
     public SourceMetricData(MetricOption option, MetricGroup metricGroup) {
         this.metricGroup = metricGroup;
@@ -70,8 +70,8 @@ public class SourceMetricData implements MetricData {
         }
 
         if (option.getIpPorts().isPresent()) {
-            AuditImp.getInstance().setAuditProxy(option.getIpPortList());
-            this.auditImp = AuditImp.getInstance();
+            AuditOperator.getInstance().setAuditProxy(option.getIpPortList());
+            this.auditOperator = AuditOperator.getInstance();
         }
     }
 
@@ -211,8 +211,8 @@ public class SourceMetricData implements MetricData {
             this.numBytesInForMeter.inc(rowDataSize);
         }
 
-        if (auditImp != null) {
-            auditImp.add(
+        if (auditOperator != null) {
+            auditOperator.add(
                     Constants.AUDIT_SORT_INPUT,
                     getGroupId(),
                     getStreamId(),
@@ -233,7 +233,7 @@ public class SourceMetricData implements MetricData {
                 + ", numBytesInForMeter=" + numBytesInForMeter.getCount()
                 + ", numRecordsInPerSecond=" + numRecordsInPerSecond.getRate()
                 + ", numBytesInPerSecond=" + numBytesInPerSecond.getRate()
-                + ", auditImp=" + auditImp
+                + ", auditOperator=" + auditOperator
                 + '}';
     }
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java
index b4fbbb9da..dea86d46a 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -17,8 +17,7 @@
 
 package org.apache.inlong.tubemq.server.broker.stats.audit;
 
-import java.util.Map;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.audit.AuditOperator;
 import org.apache.inlong.audit.util.AuditConfig;
 import org.apache.inlong.tubemq.corebase.TokenConstants;
 import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
@@ -26,18 +25,21 @@ import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
 import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo;
 import org.apache.inlong.tubemq.server.common.fileconfig.ADConfig;
 
+import java.util.Map;
+
 /**
  * AuditUtils
  *
  * A wrapper class for Audit report operations
  */
 public class AuditUtils {
+
     private static ADConfig auditConfig = new ADConfig();
 
     /**
      * init audit instance
      *
-     * @param adConfig  the initial configure
+     * @param adConfig the initial configure
      */
     public static void initAudit(ADConfig adConfig) {
         // check whether enable audit
@@ -48,28 +50,27 @@ public class AuditUtils {
         auditConfig = adConfig;
 
         // initial audit instance
-        AuditImp.getInstance().setAuditProxy(adConfig.getAuditProxyAddrSet());
+        AuditOperator.getInstance().setAuditProxy(adConfig.getAuditProxyAddrSet());
         AuditConfig auditConfig =
                 new AuditConfig(adConfig.getAuditCacheFilePath(),
                         adConfig.getAuditCacheMaxRows());
-        AuditImp.getInstance().setAuditConfig(auditConfig);
+        AuditOperator.getInstance().setAuditConfig(auditConfig);
     }
 
     /**
      * add produce record
      *
-     * @param groupId   the group id
-     * @param streamId  the stream id
-     * @param logTime   the record time
-     * @param count     the record count
-     * @param size      the record size
+     * @param groupId the group id
+     * @param streamId the stream id
+     * @param logTime the record time
+     * @param count the record count
+     * @param size the record size
      */
-    public static void addProduceRecord(String groupId, String streamId,
-                                        String logTime, long count, long size) {
+    public static void addProduceRecord(String groupId, String streamId, String logTime, long count, long size) {
         if (!auditConfig.isAuditEnable()) {
             return;
         }
-        AuditImp.getInstance().add(auditConfig.getAuditIdProduce(),
+        AuditOperator.getInstance().add(auditConfig.getAuditIdProduce(),
                 groupId, streamId, DateTimeConvertUtils.yyyyMMddHHmm2ms(logTime), count, size);
     }
 
@@ -79,8 +80,7 @@ public class AuditUtils {
      * @param trafficInfos the consumed traffic information
      */
     public static void addConsumeRecord(Map<String, TrafficInfo> trafficInfos) {
-        if (!auditConfig.isAuditEnable()
-                || trafficInfos == null || trafficInfos.isEmpty()) {
+        if (!auditConfig.isAuditEnable() || trafficInfos == null || trafficInfos.isEmpty()) {
             return;
         }
         for (Map.Entry<String, TrafficInfo> entry : trafficInfos.entrySet()) {
@@ -99,19 +99,19 @@ public class AuditUtils {
             //       #127.0.0.1#32677#test_consume#2#202207041219
             //       topicName, brokerIP, clientId,
             //       clientIP, client processId, consume group, partitionId, msgTime
-            AuditImp.getInstance().add(auditConfig.getAuditIdConsume(),
+            AuditOperator.getInstance().add(auditConfig.getAuditIdConsume(),
                     statKeyItems[0], statKeyItems[5], DateTimeConvertUtils.yyyyMMddHHmm2ms(statKeyItems[7]),
                     entry.getValue().getMsgCount(), entry.getValue().getMsgSize());
         }
     }
 
     /**
-     * close audit report
+     * Close audit, if it was enabled, send its data first.
      */
     public static void closeAudit() {
         if (!auditConfig.isAuditEnable()) {
             return;
         }
-        AuditImp.getInstance().sendReport();
+        AuditOperator.getInstance().send();
     }
 }