You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/08/07 15:10:07 UTC

[inlong] branch master updated: [INLONG-5373][TubeMQ] Supports outputting traffic information through the audit SDK (#5385)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 025e719ea [INLONG-5373][TubeMQ] Supports outputting traffic information through the audit SDK (#5385)
025e719ea is described below

commit 025e719ea92b5e645f3232934dd18fd3582178df
Author: Goson Zhang <46...@qq.com>
AuthorDate: Sun Aug 7 23:10:01 2022 +0800

    [INLONG-5373][TubeMQ] Supports outputting traffic information through the audit SDK (#5385)
---
 inlong-tubemq/conf/broker.ini                      |  14 ++-
 inlong-tubemq/tubemq-server/pom.xml                |  11 ++
 .../inlong/tubemq/server/broker/BrokerConfig.java  |  12 +++
 .../tubemq/server/broker/BrokerServiceServer.java  |   8 ++
 .../server/broker/stats/audit/AuditUtils.java      | 117 +++++++++++++++++++++
 .../tubemq/server/common/fileconfig/ADConfig.java  | 109 +++++++++++++++++++
 .../common/fileconfig/AbstractFileConfig.java      |  40 +++++++
 .../inlong/tubemq/server/common/ADConfigTest.java  |  52 +++++++++
 8 files changed, 362 insertions(+), 1 deletion(-)

diff --git a/inlong-tubemq/conf/broker.ini b/inlong-tubemq/conf/broker.ini
index 08aea7d36..1465e6fd8 100644
--- a/inlong-tubemq/conf/broker.ini
+++ b/inlong-tubemq/conf/broker.ini
@@ -58,4 +58,16 @@ zkCommitPeriodMs=5000
 ; maximum retry times when commits data on ZK fails
 zkCommitFailRetries=10
 
-
+[audit]
+; whether to enable data report by audit sdk
+auditEnable=false
+; audit proxy server addresses
+auditProxyAddr=127.0.0.1:10081,127.0.0.2:10081
+; file path for audit cache data
+auditCacheFilePath=/data/inlong/audit
+; max cache records for audit cache
+auditCacheMaxRows=2000000
+; audit id for production
+auditIdProduce=9
+; audit id for consumption
+auditIdConsume=10
diff --git a/inlong-tubemq/tubemq-server/pom.xml b/inlong-tubemq/tubemq-server/pom.xml
index 3c2d0f0e5..eaf67d302 100644
--- a/inlong-tubemq/tubemq-server/pom.xml
+++ b/inlong-tubemq/tubemq-server/pom.xml
@@ -127,6 +127,17 @@
             <artifactId>tubemq-example</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>audit-sdk</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-simple</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>org.apache.zookeeper</groupId>
             <artifactId>zookeeper</artifactId>
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerConfig.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerConfig.java
index 62258873b..d19e9d67e 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerConfig.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerConfig.java
@@ -27,6 +27,7 @@ import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
 import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
 import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils;
 import org.apache.inlong.tubemq.server.common.TServerConstants;
+import org.apache.inlong.tubemq.server.common.fileconfig.ADConfig;
 import org.apache.inlong.tubemq.server.common.fileconfig.AbstractFileConfig;
 import org.apache.inlong.tubemq.server.common.fileconfig.ZKConfig;
 import org.ini4j.Ini;
@@ -110,6 +111,8 @@ public class BrokerConfig extends AbstractFileConfig {
     private ZKConfig zkConfig = new ZKConfig();
     // tls config
     private TLSConfig tlsConfig = new TLSConfig();
+    // audit configure
+    private ADConfig auditConfig = new ADConfig();
     private boolean visitMasterAuth = false;
     private String visitName = "";
     private String visitPassword = "";
@@ -149,6 +152,14 @@ public class BrokerConfig extends AbstractFileConfig {
         return this.tlsConfig;
     }
 
+    public ADConfig getAuditConfig() {
+        return auditConfig;
+    }
+
+    public boolean isAuditEnable() {
+        return this.auditConfig.isAuditEnable();
+    }
+
     public int getBrokerId() {
         if (this.brokerId <= 0) {
             try {
@@ -194,6 +205,7 @@ public class BrokerConfig extends AbstractFileConfig {
         this.tlsConfig = this.loadTlsSectConf(iniConf,
                 TBaseConstants.META_DEFAULT_BROKER_TLS_PORT);
         this.zkConfig = loadZKeeperSectConf(iniConf);
+        this.auditConfig = loadAuditSectConf(iniConf);
         if (this.port == this.webPort
                 || (tlsConfig.isTlsEnable() && (this.tlsConfig.getTlsPort() == this.webPort))) {
             throw new IllegalArgumentException(new StringBuilder(512)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
index a0361398d..ae1d28206 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
@@ -68,6 +68,7 @@ import org.apache.inlong.tubemq.server.broker.offset.OffsetRecordInfo;
 import org.apache.inlong.tubemq.server.broker.offset.OffsetService;
 import org.apache.inlong.tubemq.server.broker.stats.BrokerSrvStatsHolder;
 import org.apache.inlong.tubemq.server.broker.stats.TrafficStatsService;
+import org.apache.inlong.tubemq.server.broker.stats.audit.AuditUtils;
 import org.apache.inlong.tubemq.server.common.TServerConstants;
 import org.apache.inlong.tubemq.server.common.TStatusConstants;
 import org.apache.inlong.tubemq.server.common.aaaserver.CertificateBrokerHandler;
@@ -127,6 +128,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
         this.serverAuthHandler = tubeBroker.getServerAuthHandler();
         ServiceStatusHolder.setStatsParameters(tubeConfig.getAllowedReadIOExcptCnt(),
                 tubeConfig.getAllowedWriteIOExcptCnt(), tubeConfig.getIoExcptStatsDurationMs());
+        AuditUtils.initAudit(tubeConfig.getAuditConfig());
         this.putCounterGroup = new TrafficStatsService("PutCounterGroup", "Producer", 60 * 1000);
         this.getCounterGroup = new TrafficStatsService("GetCounterGroup", "Consumer", 60 * 1000);
         this.heartbeatManager = new HeartbeatManager();
@@ -222,6 +224,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
         heartbeatManager.stop();
         putCounterGroup.close(-1);
         getCounterGroup.close(-1);
+        AuditUtils.closeAudit();
         logger.info("BrokerService server stopped");
     }
 
@@ -379,6 +382,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
                         msgResult.lastRdDataOffset,
                         msgResult.totalMsgSize);
                 getCounterGroup.add(msgResult.tmpCounters);
+                AuditUtils.addConsumeRecord(msgResult.tmpCounters);
                 builder.setEscFlowCtrl(false);
                 builder.setRequireSlow(msgResult.isSlowFreq);
                 builder.setSuccess(true);
@@ -674,6 +678,8 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
                         .append("#").append(request.getPartitionId())
                         .append("#").append(request.getMsgTime()).toString();
                 putCounterGroup.add(baseKey, 1L, dataLength);
+                AuditUtils.addProduceRecord(topicName,
+                        request.getMsgType(), request.getMsgTime(), 1, dataLength);
                 builder.setSuccess(true);
                 builder.setRequireAuth(certResult.reAuth);
                 builder.setErrCode(TErrCodeConstants.SUCCESS);
@@ -777,6 +783,8 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
                             .append("#").append(partitionId)
                             .append("#").append(sendTime).toString();
                     putCounterGroup.add(baseKey, 1L, msgLength);
+                    AuditUtils.addProduceRecord(TServerConstants.OFFSET_HISTORY_NAME,
+                            entry.getKey(), sendTime, 1, msgLength);
                     strBuff.delete(0, strBuff.length());
                 } else {
                     logger.warn("Put history offset overflow !");
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java
new file mode 100644
index 000000000..a5484d3e1
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.broker.stats.audit;
+
+import java.util.Map;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.audit.util.AuditConfig;
+import org.apache.inlong.tubemq.corebase.TokenConstants;
+import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
+import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
+import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo;
+import org.apache.inlong.tubemq.server.common.fileconfig.ADConfig;
+
+/**
+ * AuditUtils
+ *
+ * A wrapper class for Audit report operations
+ */
+public class AuditUtils {
+    private static ADConfig auditConfig;
+
+    /**
+     * init audit instance
+     *
+     * @param adConfig  the initial configure
+     */
+    public static void initAudit(ADConfig adConfig) {
+        // check whether enable audit
+        if (adConfig == null || !adConfig.isAuditEnable()) {
+            return;
+        }
+        // set audit configure
+        auditConfig = adConfig;
+
+        // initial audit instance
+        AuditImp.getInstance().setAuditProxy(adConfig.getAuditProxyAddrSet());
+        AuditConfig auditConfig =
+                new AuditConfig(adConfig.getAuditCacheFilePath(),
+                        adConfig.getAuditCacheMaxRows());
+        AuditImp.getInstance().setAuditConfig(auditConfig);
+    }
+
+    /**
+     * add produce record
+     *
+     * @param groupId   the group id
+     * @param streamId  the stream id
+     * @param logTime   the record time
+     * @param count     the record count
+     * @param size      the record size
+     */
+    public static void addProduceRecord(String groupId, String streamId,
+                                        String logTime, long count, long size) {
+        if (!auditConfig.isAuditEnable()) {
+            return;
+        }
+        AuditImp.getInstance().add(auditConfig.getAuditIdProduce(),
+                groupId, streamId, DateTimeConvertUtils.yyyyMMddHHmm2ms(logTime), count, size);
+    }
+
+    /**
+     * add consume record
+     *
+     * @param trafficInfos the consumed traffic information
+     */
+    public static void addConsumeRecord(Map<String, TrafficInfo> trafficInfos) {
+        if (!auditConfig.isAuditEnable()
+                || trafficInfos == null || trafficInfos.isEmpty()) {
+            return;
+        }
+        for (Map.Entry<String, TrafficInfo> entry : trafficInfos.entrySet()) {
+            if (entry == null || entry.getKey() == null || entry.getValue() == null) {
+                continue;
+            }
+            String statKey = entry.getKey();
+            String[] statKeyItems = statKey.split(TokenConstants.SEGMENT_SEP, -1);
+            if (statKeyItems.length < 8) {
+                continue;
+            }
+            if (TStringUtils.isEmpty(statKeyItems[0])) {
+                continue;
+            }
+            // like: test_1#127.0.0.1#test_consume_127.0.0.1-32677-1656672066382-1-Pull-3.9.2
+            //       #127.0.0.1#32677#test_consume#2#202207041219
+            //       topicName, brokerIP, clientId,
+            //       clientIP, client processId, consume group, partitionId, msgTime
+            AuditImp.getInstance().add(auditConfig.getAuditIdConsume(),
+                    statKeyItems[0], statKeyItems[5], DateTimeConvertUtils.yyyyMMddHHmm2ms(statKeyItems[7]),
+                    entry.getValue().getMsgCount(), entry.getValue().getMsgSize());
+        }
+    }
+
+    /**
+     * close audit report
+     */
+    public static void closeAudit() {
+        if (!auditConfig.isAuditEnable()) {
+            return;
+        }
+        AuditImp.getInstance().sendReport();
+    }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fileconfig/ADConfig.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fileconfig/ADConfig.java
new file mode 100644
index 000000000..674869f23
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fileconfig/ADConfig.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.common.fileconfig;
+
+import java.util.HashSet;
+import java.util.List;
+import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
+
+public class ADConfig {
+    // whether to enable data report by audit sdk
+    private boolean auditEnable = false;
+    // audit proxy server addresses
+    private HashSet<String> auditProxyAddrSet = new HashSet<>();
+    // file path for audit cache data
+    private String auditCacheFilePath = "/data/inlong/audit";
+    // max cache records for audit cache
+    private int auditCacheMaxRows = 2000000;
+    // audit id for production
+    private int auditIdProduce = 9;
+    // audit id for consumption
+    private int auditIdConsume = 10;
+
+    public ADConfig() {
+
+    }
+
+    public boolean isAuditEnable() {
+        return auditEnable;
+    }
+
+    public void setAuditEnable(boolean auditEnable) {
+        this.auditEnable = auditEnable;
+    }
+
+    public HashSet<String> getAuditProxyAddrSet() {
+        return auditProxyAddrSet;
+    }
+
+    public void setAuditProxyAddrSet(List<String> auditProxyAddrs) {
+        if (auditProxyAddrs == null || auditProxyAddrs.isEmpty()) {
+            return;
+        }
+        this.auditProxyAddrSet.clear();
+        for (String addrItem : auditProxyAddrs) {
+            if (TStringUtils.isEmpty(addrItem)) {
+                continue;
+            }
+            this.auditProxyAddrSet.add(addrItem);
+        }
+    }
+
+    public String getAuditCacheFilePath() {
+        return auditCacheFilePath;
+    }
+
+    public void setAuditCacheFilePath(String auditCacheFilePath) {
+        this.auditCacheFilePath = auditCacheFilePath;
+    }
+
+    public int getAuditCacheMaxRows() {
+        return auditCacheMaxRows;
+    }
+
+    public void setAuditCacheMaxRows(int auditCacheMaxRows) {
+        this.auditCacheMaxRows = auditCacheMaxRows;
+    }
+
+    public int getAuditIdProduce() {
+        return auditIdProduce;
+    }
+
+    public void setAuditIdProduce(int auditIdProduce) {
+        this.auditIdProduce = auditIdProduce;
+    }
+
+    public int getAuditIdConsume() {
+        return auditIdConsume;
+    }
+
+    public void setAuditIdConsume(int auditIdConsume) {
+        this.auditIdConsume = auditIdConsume;
+    }
+
+    public String toString() {
+        return new StringBuilder(512)
+                .append("\"ADConfig\":{\"auditEnable\":").append(auditEnable)
+                .append(",\"auditProxyAddr\":\"").append(auditProxyAddrSet)
+                .append("\",\"auditCacheFilePath\":\"").append(auditCacheFilePath)
+                .append("\",\"auditCacheMaxRows\":").append(auditCacheMaxRows)
+                .append(",\"auditIdProduce\":").append(auditIdProduce)
+                .append(",\"auditIdConsume\":").append(auditIdConsume)
+                .append("}").toString();
+    }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fileconfig/AbstractFileConfig.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fileconfig/AbstractFileConfig.java
index ce559c880..ac3bb31ed 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fileconfig/AbstractFileConfig.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fileconfig/AbstractFileConfig.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Set;
 import org.apache.commons.io.FileUtils;
+import org.apache.inlong.tubemq.corebase.cluster.MasterInfo;
 import org.apache.inlong.tubemq.corebase.config.TLSConfig;
 import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
 import org.apache.inlong.tubemq.server.broker.exception.StartupException;
@@ -38,6 +39,7 @@ public abstract class AbstractFileConfig {
     protected static final String SECT_TOKEN_REPLICATION = "replication";
     protected static final String SECT_TOKEN_META_BDB = "meta_bdb";
     protected static final String SECT_TOKEN_META_ZK = "meta_zookeeper";
+    protected static final String SECT_TOKEN_META_AUDIT = "audit";
 
     private static final Logger logger =
             LoggerFactory.getLogger(AbstractFileConfig.class);
@@ -276,6 +278,44 @@ public abstract class AbstractFileConfig {
         return zkConfig;
     }
 
+    protected ADConfig loadAuditSectConf(final Ini iniConf) {
+        final Profile.Section auditSect = iniConf.get(SECT_TOKEN_META_AUDIT);
+        ADConfig adConfig = new ADConfig();
+        if (auditSect == null) {
+            return adConfig;
+        }
+        Set<String> configKeySet = auditSect.keySet();
+        if (configKeySet.isEmpty()) {
+            return adConfig;
+        }
+        if (TStringUtils.isNotBlank(auditSect.get("auditEnable"))) {
+            adConfig.setAuditEnable(getBoolean(auditSect, "auditEnable"));
+        }
+        // parse auditProxyAddr configure
+        String auditProxyAddrStr = auditSect.get("auditProxyAddr");
+        if (TStringUtils.isBlank(auditProxyAddrStr)) {
+            throw new IllegalArgumentException(new StringBuilder(256)
+                    .append("auditProxyAddr is null or Blank in ")
+                    .append(SECT_TOKEN_META_AUDIT).append(" section!").toString());
+        }
+        MasterInfo auditAddrInfo = new MasterInfo(auditProxyAddrStr);
+        adConfig.setAuditProxyAddrSet(auditAddrInfo.getNodeHostPortList());
+        // get cache file path
+        if (TStringUtils.isNotBlank(auditSect.get("auditCacheFilePath"))) {
+            adConfig.setAuditCacheFilePath(auditSect.get("auditCacheFilePath").trim());
+        }
+        if (TStringUtils.isNotBlank(auditSect.get("auditCacheMaxRows"))) {
+            adConfig.setAuditCacheMaxRows(getInt(auditSect, "auditCacheMaxRows"));
+        }
+        if (TStringUtils.isNotBlank(auditSect.get("auditIdProduce"))) {
+            adConfig.setAuditIdProduce(getInt(auditSect, "auditIdProduce"));
+        }
+        if (TStringUtils.isNotBlank(auditSect.get("auditIdConsume"))) {
+            adConfig.setAuditIdConsume(getInt(auditSect, "auditIdConsume"));
+        }
+        return adConfig;
+    }
+
     @Override
     public String toString() {
         return new StringBuilder(512)
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/ADConfigTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/ADConfigTest.java
new file mode 100644
index 000000000..02cfb6eb7
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/ADConfigTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.common;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.inlong.tubemq.server.common.fileconfig.ADConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ADConfigTest {
+
+    @Test
+    public void checkProducerTopicTest() {
+        ADConfig auditConfig = new ADConfig();
+        Assert.assertFalse(auditConfig.isAuditEnable());
+        Assert.assertEquals(auditConfig.getAuditIdConsume(), 10);
+        auditConfig.setAuditIdConsume(7);
+        Assert.assertEquals(auditConfig.getAuditIdConsume(), 7);
+        auditConfig.setAuditIdProduce(5);
+        Assert.assertEquals(auditConfig.getAuditIdProduce(), 5);
+        auditConfig.setAuditCacheMaxRows(1000);
+        Assert.assertEquals(auditConfig.getAuditCacheMaxRows(), 1000);
+        auditConfig.setAuditEnable(true);
+        Assert.assertTrue(auditConfig.isAuditEnable());
+        List<String> addrs = new ArrayList<>();
+        addrs.add("test");
+        auditConfig.setAuditProxyAddrSet(addrs);
+        Assert.assertEquals(auditConfig.getAuditProxyAddrSet().size(), addrs.size());
+        for (String addr : addrs) {
+            Assert.assertTrue(auditConfig.getAuditProxyAddrSet().contains(addr));
+        }
+        auditConfig.setAuditCacheFilePath("aaa");
+        Assert.assertEquals(auditConfig.getAuditCacheFilePath(), "aaa");
+    }
+
+}